Python数据分析与可视化:从数据到洞察

张开发
2026/4/11 8:02:44 15 分钟阅读

分享文章

Python数据分析与可视化:从数据到洞察
## 1. 背景介绍

数据分析与可视化是现代数据驱动决策的核心环节。Python凭借其丰富的生态系统,成为数据分析的首选工具。从数据采集、清洗、分析到可视化呈现,Python提供了完整的工具链。本文将深入探讨Python数据分析与可视化的核心技术、库和实践方法,帮助读者从原始数据中提取有价值的洞察。

## 2. 核心概念与技术

### 2.1 数据分析流程

1. 数据采集:获取原始数据
2. 数据清洗:处理缺失值、异常值
3. 数据转换:数据格式转换、特征工程
4. 数据分析:统计分析、机器学习
5. 数据可视化:图表展示、交互式可视化
6. 结果呈现:报告、仪表盘

### 2.2 核心库

| 库 | 功能 | 用途 |
| --- | --- | --- |
| NumPy | 数值计算 | 数组操作、数学运算 |
| Pandas | 数据处理 | 数据框操作、数据清洗 |
| Matplotlib | 数据可视化 | 基础图表绘制 |
| Seaborn | 统计可视化 | 高级统计图表 |
| Plotly | 交互式可视化 | 交互式图表、仪表盘 |
| Scikit-learn | 机器学习 | 数据建模、预测分析 |
| Statsmodels | 统计分析 | 统计模型、假设检验 |

### 2.3 数据类型

- 结构化数据:表格数据、CSV、Excel
- 半结构化数据:JSON、XML
- 非结构化数据:文本、图像、音频
- 时序数据:时间序列、传感器数据

## 3. 代码实现

### 3.1 数据处理与分析

```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.preprocessing import StandardScaler, MinMaxScaler


class DataProcessor:
    """Data processor: loading, cleaning, feature engineering, scaling."""

    def __init__(self, data=None):
        self.data = data

    def load_data(self, file_path, file_type="csv"):
        """Load data from CSV, Excel or JSON into a DataFrame."""
        if file_type == "csv":
            self.data = pd.read_csv(file_path)
        elif file_type == "excel":
            self.data = pd.read_excel(file_path)
        elif file_type == "json":
            self.data = pd.read_json(file_path)
        return self.data

    def basic_info(self):
        """Print shape, columns, dtypes, missing-value counts and summary stats."""
        print("Data shape:", self.data.shape)
        print("\nColumns:", self.data.columns.tolist())
        print("\nData types:")
        print(self.data.dtypes)
        print("\nMissing values:")
        print(self.data.isnull().sum())
        print("\nDescriptive statistics:")
        print(self.data.describe())

    def handle_missing_values(self, strategy="mean"):
        """Fill missing values: numeric columns per *strategy*, categorical with the mode."""
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        categorical_cols = self.data.select_dtypes(include=["object"]).columns
        if strategy == "mean":
            self.data[numeric_cols] = self.data[numeric_cols].fillna(
                self.data[numeric_cols].mean())
        elif strategy == "median":
            self.data[numeric_cols] = self.data[numeric_cols].fillna(
                self.data[numeric_cols].median())
        elif strategy == "mode":
            for col in numeric_cols:
                self.data[col] = self.data[col].fillna(self.data[col].mode()[0])
        # Categorical columns: always fill with the most frequent value.
        for col in categorical_cols:
            self.data[col] = self.data[col].fillna(self.data[col].mode()[0])
        return self.data

    def handle_outliers(self, method="iqr"):
        """Handle outliers: clip to the 1.5*IQR fences, or replace |z|>3 with the median."""
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if method == "iqr":
                Q1 = self.data[col].quantile(0.25)
                Q3 = self.data[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                # Clip (winsorize) rather than drop rows.
                self.data[col] = self.data[col].clip(lower_bound, upper_bound)
            elif method == "zscore":
                z_scores = np.abs(stats.zscore(self.data[col]))
                threshold = 3
                self.data.loc[z_scores > threshold, col] = self.data[col].median()
        return self.data

    def feature_engineering(self):
        """Derive date parts from a 'date' column and pairwise interaction features."""
        # Example: calendar features from a date column, if present.
        if "date" in self.data.columns:
            self.data["date"] = pd.to_datetime(self.data["date"])
            self.data["year"] = self.data["date"].dt.year
            self.data["month"] = self.data["date"].dt.month
            self.data["day"] = self.data["date"].dt.day
            self.data["day_of_week"] = self.data["date"].dt.dayofweek
            # dayofweek: Monday=0 ... Sunday=6, so >= 5 means Sat/Sun.
            self.data["is_weekend"] = (self.data["day_of_week"] >= 5).astype(int)
        # Example: pairwise product and ratio features for numeric columns.
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) >= 2:
            for i in range(len(numeric_cols)):
                for j in range(i + 1, len(numeric_cols)):
                    col1, col2 = numeric_cols[i], numeric_cols[j]
                    self.data[f"{col1}_x_{col2}"] = self.data[col1] * self.data[col2]
                    # Small epsilon guards against division by zero.
                    self.data[f"{col1}_div_{col2}"] = (
                        self.data[col1] / (self.data[col2] + 1e-8))
        return self.data

    def normalize_features(self, method="standard"):
        """Scale numeric columns with StandardScaler or MinMaxScaler."""
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        if method == "standard":
            scaler = StandardScaler()
        elif method == "minmax":
            scaler = MinMaxScaler()
        self.data[numeric_cols] = scaler.fit_transform(self.data[numeric_cols])
        return self.data


# Usage example
if __name__ == "__main__":
    processor = DataProcessor()
    # Load data
    data = processor.load_data("data.csv")
    # Basic info
    processor.basic_info()
    # Handle missing values
    processor.handle_missing_values(strategy="median")
    # Handle outliers
    processor.handle_outliers(method="iqr")
    # Feature engineering
    processor.feature_engineering()
    # Normalization
    processor.normalize_features(method="standard")
    print("\nProcessed data shape:", processor.data.shape)
```

### 3.2 数据可视化

```python
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np


class DataVisualizer:
    """Visualization helper: static (matplotlib/seaborn) and interactive (plotly) charts."""

    def __init__(self, data):
        self.data = data
        # Global style defaults.
        sns.set(style="whitegrid")
        plt.rcParams["figure.figsize"] = (12, 8)

    def histograms(self, columns=None, bins=30):
        """Histogram grid for the given columns (default: all numeric columns)."""
        if columns is None:
            columns = self.data.select_dtypes(include=[np.number]).columns
        n_cols = 2
        n_rows = (len(columns) + n_cols - 1) // n_cols  # ceil division
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
        axes = axes.flatten()
        for i, col in enumerate(columns):
            sns.histplot(self.data[col], bins=bins, ax=axes[i])
            axes[i].set_title(f"Histogram of {col}")
            axes[i].set_xlabel(col)
            axes[i].set_ylabel("Frequency")
        plt.tight_layout()
        plt.show()

    def box_plots(self, columns=None):
        """Box-plot grid for the given columns (default: all numeric columns)."""
        if columns is None:
            columns = self.data.select_dtypes(include=[np.number]).columns
        n_cols = 2
        n_rows = (len(columns) + n_cols - 1) // n_cols
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
        axes = axes.flatten()
        for i, col in enumerate(columns):
            sns.boxplot(y=self.data[col], ax=axes[i])
            axes[i].set_title(f"Box Plot of {col}")
            axes[i].set_ylabel(col)
        plt.tight_layout()
        plt.show()

    def scatter_plots(self, x_col, y_cols=None):
        """Scatter plots of x_col against each y column (default: all other numeric columns)."""
        if y_cols is None:
            y_cols = [col for col in
                      self.data.select_dtypes(include=[np.number]).columns
                      if col != x_col]
        n_cols = 2
        n_rows = (len(y_cols) + n_cols - 1) // n_cols
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
        axes = axes.flatten()
        for i, y_col in enumerate(y_cols):
            sns.scatterplot(x=x_col, y=y_col, data=self.data, ax=axes[i])
            axes[i].set_title(f"Scatter Plot: {x_col} vs {y_col}")
            axes[i].set_xlabel(x_col)
            axes[i].set_ylabel(y_col)
        plt.tight_layout()
        plt.show()

    def correlation_heatmap(self):
        """Annotated correlation heatmap over the numeric columns."""
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        corr_matrix = self.data[numeric_cols].corr()
        plt.figure(figsize=(12, 10))
        sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", fmt=".2f", square=True)
        plt.title("Correlation Heatmap")
        plt.tight_layout()
        plt.show()

    def time_series_plot(self, date_col, value_cols):
        """Line plot of one or more value columns over time."""
        # Ensure the date column is datetime and the data is chronologically sorted.
        self.data[date_col] = pd.to_datetime(self.data[date_col])
        self.data = self.data.sort_values(date_col)
        plt.figure(figsize=(15, 8))
        for col in value_cols:
            sns.lineplot(x=date_col, y=col, data=self.data, label=col)
        plt.title("Time Series Analysis")
        plt.xlabel("Date")
        plt.ylabel("Value")
        plt.legend()
        plt.tight_layout()
        plt.show()

    def interactive_scatter(self, x_col, y_col, color_col=None, size_col=None):
        """Interactive plotly scatter with full hover data."""
        fig = px.scatter(
            self.data,
            x=x_col,
            y=y_col,
            color=color_col,
            size=size_col,
            hover_data=self.data.columns.tolist(),
            title=f"Interactive Scatter: {x_col} vs {y_col}",
        )
        fig.show()

    def interactive_histogram(self, column, nbins=30):
        """Interactive plotly histogram with a marginal box plot."""
        fig = px.histogram(
            self.data,
            x=column,
            nbins=nbins,
            title=f"Interactive Histogram of {column}",
            marginal="box",
        )
        fig.show()

    def dashboard(self):
        """2x2 interactive dashboard: heatmap, top-10 bar, histogram, time-series trend."""
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        # 2x2 subplot grid.
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                "Correlation Heatmap",
                "Top 10 Values",
                "Distribution of Values",
                "Time Series Trend",
            ),
        )
        # Correlation heatmap.
        corr_matrix = self.data[numeric_cols].corr()
        fig.add_trace(
            go.Heatmap(
                z=corr_matrix.values,
                x=corr_matrix.columns,
                y=corr_matrix.index,
                colorscale="RdBu_r",
            ),
            row=1, col=1,
        )
        # Top 10 rows by the 'value' column, if present.
        if "value" in self.data.columns:
            top10 = self.data.nlargest(10, "value")
            fig.add_trace(
                go.Bar(
                    x=top10.index,
                    y=top10["value"],
                    marker_color="skyblue",
                ),
                row=1, col=2,
            )
        # Distribution of the first numeric column.
        if len(numeric_cols) > 0:
            fig.add_trace(
                go.Histogram(
                    x=self.data[numeric_cols[0]],
                    nbinsx=30,
                    marker_color="coral",
                ),
                row=2, col=1,
            )
        # Time-series trend, if a 'date' column exists.
        if "date" in self.data.columns:
            self.data["date"] = pd.to_datetime(self.data["date"])
            fig.add_trace(
                go.Scatter(
                    x=self.data["date"],
                    y=self.data[numeric_cols[0]] if len(numeric_cols) > 0 else [],
                    mode="lines",
                    line=dict(color="green"),
                ),
                row=2, col=2,
            )
        fig.update_layout(height=800, width=1000, title_text="Data Dashboard")
        fig.show()


# Usage example
if __name__ == "__main__":
    # Build a synthetic example dataset.
    np.random.seed(42)
    dates = pd.date_range("2020-01-01", "2023-12-31", freq="D")
    data = pd.DataFrame({
        "date": dates,
        "value": np.random.normal(100, 20, len(dates)),
        "category": np.random.choice(["A", "B", "C"], len(dates)),
        "quantity": np.random.randint(1, 100, len(dates)),
    })
    visualizer = DataVisualizer(data)
    # Static charts
    visualizer.histograms(columns=["value", "quantity"])
    visualizer.box_plots(columns=["value", "quantity"])
    visualizer.scatter_plots("quantity", ["value"])
    visualizer.correlation_heatmap()
    visualizer.time_series_plot("date", ["value"])
    # Interactive charts
    visualizer.interactive_scatter("quantity", "value",
                                   color_col="category", size_col="quantity")
    visualizer.interactive_histogram("value")
    # Dashboard
    visualizer.dashboard()
```

### 3.3 高级分析

```python
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
# Imported at module level so pca_analysis() works even if clustering()
# was never called (the original listing only imported it inside clustering()).
from sklearn.preprocessing import StandardScaler
import statsmodels.api as sm


class AdvancedAnalyzer:
    """Advanced analysis: regression, time series, clustering, PCA, hypothesis tests."""

    def __init__(self, data):
        self.data = data

    def linear_regression(self, target_col, feature_cols):
        """Fit and evaluate an OLS linear regression; returns (model, mse, r2)."""
        X = self.data[feature_cols]
        y = self.data[target_col]
        # Train/test split.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42)
        # Fit the model.
        model = LinearRegression()
        model.fit(X_train, y_train)
        # Predict.
        y_pred = model.predict(X_test)
        # Evaluate.
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        print("Linear Regression Results:")
        print(f"Mean Squared Error: {mse:.4f}")
        print(f"R-squared: {r2:.4f}")
        print("Coefficients:")
        for i, col in enumerate(feature_cols):
            print(f"  {col}: {model.coef_[i]:.4f}")
        print(f"Intercept: {model.intercept_:.4f}")
        return model, mse, r2

    def time_series_analysis(self, time_col, value_col):
        """Basic stats, 7/30-day moving averages and a linear trend for a series."""
        # Ensure datetime type and chronological order.
        self.data[time_col] = pd.to_datetime(self.data[time_col])
        self.data = self.data.sort_values(time_col)
        # Time-indexed series.
        ts = self.data.set_index(time_col)[value_col]
        # Basic statistics.
        print("Time Series Analysis:")
        print(f"Mean: {ts.mean():.4f}")
        print(f"Std: {ts.std():.4f}")
        print(f"Min: {ts.min():.4f}")
        print(f"Max: {ts.max():.4f}")
        # Simple moving averages.
        ts_ma7 = ts.rolling(window=7).mean()
        ts_ma30 = ts.rolling(window=30).mean()
        # Linear trend over the ordinal time index.
        X = np.arange(len(ts)).reshape(-1, 1)
        y = ts.values
        model = LinearRegression()
        model.fit(X, y)
        trend = model.predict(X)
        return ts, ts_ma7, ts_ma30, trend

    def clustering(self, feature_cols, n_clusters=3):
        """K-means on standardized features; writes a 'cluster' column and prints a summary."""
        X = self.data[feature_cols]
        # Standardize before distance-based clustering.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        # K-means clustering.
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        clusters = kmeans.fit_predict(X_scaled)
        # Attach cluster labels to the data.
        self.data["cluster"] = clusters
        # Summarize each cluster.
        print("Clustering Results:")
        for i in range(n_clusters):
            cluster_data = self.data[self.data["cluster"] == i]
            print(f"Cluster {i}:")
            print(f"  Size: {len(cluster_data)}")
            print("  Mean values:")
            for col in feature_cols:
                print(f"    {col}: {cluster_data[col].mean():.4f}")
        return clusters

    def pca_analysis(self, feature_cols, n_components=2):
        """PCA on standardized features; returns (pca_df, explained_variance)."""
        X = self.data[feature_cols]
        # Standardize before PCA.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        # PCA projection.
        pca = PCA(n_components=n_components)
        principal_components = pca.fit_transform(X_scaled)
        # Component DataFrame with PC1..PCn column names.
        pc_columns = [f"PC{i + 1}" for i in range(n_components)]
        pca_df = pd.DataFrame(data=principal_components, columns=pc_columns)
        # Explained variance report.
        explained_variance = pca.explained_variance_ratio_
        print("PCA Results:")
        for i, var in enumerate(explained_variance):
            print(f"PC{i + 1} explains {var:.4f} of variance")
        print(f"Total explained variance: {sum(explained_variance):.4f}")
        return pca_df, explained_variance

    def hypothesis_testing(self, group_col, value_col):
        """One-way ANOVA across the groups of group_col (original listing truncated)."""
        groups = []
        for group in self.data[group_col].unique():
            group_data = self.data[self.data[group_col] == group][value_col]
            if len(group_data) > 0:
                groups.append(group_data)
        # ANOVA requires at least two groups.
        if len(groups) >= 2:
            from scipy.stats import f_oneway
            # NOTE: the original article is cut off at this point; the remainder
            # of this method (the f_oneway call and result reporting) is missing.
```

更多文章