二、统计方法检测异常值
统计方法是异常值检测的基础,它们基于数据的分布假设,通过计算统计量来识别离群点。以下介绍最常用的五种统计方法。
2.1 Z-score 方法
Z-score(标准分数)衡量数据点距离均值的标准差倍数。假设数据服从正态分布,Z-score 绝对值大于 3 的数据点通常被视为异常值(因为正态分布中 99.7% 的数据落在 3 个标准差范围内)。
import numpy as np
from scipy import stats
def zscore_outlier_detection(data, threshold=3):
z_scores = np.abs(stats.zscore(data))
outliers = np.where(z_scores > threshold)[0]
return outliers, z_scores
data = np.array([10, 12, 11, 13, 12, 11, 10, 12, 100, 11, 13, 12])
outliers, z_scores = zscore_outlier_detection(data)
print(f"异常值索引: {outliers}")
print(f"Z-scores: {z_scores}")
# 异常值索引: [8]
# Z-scores: [0.58 0.39 0.48 0.29 0.39 0.48 0.58 0.39 3.08 0.48 0.29 0.39]
Z-score 的局限性
Z-score 对均值和标准差非常敏感,而均值和标准差本身会被异常值严重扭曲(掩蔽效应)。当数据集中存在多个异常值时,Z-score 的检测效果会显著下降。此外,Z-score 适用于单变量且近似正态分布的数据。
2.2 改进 Z-score(MAD 法)
改进 Z-score 使用中位数和中位数绝对偏差(MAD)代替均值和标准差,具有更强的稳健性。MAD 定义为:MAD = median(|X_i - median(X)|)。改进 Z-score 的计算公式为:M_i = 0.6745 × (X_i - median(X)) / MAD。通常取阈值 3.5。
import numpy as np
def modified_zscore_outlier_detection(data, threshold=3.5):
median = np.median(data)
mad = np.median(np.abs(data - median))
if mad == 0:
return np.array([], dtype=int), np.zeros_like(data)
modified_z_scores = 0.6745 * (data - median) / mad
outliers = np.where(np.abs(modified_z_scores) > threshold)[0]
return outliers, modified_z_scores
data = np.array([10, 12, 11, 100, 13, 12, 200, 11, 10, 12, 150, 11])
outliers, scores = modified_zscore_outlier_detection(data)
print(f"异常值索引: {outliers}")
print(f"改进Z-scores: {np.round(scores, 2)}")
# 异常值索引: [3 6 10]
# 改进Z-scores: [-0.17 -0.06 -0.11 6.69 0.06 -0.06 13.38 -0.11 -0.17 -0.06 10.04 -0.11]
与标准 Z-score 相比,改进 Z-score 对异常值本身不敏感,能够更可靠地识别多个异常值共存的场景。
2.3 3σ 原则
3σ 原则基于正态分布假设,认为数据几乎全部(99.73%)落在均值 ± 3 个标准差的范围内。超出该范围的值被视为异常值。这一方法简单直观,在工业质检和过程控制中广泛应用。
import numpy as np
def three_sigma_detection(data):
mean = np.mean(data)
std = np.std(data, ddof=1)
lower_bound = mean - 3 * std
upper_bound = mean + 3 * std
outliers = np.where((data < lower_bound) | (data > upper_bound))[0]
return outliers, (lower_bound, upper_bound)
np.random.seed(42)
data = np.random.normal(50, 10, 100)
data = np.append(data, [5, 95, 2, 98])
outliers, bounds = three_sigma_detection(data)
print(f"下界: {bounds[0]:.2f}, 上界: {bounds[1]:.2f}")
print(f"检测到的异常值: {data[outliers]}")
3σ 原则的适用场景
适用于:样本量较大(通常 n > 30)、数据近似正态分布的场景。在统计过程控制(SPC)中,3σ 控制图是经典的工业质量监控工具。不适用于:偏态分布、多峰分布或小样本数据。
2.4 IQR 四分位距法
IQR(Interquartile Range)方法不依赖于数据的分布假设,是一种非参数方法。它使用四分位数来定义正常范围:IQR = Q3 - Q1,正常值范围为 [Q1 - 1.5×IQR, Q3 + 1.5×IQR]。超出此范围的值被视为温和异常值;超出 [Q1 - 3×IQR, Q3 + 3×IQR] 的值为极端异常值。
import numpy as np
def iqr_outlier_detection(data, multiplier=1.5):
q1 = np.percentile(data, 25)
q3 = np.percentile(data, 75)
iqr = q3 - q1
lower_bound = q1 - multiplier * iqr
upper_bound = q3 + multiplier * iqr
outliers = np.where((data < lower_bound) | (data > upper_bound))[0]
return outliers, (lower_bound, upper_bound, q1, q3, iqr)
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 25, 3, 4, 5, 6, 100])
outliers, stats = iqr_outlier_detection(data)
print(f"Q1 = {stats[2]:.2f}, Q3 = {stats[3]:.2f}, IQR = {stats[4]:.2f}")
print(f"下界 = {stats[0]:.2f}, 上界 = {stats[1]:.2f}")
print(f"异常值索引: {outliers}")
print(f"异常值: {data[outliers]}")
extreme_outliers, _ = iqr_outlier_detection(data, multiplier=3.0)
print(f"极端异常值: {data[extreme_outliers]}")
IQR 方法不受数据分布限制,对偏态分布和厚尾分布都表现出良好的稳健性。箱线图中默认使用的就是 1.5×IQR 规则来标记异常值。
2.5 箱线图离群点
箱线图(Boxplot)将 IQR 方法以图形方式呈现。箱体表示 Q1 到 Q3,箱中横线是中位数。触须线延伸到非异常值范围内的最远数据点,超出触须的点即为异常值。
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(42)
normal_data = np.random.normal(50, 10, 200)
outliers_data = np.array([5, 95, 3, 98, 2])
data = np.concatenate([normal_data, outliers_data])
fig, ax = plt.subplots(figsize=(10, 4))
bp = ax.boxplot(data, vert=False, patch_artist=True,
flierprops=dict(marker='o', markerfacecolor='red',
markersize=8, linestyle='none'))
fliers = bp['fliers'][0].get_data()[0]
print(f"箱线图识别的异常值: {fliers}")
箱线图的优势在于:直观展示数据分布特征,同时识别异常值;不依赖分布假设;适用于多组数据之间的比较。在数据探索性分析(EDA)中,箱线图是发现异常值的首选工具。
三、可视化检测方法
可视化方法是异常值检测中最直观的手段。通过图形化展示数据分布和关系,可以快速发现潜在的异常模式。以下介绍四种常用的可视化方法。
3.1 箱线图 (Boxplot)
箱线图已在 2.5 节详细介绍。在实际数据分析中,箱线图常被用来比较不同组别或不同特征的异常值分布。例如,在对比多个传感器读数时,箱线图可以快速筛选出读数异常的设备。
3.2 散点图 (Scatterplot)
散点图是检测双变量或多变量异常值的有效工具。在二维散点图中,明显偏离主体数据簇的点即为异常值。对于高维数据,可以使用 t-SNE 或 PCA 降维至二维后可视化。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
x = np.random.normal(0, 1, 200)
y = 2 * x + np.random.normal(0, 0.5, 200)
x_out = np.array([4, -3, 3.5, -4, 5])
y_out = np.array([10, -8, 9, -10, 12])
x_all = np.concatenate([x, x_out])
y_all = np.concatenate([y, y_out])
center = np.median([x_all, y_all], axis=1)
distances = np.sqrt((x_all - center[0])**2 + (y_all - center[1])**2)
threshold = np.percentile(distances, 95)
is_outlier = distances > threshold
print(f"基于距离检测到的异常值数量: {np.sum(is_outlier)}")
print(f"距离阈值 (95% 分位数): {threshold:.2f}")
3.3 分布图 (Histogram)
直方图展示单变量数据的频率分布。异常值通常表现为孤立的长条或远离主体分布的尾部。核密度估计图(KDE)可以更平滑地显示分布形态,帮助发现异常值。
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
np.random.seed(42)
data = np.random.exponential(scale=2, size=1000)
data = np.append(data, [30, 35, 40, 50])
counts, bins = np.histogram(data, bins=50)
print(f"最右侧两个分箱的数据点数量: {counts[-2:]}")
q99 = np.percentile(data, 99)
q999 = np.percentile(data, 99.9)
print(f"99% 分位数: {q99:.2f}, 99.9% 分位数: {q999:.2f}")
outliers_hist = data[data > q999]
print(f"超过 99.9% 分位数的值: {outliers_hist}")
3.4 QQ-Plot
QQ 图(分位数-分位数图)用于检验数据是否服从指定的理论分布(如正态分布)。如果数据点大致沿对角线分布,则数据符合理论分布;偏离对角线的点即为异常值。QQ 图特别适合检测分布尾部的异常值。
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
np.random.seed(42)
data = np.random.normal(0, 1, 100)
data = np.append(data, [4.5, 5.0, -4.2, 5.5])
(osm, osr) = stats.probplot(data, dist='norm', plot=None)
residuals = np.abs(osr[1] - (osm[0] * osr[0] + osr[1]))
outlier_indices = np.argsort(residuals)[-5:][::-1]
print(f"QQ-plot 识别的最可能异常值索引: {outlier_indices}")
print(f"对应数据值: {data[outlier_indices]}")
stat, p_value = stats.shapiro(data)
print(f"Shapiro-Wilk 检验 p-value: {p_value:.4f}")
print(f"数据{'不' if p_value < 0.05 else ''}服从正态分布")
QQ 图的优势在于不仅显示异常值,还揭示了数据的整体分布特征——偏态、厚尾、双峰等分布特征都可以在 QQ 图上直观体现。
四、机器学习方法
当数据维度较高或分布复杂时,传统的统计方法可能力不从心。机器学习方法提供了更强大的异常值检测能力,尤其适用于高维数据和复杂模式识别。
4.1 Isolation Forest 孤立森林
孤立森林是一种基于集成的无监督异常检测算法。其核心思想是:异常值更容易被"孤立"——即只需要很少的随机分割就能将异常点从数据中分离出来。算法通过构建随机二叉树(iTree)并计算路径长度来评估异常程度。
from sklearn.ensemble import IsolationForest
import numpy as np
np.random.seed(42)
X_normal = np.random.randn(200, 2) * 0.5 + [2, 2]
X_outliers = np.random.uniform(low=-4, high=8, size=(20, 2))
X = np.vstack([X_normal, X_outliers])
iso_forest = IsolationForest(
n_estimators=100,
max_samples='auto',
contamination='auto',
random_state=42
)
iso_forest.fit(X)
y_pred = iso_forest.predict(X)
scores = iso_forest.decision_function(X)
outlier_indices = np.where(y_pred == -1)[0]
print(f"检测到的异常值数量: {len(outlier_indices)}")
print(f"异常值索引: {outlier_indices}")
iso_forest_adj = IsolationForest(
n_estimators=100,
contamination=0.05,
random_state=42
)
iso_forest_adj.fit(X)
y_pred_adj = iso_forest_adj.predict(X)
print(f"调整 contamination 后检测到的异常值数量: {np.sum(y_pred_adj == -1)}")
Isolation Forest 的核心优势
- 线性时间复杂度: O(n),适合大规模数据集
- 无需距离计算: 不需要定义距离度量,对高维数据友好
- 无需分布假设: 不要求数据服从特定分布
- 抗掩蔽效应: 多个异常值共存时仍能有效检测
- 可解释性: 路径长度提供了直观的异常评分
4.2 LOF 局部离群因子
LOF(Local Outlier Factor)是一种基于密度的异常检测算法。它衡量一个数据点相对于其邻域的局部密度偏差。如果某点的密度显著低于其邻居的密度,则该点被认为是异常值。LOF 特别擅长检测局部异常值——即在某些区域正常但在另一些区域异常的数值。
from sklearn.neighbors import LocalOutlierFactor
import numpy as np
np.random.seed(42)
X1 = np.random.randn(100, 2) * 0.3 + [0, 0]
X2 = np.random.randn(100, 2) * 0.8 + [3, 3]
X_outliers = np.random.uniform(low=-2, high=5, size=(15, 2))
X = np.vstack([X1, X2, X_outliers])
lof = LocalOutlierFactor(
n_neighbors=20,
contamination=0.05,
leaf_size=30
)
y_pred = lof.fit_predict(X)
lof_scores = lof.negative_outlier_factor_
outlier_indices = np.where(y_pred == -1)[0]
print(f"检测到的异常值数量: {len(outlier_indices)}")
most_abnormal = np.argsort(lof_scores)[:5]
print(f"最异常的 5 个点索引: {most_abnormal}")
print(f"它们的 LOF 分数: {lof_scores[most_abnormal]}")
LOF 的参数敏感性
LOF 的核心参数是 n_neighbors(邻居数量)。该参数过小时,算法对局部密度变化过于敏感,可能误判正常点为异常;过大时,可能漏检局部异常值。建议在 10~50 之间调参,或使用肘部法则选择最优值。此外,LOF 对高维数据(维度 > 20)效果会显著下降(维度灾难)。
4.3 One-Class SVM
One-Class SVM 是一种基于边界的异常检测方法。它将所有训练数据映射到高维特征空间,然后学习一个能够将大多数数据点包围起来的决策边界。位于边界之外的点被视为异常值。该方法适合高维数据和非线性决策边界。
from sklearn.svm import OneClassSVM
import numpy as np
np.random.seed(42)
theta = np.linspace(0, 2*np.pi, 100)
X_circle = np.column_stack([np.cos(theta), np.sin(theta)]) * 2
X_circle += np.random.randn(100, 2) * 0.2
X_outliers = np.random.uniform(low=-4, high=4, size=(15, 2))
X = np.vstack([X_circle, X_outliers])
ocsvm = OneClassSVM(
kernel='rbf',
nu=0.05,
gamma='auto'
)
ocsvm.fit(X)
y_pred = ocsvm.predict(X)
scores = ocsvm.decision_function(X)
outlier_indices = np.where(y_pred == -1)[0]
print(f"检测到的异常值数量: {len(outlier_indices)}")
print(f"异常值索引: {outlier_indices[:10]}")
SVM 方法的选择指南
- One-Class SVM: 适合小到中等规模数据集(n < 10000),非线性决策边界,高维数据
- nu 参数: 控制训练数据中被判定为异常的比例上限,实际含义是决策边界的"紧致程度"
- gamma 参数: 控制 RBF 核的影响范围,gamma 越大决策边界越复杂,越容易过拟合
4.4 EllipticEnvelope
EllipticEnvelope(椭圆包络)假设正常数据服从多元高斯分布,通过估计数据的均值和协方差矩阵来拟合一个椭圆决策边界。超出椭圆的数据点被标记为异常值。该方法使用 Minimum Covariance Determinant(MCD)来增强对异常值的鲁棒性。
from sklearn.covariance import EllipticEnvelope
import numpy as np
np.random.seed(42)
mean = [0, 0]
cov = [[3, 1.5], [1.5, 2]]
X_normal = np.random.multivariate_normal(mean, cov, 200)
X_outliers = np.random.uniform(low=-8, high=8, size=(15, 2))
X = np.vstack([X_normal, X_outliers])
ee = EllipticEnvelope(
contamination=0.05,
assume_centered=False,
random_state=42
)
ee.fit(X)
y_pred = ee.predict(X)
mahal_dist = ee.mahalanobis(X)
outlier_indices = np.where(y_pred == -1)[0]
print(f"检测到的异常值数量: {len(outlier_indices)}")
sorted_idx = np.argsort(mahal_dist)[::-1]
print(f"马氏距离最大的 5 个点索引: {sorted_idx[:5]}")
print(f"对应的马氏距离: {mahal_dist[sorted_idx[:5]].round(2)}")
EllipticEnvelope 的局限性
该方法假设数据服从多元正态分布,当数据是多峰分布或严重偏态时效果很差。MCD 估计需要足够的样本量(通常 n > 10p,其中 p 是特征数),否则协方差估计不稳定。此外,特征之间存在高度多重共线性时也会影响效果。
4.5 DBSCAN 聚类
DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法,同时具有异常值检测能力。它将低密度区域中的点自动标记为噪声点(即异常值),不需要预先指定聚类数量。
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import numpy as np
np.random.seed(42)
X1 = np.random.randn(100, 2) * 0.5 + [-2, -2]
X2 = np.random.randn(100, 2) * 0.5 + [3, 3]
X3 = np.random.randn(100, 2) * 0.3 + [0, 5]
X_noise = np.random.uniform(low=-8, high=8, size=(30, 2))
X = np.vstack([X1, X2, X3, X_noise])
X_scaled = StandardScaler().fit_transform(X)
dbscan = DBSCAN(
eps=0.3,
min_samples=5,
metric='euclidean'
)
labels = dbscan.fit_predict(X_scaled)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = np.sum(labels == -1)
print(f"发现的聚类数量: {n_clusters}")
print(f"异常值(噪声点)数量: {n_noise}")
from sklearn.neighbors import NearestNeighbors
neigh = NearestNeighbors(n_neighbors=5)
neigh.fit(X_scaled)
distances, _ = neigh.kneighbors(X_scaled)
k_dist = np.sort(distances[:, -1], axis=0)
print(f"k-distance 的 90% 分位数: {np.percentile(k_dist, 90):.3f}")
print(f"k-distance 的 95% 分位数: {np.percentile(k_dist, 95):.3f}")
DBSCAN 参数调优指南
eps(邻域半径): 使用 k-distance 图确定。计算每个点到第 k 近邻的距离,按升序排列后找到"肘部"位置,该位置对应的距离即为最佳 eps。过小会导致大量点被误判为噪声;过大会合并本应分开的簇。
min_samples(最小样本数): 通常取特征数的 2 倍,或经验值 5~10。越大则对噪声的定义越宽松,越不容易检测出异常值。
五、多变量异常检测
多变量异常检测考虑多个特征之间的相关性。一个数据点在每个维度上都可能是正常的,但它们的组合在多元空间中可能是异常的。多变量异常检测能够发现单变量方法遗漏的异常模式。
5.1 Mahalanobis 距离
马氏距离(Mahalanobis Distance)考虑了特征之间的相关性,消除了量纲和共线性的影响。对于多元数据,马氏距离定义为:MD(x) = sqrt((x - μ)^T Σ^{-1} (x - μ)),其中 μ 是均值向量,Σ 是协方差矩阵。
import numpy as np
from scipy.stats import chi2
def mahalanobis_outlier_detection(X, alpha=0.01):
n, p = X.shape
mean = np.mean(X, axis=0)
cov = np.cov(X, rowvar=False)
inv_cov = np.linalg.inv(cov)
mahal_dist = np.array([
np.sqrt((x - mean).T @ inv_cov @ (x - mean))
for x in X
])
threshold = np.sqrt(chi2.ppf(1 - alpha, df=p))
outliers = np.where(mahal_dist > threshold)[0]
return outliers, mahal_dist, threshold
np.random.seed(42)
mean = [5, 10]
cov = [[4, 3], [3, 6]]
X_normal = np.random.multivariate_normal(mean, cov, 100)
X_outliers = np.array([[0, 5], [12, 20], [-2, 15], [10, 0]])
X = np.vstack([X_normal, X_outliers])
outliers, mahal_dist, threshold = mahalanobis_outlier_detection(X)
print(f"马氏距离阈值: {threshold:.2f}")
print(f"检测到的异常值索引: {outliers}")
print(f"各点马氏距离(前5个): {mahal_dist[:5].round(2)}")
马氏距离的注意事项
协方差矩阵必须可逆(无完全共线性)。当特征数 p 大于样本数 n 时,需要使用正则化或收缩估计。马氏距离对异常值本身敏感,可以使用 MCD(Minimum Covariance Determinant)增强稳健性。
5.2 PCA 重建误差
PCA(主成分分析)通过降维来检测异常值。基本思路是:正常数据可以用少数主成分很好地重构,而异常数据的重构误差较大。通过比较原始数据与降维后重建数据的差异,可以有效发现异常。
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import numpy as np
np.random.seed(42)
n_features = 5
X_normal = np.random.randn(200, n_features)
X_normal[:, 2] = X_normal[:, 0] * 0.7 + X_normal[:, 1] * 0.3
X_normal[:, 4] = X_normal[:, 3] * 0.8 + np.random.randn(200) * 0.2
X_outliers = np.random.randn(10, n_features) * 3
X = np.vstack([X_normal, X_outliers])
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
pca = PCA(n_components=0.80)
X_reduced = pca.fit_transform(X_scaled)
X_reconstructed = pca.inverse_transform(X_reduced)
reconstruction_error = np.sum((X_scaled - X_reconstructed)**2, axis=1)
threshold = np.percentile(reconstruction_error, 95)
outliers_pca = np.where(reconstruction_error > threshold)[0]
print(f"保留的主成分数量: {pca.n_components_}")
print(f"解释方差比: {pca.explained_variance_ratio_.round(3)}")
print(f"重建误差阈值 (95%分位数): {threshold:.3f}")
print(f"检测到的异常值数量: {len(outliers_pca)}")
print(f"异常值的平均重建误差: {reconstruction_error[outliers_pca].mean():.3f}")
print(f"正常值的平均重建误差: {reconstruction_error[~np.isin(np.arange(len(X)), outliers_pca)].mean():.3f}")
PCA 异常检测的两种视角
- 主成分得分异常: 数据在前几个主成分上的得分偏离正常范围,表示数据在主要变异方向上异常
- 重建误差异常: 数据在"被丢弃"的主成分上有较大投影(即 Q-statistic / SPE 统计量高),表示数据不符合训练数据的相关结构
- Hotelling's T²: 衡量数据在前 k 个主成分上的偏离程度,是另一种常用的 PCA 异常度指标
5.3 自动编码器
自动编码器(Autoencoder)是一种神经网络模型,通过编码器-解码器结构学习数据的低维表示。与 PCA 类似,正常数据的重建误差小,异常数据的重建误差大。自动编码器能够捕捉非线性关系,适用于复杂的高维数据。
import numpy as np
from sklearn.neural_network import MLPRegressor
np.random.seed(42)
n_samples, n_features = 300, 10
X = np.random.randn(n_samples, n_features)
X[:280] += 0.5
autoencoder = MLPRegressor(
hidden_layer_sizes=(16, 8, 16),
activation='relu',
max_iter=500,
random_state=42
)
autoencoder.fit(X, X)
reconstruction_error_ae = np.mean((X - autoencoder.predict(X))**2, axis=1)
threshold_ae = np.percentile(reconstruction_error_ae, 95)
outliers_ae = np.where(reconstruction_error_ae > threshold_ae)[0]
print(f"自动编码器检测到的异常值数量: {len(outliers_ae)}")
print(f"重建误差阈值: {threshold_ae:.4f}")
深度异常检测方法的优势
自动编码器和变分自动编码器(VAE)在图像异常检测、序列异常检测等任务中表现优异,能够捕捉数据中的非线性复杂模式。训练时仅使用正常数据,对未见过的异常模式有良好的泛化能力。数据维度越高、关系越复杂,深度方法的优势越明显。
六、时间序列异常检测
时间序列数据的异常检测需要考虑时间依赖性和趋势、季节性等成分。以下介绍三种广泛应用的时间序列异常检测方法。
6.1 移动平均法
移动平均法通过计算滑动窗口内的均值作为预测值,以实际值与预测值之间的偏差超过阈值为依据判断异常。简单移动平均(SMA)、指数加权移动平均(EWMA)是两种常见的实现方式。
import numpy as np
import pandas as pd
def moving_average_anomaly_detection(series, window=10, threshold=3):
rolling_mean = series.rolling(window=window, center=True).mean()
rolling_std = series.rolling(window=window, center=True).std()
z_scores = (series - rolling_mean) / rolling_std
is_anomaly = np.abs(z_scores) > threshold
return is_anomaly, z_scores
np.random.seed(42)
n = 300
t = np.arange(n)
trend = t * 0.02
seasonal = 3 * np.sin(2 * np.pi * t / 30)
noise = np.random.randn(n) * 0.5
series = pd.Series(trend + seasonal + noise)
series[50] += 8
series[150] -= 10
series[230] += 12
is_anomaly, z_scores = moving_average_anomaly_detection(series)
print(f"检测到的异常点数量: {np.sum(is_anomaly)}")
print(f"异常点索引: {np.where(is_anomaly)[0]}")
EWMA vs SMA
- SMA(简单移动平均): 窗口内的每个点权重相同,对突变反应较慢但更稳定
- EWMA(指数加权移动平均): 越近的点权重越大,对近期变化更敏感,适合快速检测异常
- 适用场景: 移动平均法适合趋势和季节性较稳定的时间序列,对突发性尖峰异常敏感
6.2 STL 分解
STL(Seasonal-Trend decomposition using LOESS)将时间序列分解为趋势、季节性和残差三个部分。残差部分反映了去除规律性成分后的随机波动,残差中的大幅度波动即为异常信号。
import numpy as np
import pandas as pd
from statsmodels.tsa.seasonal import STL
np.random.seed(42)
n = 500
t = np.arange(n)
trend = 0.01 * t + 5 * np.sin(2 * np.pi * t / 200)
seasonal = 2 * np.sin(2 * np.pi * t / 12)
noise = np.random.randn(n) * 0.3
series = trend + seasonal + noise
series[[100, 200, 350, 400]] += [5, -6, 7, -5]
stl = STL(series, period=12, robust=True)
result = stl.fit()
trend_comp = result.trend
seasonal_comp = result.seasonal
residual = result.resid
residual_median = np.median(residual)
residual_mad = np.median(np.abs(residual - residual_median))
residual_z = 0.6745 * (residual - residual_median) / residual_mad
is_anomaly_stl = np.abs(residual_z) > 3.5
print(f"STL 分解检测到的异常点数量: {np.sum(is_anomaly_stl)}")
print(f"异常点索引: {np.where(is_anomaly_stl)[0]}")
STL 分解的优势
STL 对缺失值和异常值具有鲁棒性(使用 LOESS 拟合),能够处理多种季节性周期,且趋势和季节成分平滑变化。在监控系统的指标异常检测(如服务器 CPU 使用率、网络流量等)中,STL 分解是业界标准方法之一。
6.3 Twitter AnomalyDetection
Twitter 开源的 AnomalyDetection(现已发展为 Twitter's Seasonal Hybrid ESD 算法)结合了 STL 分解和广义 ESD 检验,能够自动检测时间序列中的异常点,尤其适合具有强季节性模式的数据。
import numpy as np
import pandas as pd
def seasonal_hybrid_esd(series, period=24, max_anomalies=0.1, alpha=0.05):
from statsmodels.tsa.seasonal import STL
from scipy.stats import t as t_dist
stl = STL(series, period=period, robust=True)
result = stl.fit()
deseasonalized = series - result.seasonal
median = np.median(deseasonalized)
mad = np.median(np.abs(deseasonalized - median))
normalized = np.abs(deseasonalized - median) / (mad + 1e-10)
n = len(series)
if isinstance(max_anomalies, float):
max_outliers = int(n * max_anomalies)
else:
max_outliers = int(max_anomalies)
outliers = []
remaining = normalized.copy()
original_idx = np.arange(n)
for i in range(min(max_outliers, n // 2)):
if len(remaining) < 3:
break
max_idx = np.argmax(remaining)
max_val = remaining[max_idx]
df = n - i - 2
if df <= 0:
break
t_crit = t_dist.ppf(1 - alpha / (2 * (n - i)), df)
lambda_crit = (n - i) * t_crit / np.sqrt((df + t_crit**2) * (df + 1))
if max_val > lambda_crit:
outliers.append(original_idx[np.argmax(remaining)])
remaining = np.delete(remaining, max_idx)
original_idx = np.delete(original_idx, max_idx)
else:
break
return np.array(outliers), normalized
np.random.seed(42)
n = 200
t = np.arange(n)
daily_season = 2 * np.sin(2 * np.pi * t / 24)
hourly_noise = np.random.randn(n) * 0.3
series = pd.Series(daily_season + hourly_noise)
series[[30, 60, 120, 180]] += [5, -6, 7, -5]
outliers, norms = seasonal_hybrid_esd(series, period=24)
print(f"S-H-ESD 检测到的异常点: {outliers}")
Twitter AnomalyDetection 在运维监控领域有着广泛应用。其 R 语言实现提供了友好的可视化界面,Python 中可以使用 greykite 或 adtk 等库实现类似功能。
七、异常值处理策略
检测到异常值之后,需要根据业务场景和分析目的选择合适的处理策略。不同的策略各有优劣,需权衡取舍。
策略一:截尾(Winsorize)
将异常值替换为指定分位数处的值(如将低于 1% 分位数的值替换为 1% 分位数,高于 99% 的替换为 99% 分位数)。这种方法保留了样本量,同时限制了极端值的影响。
from scipy.stats.mstats import winsorize
import numpy as np
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200])
winsorized_data = winsorize(data, limits=[0.1, 0.1])
print(f"原始数据: {data}")
print(f"截尾后: {winsorized_data}")
p1, p99 = np.percentile(data, [1, 99])
data_winsorized = np.clip(data, p1, p99)
print(f"1%-99% 截尾: {data_winsorized}")
策略二:删除异常值
直接删除包含异常值的记录。适用于:数据录入错误、传感器故障等明确无效的数据。删除方法简单直接,但会损失样本量,且可能引入选择性偏差。
import numpy as np
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200])
q1, q3 = np.percentile(data, [25, 75])
iqr = q3 - q1
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
data_clean = data[(data >= lower) & (data <= upper)]
print(f"原始数据量: {len(data)}, 删除后: {len(data_clean)}")
print(f"删除的值: {data[~((data >= lower) & (data <= upper))]}")
策略三:替换为均值/中位数
用均值、中位数或众数替换异常值。中位数替换对异常值更稳健。这种方法简单,但会降低数据的方差,可能导致统计检验效力下降。
import numpy as np
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200])
median_val = np.median(data)
z_scores = np.abs((data - np.mean(data)) / np.std(data))
outliers = z_scores > 3
data_replaced = data.copy()
data_replaced[outliers] = median_val
print(f"替换前: {data}")
print(f"替换后 (中位数={median_val}): {data_replaced}")
策略四:分箱(Binning)
将连续变量离散化为有序类别,异常值被自动归入首尾箱中。分箱可以消除异常值的极端影响,但会丢失信息。等宽分箱、等频分箱和基于聚类的分箱是常用的分箱方法。
import numpy as np
import pandas as pd
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200])
binned = pd.cut(data, bins=5, labels=False)
print(f"等宽分箱结果: {binned}")
binned_qcut = pd.qcut(data, q=5, labels=False, duplicates='drop')
print(f"等频分箱结果: {binned_qcut}")
策略五:单独建模
对异常值单独建立模型进行处理。例如,在信用风险建模中,对正常客户和异常客户分别建立评分卡模型。这种方法最为精细但也最为复杂,适合异常值包含重要业务信息的场景。
import numpy as np
from sklearn.ensemble import RandomForestRegressor
np.random.seed(42)
n = 1000
X = np.random.randn(n, 5)
y = 2 * X[:, 0] + X[:, 1] - 0.5 * X[:, 2] + np.random.randn(n) * 0.3
y[::20] += np.random.randn(n // 20) * 5
from sklearn.ensemble import IsolationForest
iso = IsolationForest(contamination=0.05)
is_outlier = iso.fit_predict(X) == -1
model_main = RandomForestRegressor(n_estimators=100, random_state=42)
model_main.fit(X[~is_outlier], y[~is_outlier])
model_aux = RandomForestRegressor(n_estimators=50, random_state=42)
model_aux.fit(X[is_outlier], y[is_outlier])
y_pred = np.zeros(n)
y_pred[~is_outlier] = model_main.predict(X[~is_outlier])
y_pred[is_outlier] = model_aux.predict(X[is_outlier])
print(f"正常数据建模样本量: {np.sum(~is_outlier)}")
print(f"异常数据建模样本量: {np.sum(is_outlier)}")
异常值处理策略选择指南
| 策略 |
适用场景 |
优点 |
缺点 |
| 截尾(Winsorize) |
异常值为极端但合理的值 |
保留样本量,限制极端影响 |
人为改变数据分布 |
| 删除 |
明确的错误数据 |
简单直接,消除污染 |
损失样本量,可能引入偏差 |
| 替换为均值/中位数 |
异常值比例很小 |
实现简单,保留样本量 |
降低方差,弱化信号 |
| 分箱 |
需要将连续变离散化 |
自动消除异常值影响 |
丢失信息,粒度降低 |
| 单独建模 |
异常值包含重要业务信息 |
保留异常信息,模型精度高 |
复杂度高,需足够异常样本 |
八、异常值的业务含义分析
异常值不仅是数据清洗的对象,更可能蕴含着重要的业务价值。在不同领域中,异常值往往代表着值得深入挖掘的信号。
8.1 金融风控领域
在金融领域,异常值检测是反欺诈的核心技术。异常的信用卡交易(如突然的大额消费、异地消费)、异常的账户登录行为(如深夜高频登录、大批量查询)、异常的资金流向(如拆分转账、快进快出)等都是欺诈行为的典型特征。一些金融机构将异常交易检出率作为核心风控指标,每提升 1% 的异常检出率就能挽回数亿元的损失。
8.2 工业质检领域
在制造业中,传感器采集的生产数据中的异常值往往是设备故障或产品质量问题的早期信号。例如,振动传感器的异常波动可能预示轴承磨损;温度传感器的突然升高可能意味着散热系统故障。通过及时识别这些异常,企业可以实现预测性维护,减少非计划停机的损失。
8.3 医疗健康领域
医疗数据中的异常值可能代表罕见病、药物不良反应或数据录入错误。例如,某项生化指标的极端值可能是急性疾病的征兆;心电图的异常波形可能是心律失常的信号。在医学研究中,异常值的深入分析有时能带来新的医学发现。但需注意,医疗数据的异常值需要医生结合临床判断审慎处理。
8.4 电商与推荐系统
电商平台的异常流量(如 DDoS 攻击、爬虫抓取)会扭曲流量分析结果,影响运营决策。用户行为中的异常模式可能代表刷单、恶意差评等违规行为。同时,抢购、秒杀等活动产生的异常流量峰值需要系统具有弹性扩展能力。对异常用户行为的识别能有效维护平台生态健康。
8.5 网络运维领域
在 IT 运维中,服务器指标(CPU 使用率、内存占用、请求延迟等)的异常是系统故障的前兆。AIOps 领域大量应用异常检测技术来实现智能预警。通常使用"3σ"基线或动态阈值来定义关键指标的异常范围,一旦检测到异常立即触发告警工单。
业务视角的异常值处理原则
- 不机械删除: 先判断异常值的来源和含义,再决定处理方式
- 区分处理: 不同特征的异常值应采取不同的处理策略
- 记录留痕: 所有异常值的识别和处理过程应有文档记录
- 交叉验证: 结合多种方法检测结果综合判断,避免单一方法误判
- 领域知识优先: 业务专家对异常值的判断往往比纯统计方法更可靠
"在数据科学的实践中,一个被标记为异常的数据点可能比一千个正常数据点更有价值。它是噪声还是信号,取决于分析者是否具备足够的领域知识来解读它。"