← 返回数据分析目录
← 返回学习笔记首页
数据流水线构建
数据分析专题 · 可复现的数据处理流程
专题: Python数据分析系统学习
关键词: 数据分析, Pipeline, ColumnTransformer, DVC, Airflow, Prefect, data pipeline, 流水线
一、概述:为什么需要数据流水线
在数据科学和机器学习项目中,数据处理通常涉及多个步骤:数据加载、清洗、特征工程、模型训练、评估和部署。如果这些步骤缺乏系统化管理,很容易出现代码混乱、结果不可复现、协作困难等问题。数据流水线(Data Pipeline)正是为了解决这些痛点而生。
数据流水线的核心理念是将数据处理流程分解为多个可组合、可复用、可测试的步骤,每个步骤处理特定的数据变换,步骤之间通过明确定义的接口连接。这种设计模式带来了显著的好处:提高代码的可维护性,确保分析结果的可复现性,简化团队协作,以及方便后续的扩展和修改。
本章从Python生态中主流的流水线工具出发,系统介绍从数据处理到工作流编排的完整技术栈。我们将依次探讨scikit-learn Pipeline用于机器学习建模、FeatureUnion用于特征联合、自定义Transformer实现灵活的数据变换、pandas的pipe机制用于链式数据清洗、DVC用于数据版本控制、以及Airflow和Prefect/Luigi用于工作流调度。
核心原则: 一个好的数据流水线应当具备以下特性——可复现性(相同输入产生相同输出)、模块化(每步骤职责单一)、可追溯性(记录每个步骤的输入输出和参数)、容错性(失败时能优雅恢复或重试)。
工具选型建议: 单机数据处理优先用scikit-learn Pipeline + pandas pipe;需要版本控制和团队协作时引入DVC;复杂定时任务和跨系统编排选择Airflow;中小规模工作流可选用Prefect或Luigi。应根据团队规模和业务复杂度选择合适的工具组合,避免过度工程化。
二、scikit-learn Pipeline:机器学习流水线
scikit-learn提供的Pipeline类是构建机器学习流水线的核心工具。它将数据预处理、特征提取、降维和模型训练等步骤串联成一个整体,确保交叉验证和模型评估时不会发生数据泄露(data leakage)。Pipeline的核心思想是将一系列transformer和一个最终estimator组合为一个整体,调用fit时自动按顺序执行每一步的transform和最后的fit。
2.1 make_pipeline 快速构建
make_pipeline是构建Pipeline的简便函数,自动为每个步骤生成名称。适合步骤较少、不需要命名控制的场景。
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
# 使用make_pipeline一行构建流水线
pipe = make_pipeline(
StandardScaler(),
PCA(n_components=10 ),
SVC(kernel='rbf' , C=1.0 )
)
# 训练和预测与普通estimator完全相同
pipe.fit(X_train, y_train)
y_pred = pipe.predict(X_test)
print (f"Accuracy: {pipe.score(X_test, y_test):.3f}" )
make_pipeline自动将步骤命名为standardscaler、pca和svc。步骤名称可通过pipe.named_steps字典访问和修改参数,例如pipe.named_steps['svc'].C = 0.1。
2.2 Pipeline 显式命名
当需要精确控制每个步骤的名称(尤其是在网格搜索中引用参数时),应使用Pipeline构造函数,通过二元组列表显式指定名称。
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures
from sklearn.linear_model import Ridge
pipe = Pipeline([
('scaler' , MinMaxScaler()),
('poly' , PolynomialFeatures(degree=2 , include_bias=False )),
('reg' , Ridge(alpha=1.0 ))
])
# 网格搜索中通过双下划线分隔步骤名和参数名
param_grid = {
'poly__degree' : [1 , 2 , 3 ],
'reg__alpha' : [0.1 , 1.0 , 10.0 ]
}
from sklearn.model_selection import GridSearchCV
grid = GridSearchCV(pipe, param_grid, cv=5 )
grid.fit(X_train, y_train)
print (f"Best params: {grid.best_params_}" )
2.3 各步骤串联实现完整流程
真实项目中,Pipeline的步骤可以非常丰富,涵盖缺失值填充、编码、特征选择、降维和模型训练等多个阶段。以下展示一个包含自定义数据清洗步骤的完整流水线。
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import RandomForestClassifier
full_pipe = Pipeline([
('impute' , SimpleImputer(strategy='median' )),
('scale' , StandardScaler()),
('select' , SelectKBest(score_func=f_classif, k=20 )),
('classify' , RandomForestClassifier(n_estimators=100 , random_state=42 ))
])
# 训练完成后可以直接保存整个流水线
import joblib
joblib.dump(full_pipe, 'model_pipeline.pkl' )
# 加载后无需重复预处理,直接预测新数据
loaded_pipe = joblib.load('model_pipeline.pkl' )
new_data = pd.DataFrame({'feature_1' : [1.5 , 2.3 ], 'feature_2' : [3.7 , 4.1 ]})
predictions = loaded_pipe.predict(new_data)
2.4 ColumnTransformer:异构数据处理
真实数据集中通常同时包含数值列、类别列和文本列,每类特征需要不同的预处理方式。ColumnTransformer正是为解决这种异构数据问题而设计的,它允许对不同列应用不同的变换,最后将结果拼接为一个整体矩阵。
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
# 定义每类特征的预处理方式
numeric_features = ['age' , 'income' , 'education_years' ]
categorical_features = ['gender' , 'occupation' , 'city' ]
text_features = ['description' ]
preprocessor = ColumnTransformer([
('num' , Pipeline([
('impute' , SimpleImputer(strategy='median' )),
('scale' , StandardScaler())
]), numeric_features),
('cat' , Pipeline([
('impute' , SimpleImputer(strategy='most_frequent' )),
('encode' , OneHotEncoder(drop='first' , handle_unknown='ignore' ))
]), categorical_features),
('text' , TfidfVectorizer(max_features=500 ), text_features[0 ])
])
# ColumnTransformer + 最终estimator构成完整的异构数据流水线
model = Pipeline([
('preprocess' , preprocessor),
('classify' , RandomForestClassifier(n_estimators=200 , max_depth=10 ))
])
model.fit(X_train, y_train)
print (f"Heterogeneous pipeline AUC: {model.score(X_test, y_test):.4f}" )
ColumnTransformer的核心优势在于:它将特征工程的逻辑封装在流水线内部,训练时通过fit_transform学习每列的参数(如编码映射、归一化均值和方差),预测时通过transform复用这些参数,从根本上杜绝了数据泄露问题。
最佳实践: 使用ColumnTransformer时,建议通过remainder='passthrough'或remainder='drop'明确指定未列出的列的处理方式。同时在定义列选择时,优先使用列名列表而非列索引,以提高代码的可读性和健壮性。
三、FeatureUnion:特征联合与并行提取
FeatureUnion是scikit-learn中用于并行特征提取和合并的工具。与Pipeline的顺序执行不同,FeatureUnion并行执行多个变换器,然后将每个变换器的输出在特征维度上拼接起来。这在需要从同一数据中提取多种不同类型的特征时特别有用。
3.1 并行特征提取
在文本分类任务中,我们可能同时需要词袋特征、词性标注特征和文本长度特征。FeatureUnion可以并行计算这些特征并合并。
from sklearn.pipeline import FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class TextLengthExtractor (BaseEstimator, TransformerMixin):
def fit (self , X, y=None ):
return self
def transform (self , X):
return np.array([[len (text)] for text in X])
class WordCountExtractor (BaseEstimator, TransformerMixin):
def fit (self , X, y=None ):
return self
def transform (self , X):
return np.array([[len (text.split ())] for text in X])
from sklearn.feature_extraction.text import CountVectorizer
feature_union = FeatureUnion([
('bow' , CountVectorizer(max_features=1000 )),
('length' , TextLengthExtractor()),
('word_count' , WordCountExtractor())
])
# FeatureUnion可以嵌套在Pipeline中使用
pipe = Pipeline([
('features' , feature_union),
('clf' , RandomForestClassifier())
])
3.2 合并多个变换
FeatureUnion的transformer_list参数接受一个或多个(名称,变换器)二元组。所有变换器必须实现transform方法。如果某个变换器中途失败,FeatureUnion不会中断其他变换器的执行。
使用n_jobs参数可以控制并行计算的进程数(-1表示使用所有CPU核心),在大数据量场景下能显著加速特征提取过程。需要注意的是,如果变换器之间存在共享资源(如文件句柄),并行可能带来线程安全问题。
# 并行特征提取的另一种用法:权重分配
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import MiniBatchKMeans
feature_union_weighted = FeatureUnion([
('svd' , TruncatedSVD(n_components=50 )),
('kmeans' , Pipeline([
('cluster' , MiniBatchKMeans(n_clusters=10 , random_state=42 )),
('onehot' , OneHotEncoder(sparse_output=False ))
]))
], n_jobs=-1 ) # 并行执行
四、自定义Transformer
尽管scikit-learn内置了丰富的变换器,但实际项目中常常需要自定义的数据变换逻辑。scikit-learn提供了三种主要方式来实现自定义Transformer,以适应不同的灵活性和复杂度需求。
4.1 FunctionTransformer 快速包装
对于简单的函数变换,FunctionTransformer是最快捷的方式。它可以将任意Python函数包装为scikit-learn兼容的Transformer,无需编写完整的类定义。
from sklearn.preprocessing import FunctionTransformer
import pandas as pd
# 自定义数据清洗函数
def clip_outliers (X):
"""将超过3倍标准差的异常值替换为边界值"""
mean = np.nanmean (X, axis=0 )
std = np.nanstd (X, axis=0 )
lower = mean - 3 * std
upper = mean + 3 * std
return np.clip (X, lower, upper)
def add_interaction (X):
"""添加特征交叉项:每两列相乘"""
n = X.shape[1 ]
interactions = []
for i in range (n):
for j in range (i+1 , n):
interactions.append((X[:, i] * X[:, j]).reshape (-1 , 1 ))
return np.hstack ([X] + interactions)
# 包装为Transformer
clip_transformer = FunctionTransformer(clip_outliers, validate=True )
interaction_transformer = FunctionTransformer(add_interaction, validate=True )
# 在Pipeline中使用
pipe = Pipeline([
('clip' , clip_transformer),
('interact' , interaction_transformer),
('scale' , StandardScaler()),
('model' , Ridge())
])
4.2 BaseEstimator + TransformerMixin 完整自定义
对于需要学习参数(如fit阶段保存数据分布)或包含配置参数的复杂变换器,应通过继承BaseEstimator和TransformerMixin来定义完整的Transformer类。BaseEstimator提供了get_params和set_params方法(使变换器支持网格搜索),TransformerMixin提供了fit_transform方法(自动组合fit和transform)。
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_array, check_is_fitted
import pandas as pd
class OutlierHandler (BaseEstimator, TransformerMixin):
"""基于IQR方法的异常值处理,支持分组统计"""
def __init__ (self , factor=1.5 , strategy='clip' ):
self.factor = factor
self.strategy = strategy
def fit (self , X, y=None ):
X = check_array(X, force_all_finite=True )
self.statistics_ = {}
for col in range (X.shape[1 ]):
q1 = np.percentile (X[:, col], 25 )
q3 = np.percentile (X[:, col], 75 )
iqr = q3 - q1
self.statistics_[col] = {
'lower' : q1 - self.factor * iqr,
'upper' : q3 + self.factor * iqr
}
return self
def transform (self , X):
check_is_fitted(self , 'statistics_' )
X = check_array(X, force_all_finite=True )
X_out = X.copy ()
for col, bounds in self.statistics_.items():
col_mask = (X_out[:, col] < bounds['lower' ]) | (X_out[:, col] > bounds['upper' ])
print (f"Column {col}: {col_mask.sum()} outliers detected" )
if self.strategy == 'clip' :
X_out[:, col] = np.clip (X_out[:, col], bounds['lower' ], bounds['upper' ])
elif self.strategy == 'remove' :
X_out = X_out[~col_mask]
return X_out
# 可以在网格搜索中调优参数
pipe = Pipeline([
('outlier' , OutlierHandler()),
('model' , RandomForestRegressor())
])
param_grid = {'outlier__factor' : [1.5 , 2.0 , 3.0 ]}
grid = GridSearchCV(pipe, param_grid)
4.3 clone 机制
scikit-learn的clone函数用于创建estimator的深拷贝,同时保留参数设置但不保留拟合状态。这在交叉验证中非常关键:每次fold都使用全新的estimator副本进行训练,确保评估结果的独立性。
from sklearn.base import clone
# clone创建一个"干净"的副本
original = OutlierHandler(factor=2.0 )
original.fit (X_train)
print (original.statistics_) # 已拟合
# 克隆不携带拟合状态
cloned = clone(original)
print (cloned.factor) # 2.0,参数保留
# print(cloned.statistics_) # AttributeError,未拟合
# clone的核心用途:在自定义交叉验证中保证独立性
def custom_cv (estimator, X, y, folds=5 ):
scores = []
for train_idx, val_idx in StratifiedKFold(folds).split (X, y):
est_copy = clone(estimator) # 每次fold从原始estimator克隆
est_copy.fit (X[train_idx], y[train_idx])
scores.append (est_copy.score (X[val_idx], y[val_idx]))
return scores
五、pandas 管道:链式数据清洗
pandas的pipe方法提供了一种链式调用风格,让数据清洗和特征工程的代码更加简洁、可读和可维护。pipe的核心思想是将DataFrame或Series通过一系列函数传递,每个函数接收DataFrame并返回处理后的DataFrame,形成自然的处理流水线。
5.1 pipe 链式调用基础
import pandas as pd
import numpy as np
# 定义清洗函数——每个函数接收DataFrame,返回DataFrame
def drop_duplicates (df, subset=None ):
n_before = len (df)
df = df.drop_duplicates (subset=subset)
print (f"Dropped {n_before - len(df)} duplicate rows" )
return df
def fill_missing (df, numeric_strategy='median' , categorical_strategy='mode' ):
for col in df.select_dtypes (include=[np.number]).columns:
if df[col].isna ().any ():
fill_val = df[col].median () if numeric_strategy == 'median' else df[col].mean ()
df[col] = df[col].fillna (fill_val)
for col in df.select_dtypes (include=['object' , 'category' ]).columns:
if df[col].isna ().any ():
df[col] = df[col].fillna (df[col].mode ()[0 ] if not df[col].mode ().empty else 'Unknown' )
return df
def rename_columns (df, mapping):
return df.rename (columns=mapping)
def create_features (df):
"""衍生特征生成"""
if 'purchase_date' in df.columns:
df['purchase_date' ] = pd.to_datetime (df['purchase_date' ])
df['year' ] = df['purchase_date' ].dt.year
df['month' ] = df['purchase_date' ].dt.month
df['day_of_week' ] = df['purchase_date' ].dt.dayofweek
df['is_weekend' ] = df['day_of_week' ].isin ([5 , 6 ]).astype (int )
return df
# 链式调用——可读性极佳的数据清洗流水线
df_clean = (df
.pipe (drop_duplicates, subset=['user_id' , 'purchase_date' ])
.pipe (fill_missing, numeric_strategy='median' )
.pipe (rename_columns, {'cust_id' : 'user_id' , 'amt' : 'amount' })
.pipe (create_features)
.query ('amount > 0' )
.reset_index (drop=True )
)
5.2 自定义管道函数与参数化
pipe的强大之处在于可以在链式调用的任意步骤中传递函数和参数。通过将处理逻辑封装为独立的函数,每个步骤都可以单独测试、复用和调试,同时pipe链保证了整体流程的清晰可读。
# 带参数的自定义管道函数
def filter_by_percentile (df, column, lower=0.01 , upper=0.99 ):
lo = df[column].quantile (lower)
hi = df[column].quantile (upper)
return df.query (f"{column} >= @lo and {column} <= @hi" )
def aggregate_features (df, group_cols, agg_dict):
"""按分组列聚合生成统计特征"""
agg_df = df.groupby (group_cols).agg (agg_dict)
agg_df.columns = [f"{col}_{agg}" for col, agg in agg_dict.items()]
agg_df = agg_df.reset_index ()
return df.merge (agg_df, on=group_cols, how='left' )
# 复杂的数据清洗流水线
df_final = (df_raw
.pipe (drop_duplicates, subset=['transaction_id' ])
.pipe (fill_missing)
.pipe (filter_by_percentile, 'amount' , 0.01 , 0.99 )
.pipe (create_features)
.pipe (aggregate_features,
group_cols=['user_id' ],
agg_dict={'amount' : ['mean' , 'sum' , 'count' ], 'month' : 'nunique' })
)
5.3 链式数据清洗实战
以下展示一个完整的电商数据清洗流程,将多个pipe步骤组合为一条清晰的链式处理流水线。每个步骤都有明确的职责,便于后续维护和修改。
# 完整电商数据清洗流水线
def clean_ecommerce_data (raw_df):
"""电商数据标准清洗流水线"""
def standardize_dates (df):
date_cols = ['order_date' , 'ship_date' , 'return_date' ]
for col in date_cols:
if col in df.columns:
df[col] = pd.to_datetime (df[col], errors='coerce' )
return df
def clean_categories (df):
"""标准化类别列:统一大小写、去除前后空格"""
cat_cols = df.select_dtypes (include=['object' , 'category' ]).columns
for col in cat_cols:
df[col] = (df[col]
.str .strip ()
.str .lower ()
.replace ({'' : np.nan , 'na' : np.nan , 'null' : np.nan })
)
return df
def validate_logic (df):
"""业务逻辑校验:过滤异常记录"""
if 'order_date' in df.columns:
df = df[df['order_date' ] <= pd.Timestamp .today ()]
if 'quantity' in df.columns:
df = df[df['quantity' ] > 0 ]
if 'price' in df.columns and 'discount' in df.columns:
df['net_price' ] = df['price' ] * (1 - df['discount' ])
df = df[df['net_price' ] >= 0 ]
return df
return (raw_df
.pipe (drop_duplicates, subset=['order_id' ])
.pipe (standardize_dates)
.pipe (clean_categories)
.pipe (fill_missing)
.pipe (validate_logic)
.reset_index (drop=True )
)
六、DVC:数据流水线版本控制
DVC(Data Version Control)是一个专门为数据科学和机器学习项目设计的版本控制系统。它建立在Git之上,专注于管理数据集、模型和实验指标,同时提供强大的流水线定义和重现能力。DVC的核心思想是将数据文件存储在独立的缓存中(如S3、GCS、本地目录),而Git只跟踪数据的元信息(.dvc文件),从而避免将大文件直接纳入Git仓库。
6.1 DVC数据版本管理
# 安装DVC
pip install dvc # 基础版本
pip install dvc[s3] # 支持S3远程存储
pip install dvc[gs] # 支持Google Cloud Storage
# 初始化DVC项目
git init
dvc init
# 添加远程存储
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote add -d myremote gs://my-bucket/dvc-store
dvc remote add -d myremote /path/to/local/cache
# 跟踪数据文件
dvc add data/raw/dataset.csv
# 会生成 dataset.csv.dvc 文件(元信息),data/raw/dataset.csv 加入 .gitignore
git add data/raw/dataset.csv.dvc .gitignore
git commit -m "Track raw dataset with DVC"
dvc push # 将实际数据上传到远程存储
# 切换版本:像切换代码分支一样切换数据版本
git checkout v1.0
dvc checkout # 自动将数据恢复到v1.0对应的版本
6.2 DVC流水线定义与依赖追踪
DVC的流水线(pipeline)通过dvc.yaml文件定义,描述数据处理各步骤的输入、输出和命令。DVC自动追踪每个步骤的依赖关系和输出文件的哈希值,当依赖发生变化时,仅重新运行受影响的步骤。
# dvc.yaml 示例:定义完整的ML流水线
stages:
load_data:
cmd: python src/load_data.py
deps:
- data/raw/source.db
- src/load_data.py
outs:
- data/processed/raw_dataset.parquet:
cache: true
clean_data:
cmd: python src/clean_data.py --config params.yaml
deps:
- data/processed/raw_dataset.parquet
- src/clean_data.py
params:
- clean.threshold
- clean.fill_strategy
outs:
- data/processed/clean_dataset.parquet
feature_engineering:
cmd: python src/feature_engineering.py
deps:
- data/processed/clean_dataset.parquet
- src/feature_engineering.py
params:
- features.window_size
- features.max_lag
outs:
- data/processed/features.parquet
- data/processed/feature_meta.json
train:
cmd: python src/train.py
deps:
- data/processed/features.parquet
- src/train.py
params:
- train.model_type
- train.learning_rate
- train.n_estimators
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
plots:
- plots/feature_importance.png:
cache: false
6.3 流水线重现与参数调优
DVC最强大的功能之一是流水线重现(reproduce)。当源代码、数据或参数发生变化时,DVC自动计算依赖图,仅重新运行受影响的部分。params.yaml文件集中管理所有可调参数,便于实验管理。
# params.yaml — 集中管理所有可调参数
clean:
threshold: 3.0
fill_strategy: median
features:
window_size: 7
max_lag: 3
train:
model_type: random_forest
learning_rate: 0.1
n_estimators: 200
max_depth: 10
# 运行流水线——自动检测变化,仅执行受影响步骤
dvc repro
# 强制重新运行所有步骤
dvc repro --force
# 查看流水线依赖关系图
dvc dag
# 对比不同实验的指标
git checkout experiment-1
dvc metrics show
git checkout experiment-2
dvc metrics show
# 跨分支对比指标
dvc metrics diff experiment-1 experiment-2
DVC最佳实践: 1)将dvc.yaml和params.yaml纳入Git版本管理;2)使用dvc metrics和dvc plots跟踪模型性能指标;3)结合Git分支进行实验管理,每个实验对应一个分支;4)大型数据集使用dvc checkout --relink快速切换版本。
七、Airflow:数据工作流调度
Apache Airflow是业界最广泛使用的工作流调度平台之一,适用于构建、调度和监控复杂的数据管道。Airflow的核心概念包括DAG(有向无环图)、Task(任务)和Operator(操作符),通过Python代码定义工作流逻辑,提供Web UI进行可视化管理。
7.1 DAG定义与Task/Operator基础
每个Airflow工作流由一个DAG定义,DAG包含多个Task,Task通过Operator实例化。依赖关系通过>>运算符或set_upstream/set_downstream方法指定。
# dags/etl_pipeline.py — 完整的ETL工作流
from datetime
import datetime, timedelta
from airflow
import DAG
from airflow.operators.python
import PythonOperator
from airflow.operators.bash
import BashOperator
from airflow.operators.email
import EmailOperator
from airflow.providers.postgres.operators.postgres
import PostgresOperator
default_args = {
'owner' :
'data_team' ,
'depends_on_past' :
False ,
'email_on_failure' :
True ,
'email_on_retry' :
False ,
'email' : [
'alerts@company.com' ],
'retries' :
2 ,
'retry_delay' : timedelta(minutes=
5 ),
}
dag = DAG(
'etl_pipeline' ,
default_args=default_args,
description=
'Daily ETL pipeline' ,
schedule=
'0 2 * * *' ,
# 每天凌晨2点执行
start_date=datetime(
2025 ,
1 ,
1 ),
catchup=
False ,
tags=[
'etl' ,
'production' ],
)
def extract_from_api (**context):
"""从API提取数据"""
api_url = context[
'params' ].
get (
'api_url' ,
'https://api.example.com/data' )
response = requests.
get (api_url, timeout=
30 )
response.
raise_for_status ()
raw_data = response.
json ()
# 将原始数据写入临时文件
output_path =
f'/tmp/raw_data_{{context["ds"]}}.json'
with open (output_path,
'w' )
as f:
json.
dump (raw_data, f)
# 通过XCom传递文件路径给下游任务
context[
'ti' ].
xcom_push (key=
'raw_data_path' , value=output_path)
def transform_data (**context):
"""数据变换和清洗"""
ti = context[
'ti' ]
raw_path = ti.
xcom_pull (task_ids=
'extract' , key=
'raw_data_path' )
with open (raw_path)
as f:
data = json.
load (f)
# 数据清洗逻辑
df = pd.
DataFrame (data)
df = df.
drop_duplicates (subset=[
'id' ])
df = df.
fillna ({
'amount' :
0 ,
'category' :
'unknown' })
cleaned_path =
f'/tmp/cleaned_data_{{context["ds"]}}.parquet'
df.
to_parquet (cleaned_path)
ti.
xcom_push (key=
'cleaned_path' , value=cleaned_path)
# 定义任务
extract = PythonOperator(
task_id=
'extract' ,
python_callable=extract_from_api,
params={
'api_url' :
'https://api.company.com/orders' },
dag=dag,
)
transform = PythonOperator(
task_id=
'transform' ,
python_callable=transform_data,
dag=dag,
)
load = PostgresOperator(
task_id=
'load_to_db' ,
postgres_conn_id=
'data_warehouse' ,
sql=
"""
TRUNCATE TABLE staging.orders;
COPY staging.orders FROM '/tmp/cleaned_data_{{ ds }}.parquet' WITH (FORMAT PARQUET);
""" ,
dag=dag,
)
backup = BashOperator(
task_id=
'backup_raw' ,
bash_command=
'cp /tmp/raw_data_{{ ds }}.json /backup/raw/{{ ds }}/' ,
dag=dag,
)
notification = EmailOperator(
task_id=
'send_notification' ,
to=
'data-team@company.com' ,
subject=
'ETL Pipeline Completed - {{ ds }}' ,
html_content=
'ETL Pipeline Completed Date: {{ ds }}
' ,
dag=dag,
)
# 定义依赖关系——>>运算符指定执行顺序
extract >> transform >> load >> notification
extract >> backup
7.2 BranchPythonOperator与传感器
BranchPythonOperator用于实现条件分支逻辑,根据运行时的条件决定后续执行路径。传感器(Sensor)用于等待外部条件满足后再继续执行,常用于依赖上游系统数据就绪的场景。
from airflow.operators.python import BranchPythonOperator
from airflow.sensors.filesystem import FileSensor
def choose_branch (**context):
"""根据数据量大小选择处理分支"""
record_count = context['ti' ].xcom_pull (key='record_count' )
if record_count > 1000000 :
return 'spark_process' # 大数据量走Spark分支
else :
return 'local_process' # 小数据量走本地处理分支
branch = BranchPythonOperator(
task_id='branch_decision' ,
python_callable=choose_branch,
dag=dag,
)
# 传感器:等待上游系统文件就绪
wait_for_file = FileSensor(
task_id='wait_for_source_file' ,
filepath='/data/incoming/orders_{{ ds }}.csv' ,
poke_interval=30 , # 每30秒检查一次
timeout=3600 , # 1小时超时
mode='reschedule' , # 释放worker slot
dag=dag,
)
spark_process = SparkSubmitOperator(
task_id='spark_process' ,
application='/opt/spark_jobs/process_large.py' ,
dag=dag,
)
local_process = PythonOperator(
task_id='local_process' ,
python_callable=process_small_data,
dag=dag,
)
# 合并分支:无论走哪个分支,最终都汇总到同一个任务
from airflow.operators.dummy import DummyOperator
merge = DummyOperator(task_id='merge' , dag=dag)
wait_for_file >> extract >> branch
branch >> [spark_process, local_process]
spark_process >> merge
local_process >> merge
7.3 重试策略与报警机制
Airflow提供了灵活的重试和报警配置。关键生产任务需要配置合理的重试策略,同时在重试耗尽后触发报警通知。通过SLAs(服务等级协定)可以监控任务执行时长,及时发现性能异常。
# 精细化的重试策略
critical_task = PythonOperator(
task_id='critical_data_load' ,
python_callable=load_critical_data,
retries=3 ,
retry_delay=timedelta(minutes=2 ),
retry_exponential_backoff=True , # 指数退避:2min, 4min, 8min
max_retry_delay=timedelta(hours=1 ),
execution_timeout=timedelta(hours=2 ),
sla=timedelta(hours=1 ),
dag=dag,
)
# DAG级报警配置(通过default_args)
# 配合Aipflow监控工具(如PagerDuty、Slack)实现实时报警
# 在airflow.cfg中配置:
# [email]
# email_backend = airflow.utils.email.send_email_smtp
# 自定义回调:任务失败时发送Slack消息
def slack_failure_callback (context):
import requests
webhook_url = 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
task_instance = context['task_instance' ]
message = {
'text' : f':x: Task Failed!\n'
f'DAG: {context["dag"].dag_id}\n'
f'Task: {task_instance.task_id}\n'
f'Execution: {context["ds"]}\n'
f'Log: {task_instance.log_url}'
}
requests.post (webhook_url, json=message)
# 在DAG中使用on_failure_callback
dag = DAG(
'monitored_pipeline' ,
default_args={
'on_failure_callback' : slack_failure_callback,
'email_on_failure' : True ,
'email' : ['oncall@company.com' ],
},
schedule='@daily' ,
start_date=datetime(2025 , 1 , 1 ),
)
八、Prefect / Luigi:轻量级工作流
对于不需要Airflow那样完整基础设施的团队,Prefect和Luigi提供了更轻量级的替代方案。Prefect以"Pythonic"的API设计和强大的状态管理著称,Luigi则以其简洁性和与Spotify内部工具的深度集成为特点。
8.1 Prefect 现代工作流引擎
Prefect的核心理念是"将工作流视为Python代码"。通过装饰器(@flow、@task)可以轻松将任意Python函数转化为工作流任务,支持自动重试、缓存、并行执行等功能。
# Prefect 2.x 的现代工作流定义
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
# 定义任务——通过@task装饰器
@task(retries=2 , retry_delay_seconds=30 ,
cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1 ))
def fetch_data (url: str) -> pd.DataFrame:
"""从API获取数据,结果缓存1小时"""
response = requests.get (url, timeout=60 )
response.raise_for_status ()
return pd.DataFrame (response.json ())
@task
def clean_data (df: pd.DataFrame) -> pd.DataFrame:
"""数据清洗"""
df = df.drop_duplicates ()
df = df.fillna ({'value' : 0 , 'category' : 'unknown' })
return df
@task
def compute_statistics (df: pd.DataFrame) -> dict:
"""计算统计指标"""
return {
'mean' : df['value' ].mean (),
'std' : df['value' ].std (),
'count' : len (df)
}
@task
def save_results (stats: dict, path: str):
import json
with open (path, 'w' ) as f:
json.dump (stats, f, indent=2 )
# 定义工作流——通过@flow装饰器
@flow(name="data_analysis_pipeline" ,
task_runner=ConcurrentTaskRunner())
def run_pipeline (api_url: str, output_path: str):
"""完整数据分析工作流"""
raw_data = fetch_data(api_url)
cleaned = clean_data(raw_data)
stats = compute_statistics(cleaned)
save_results(stats, output_path)
return stats
# 执行工作流——像调用普通函数一样
if __name__ == '__main__' :
stats = run_pipeline(
api_url='https://api.example.com/dataset' ,
output_path='./results/stats.json'
)
print (f"Computed stats: {stats}" )
8.2 Luigi 任务编排
Luigi是Spotify开发的工作流库,其核心思想是每个任务继承luigi.Task类,实现requires(定义依赖)、output(定义输出目标)和run(执行逻辑)三个方法。Luigi的依赖解析机制通过目标文件的存在性自动判断任务是否需要执行。
# Luigi 工作流定义
import luigi
import pandas as pd
from luigi.contrib.s3 import S3Target
class FetchData (luigi.Task):
"""获取原始数据"""
date = luigi.DateParameter()
api_key = luigi.Parameter(default='default_key' )
def output (self ):
return S3Target(f's3://bucket/raw/{{self.date}}/data.csv' )
def run (self ):
response = requests.get (f'https://api.example.com/data?date={{self.date}}' )
with self.output ().open ('w' ) as f:
f.write (response.text)
class CleanData (luigi.Task):
"""数据清洗"""
date = luigi.DateParameter()
def requires (self ):
return FetchData(date=self.date)
def output (self ):
return S3Target(f's3://bucket/clean/{{self.date}}/data.parquet' )
def run (self ):
with self.input ().open ('r' ) as f:
df = pd.read_csv (f)
df = df.drop_duplicates ().fillna (0 )
with self.output ().open ('w' ) as f:
df.to_parquet (f)
class TrainModel (luigi.Task):
"""训练模型"""
date = luigi.DateParameter()
model_type = luigi.Parameter(default='random_forest' )
def requires (self ):
return CleanData(date=self.date)
def output (self ):
return S3Target(f's3://bucket/models/{{self.date}}/{{self.model_type}}.pkl' )
def run (self ):
with self.input ().open ('r' ) as f:
df = pd.read_parquet (f)
model = RandomForestClassifier(n_estimators=100 )
model.fit (df.drop ('target' , axis=1 ), df['target' ])
with self.output ().open ('w' ) as f:
joblib.dump (model, f)
# 命令行执行:luigi --module my_pipeline TrainModel --date 2025-01-01
if __name__ == '__main__' :
luigi.build ([TrainModel(date=datetime.today ())], local_scheduler=True )
8.3 Prefect vs Luigi vs Airflow 对比
特性 Airflow Prefect Luigi
安装复杂度 较高,需要独立部署 低,pip install即可 低,pip install即可
调度能力 强大,支持cron和复杂调度 良好,支持cron和事件驱动 基础,需外部调度器
Web UI 功能完善,生产级 Prefect Cloud/Server 基础CLI + 简单UI
状态管理 数据库存储 数据库 + 自动重试/缓存 文件系统 + 目标检测
Python化程度 DAG定义Pythonic,Operator需学习 完全Pythonic(装饰器模式) 类继承模式,学习成本较低
社区生态 最丰富,Provider众多 快速增长中 成熟但增长放缓
九、总结与实践建议
数据流水线构建是现代数据工程的基石。从单机上的scikit-learn Pipeline到分布式的Airflow工作流,每种工具都在特定场景下发挥着不可替代的作用。在实践中,应根据项目规模、团队能力和基础设施条件合理选择工具组合。
工具选型矩阵:
单机ML实验: scikit-learn Pipeline + ColumnTransformer + FeatureUnion + 自定义Transformer
数据清洗和分析: pandas pipe链式调用
数据版本控制和实验管理: DVC + Git
企业级定时任务和复杂工作流: Airflow
中小规模或快速原型: Prefect(装饰器风格)或 Luigi(类继承风格)
架构原则: 无论使用何种工具,始终遵循以下原则——1)每个步骤职责单一,输入输出明确;2)所有步骤可独立测试;3)流水线定义和业务逻辑分离(即DAG代码和执行代码分离);4)关键路径设置监控和报警;5)数据血缘关系可追溯。坚持这些原则,才能在数据规模和团队扩张时保持流水线的健康度。
常见陷阱: 1)过早引入复杂的调度框架(应先用手动脚本+简单流水线);2)流水线粒度过粗(一个任务做太多事,难以调试和复用);3)忽略数据版本控制(导致实验结果无法复现);4)缺乏监控报警(失败后数小时才发现);5)忽略流水线的可测试性(无法单元测试导致线上故障频发)。
数据流水线的构建是一个持续演进的过程。在项目初期,使用pandas pipe + scikit-learn Pipeline即可满足需求;随着数据量和复杂度的增加,逐步引入DVC进行版本控制,再用Prefect或Airflow进行调度编排。关键在于保持对流水线健康度的持续关注,定期审视每个步骤的必要性和性能表现,确保流水线始终为业务价值服务。