用 Python 搭一个“轻量级 AI 异常检测”管道:从日志到告警(可落地)

用 Python 搭一个“轻量级 AI 异常检测”管道:从日志到告警(可落地)

很多“人工智能落地”并不是做炫酷的生成式模型,而是把一类非常实用的问题解决掉:异常检测。比如你做站群、做电商、做 API 服务,最头疼的往往不是功能开发,而是“系统什么时候开始变慢、什么时候开始出错、什么时候被打了异常流量”。 我更偏向用一种可维护的方法:不用深度学习、也不依赖 GPU,直接用 Python + 传统机器学习做一个可解释、可持续迭代的异常检测小系统。

下面这篇我会从工程视角讲清楚:数据怎么取、特征怎么做、模型怎么选、阈值怎么定、以及上线后的漂移怎么监控。


1)异常检测的真实难点:不是模型,是“指标定义”

很多人上来就问:Isolation Forest、One-Class SVM、还是 LSTM? 但真正决定效果的,是你定义什么叫“异常”。对网站运维来说,异常不是抽象的,它通常是:

  • 请求量突然飙升(可能是爬虫/攻击)
  • 5xx 比例突然升高(服务不稳或依赖故障)
  • 平均响应时间上升(慢查询/缓存失效)
  • 某个国家/ASN 流量异常集中(异常来源)
  • 某个路径(如 /wp-login.php)被爆破扫描

所以第一步是:把“异常”变成可计算的指标。最常见的是按分钟或 5 分钟聚合:

  • pv、uv、ip_count
  • status_2xx、status_4xx、status_5xx
  • avg_rt、p95_rt(如果有)
  • top_path_entropy(路径分布是否异常)
  • top_ip_ratio(头部 IP 是否过于集中)

2)数据输入:用最小成本从日志聚合成特征表

你不需要一开始就上 Kafka/ES。最小可行的做法是: 把 Nginx 访问日志按时间窗口聚合成 CSV(或直接 DataFrame),每行代表一个窗口。

假设你已经有结构化字段(时间、IP、状态码、请求路径、响应时间等),输出类似:

ts_minute req ip_cnt 4xx 5xx avg_rt
10:01 120 80 12 1 0.23
10:02 980 410 300 20 0.91

接下来,我们用这些数值做异常检测。


3)模型选择:Isolation Forest(轻量、稳、工程友好)

在无监督异常检测里,Isolation Forest很适合运维指标:

  • 不需要异常标签(现实里你也没有)
  • 对非线性分布适应较好
  • 输出一个 anomaly score,容易设阈值
  • 速度快,部署简单

示例代码(直接可跑,假设你的数据已经在 DataFrame):

# pip install pandas scikit-learn

import pandas as pd from sklearn.ensemble import IsolationForest from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline

df = pd.DataFrame({ "req": [120, 130, 140, 150, 980, 160, 155], "ip_cnt": [80, 85, 90, 95, 410, 98, 96 ], "4xx": [12, 10, 11, 9, 300, 8, 10 ], "5xx": [1, 1, 0, 1, 20, 1, 0 ], "avg_rt": [0.23,0.24,0.25,0.23,0.91,0.22,0.24], })

pipe = Pipeline([ ("scaler", StandardScaler()), ("iso", IsolationForest( n_estimators=300, contamination=0.05, # 先用经验值,后续再校准 random_state=42 )) ])

pipe.fit(df)

IsolationForest: -1 表示异常,1 表示正常

pred = pipe.predict(df) score = pipe.decision_function(df) # 越小越异常

df["is_anomaly"] = (pred == -1) df["score"] = score print(df.sort_values("score").head(3))

你会看到请求暴涨那行的 score 显著更低。


4)阈值怎么定:别迷信 contamination

很多人用 contamination=0.01 或 0.05 就完事了,但上线后会遇到两个问题:

  1. 业务高峰被当异常(误报)
  2. 攻击逐渐升温时模型反应慢(漏报)

更稳的做法是:使用“分位数阈值 + 业务规则兜底”。

比如:

  • score 低于历史 1% 分位数 → 可疑
  • 同时 req 增幅 > 3 倍 或 5xx 比例 > 2% → 真异常

示例(伪代码逻辑):

import numpy as np

threshold = np.quantile(df["score"], 0.01)

df["suspicious"] = df["score"] < threshold df["real_anomaly"] = df["suspicious"] & ( (df["req"] > df["req"].rolling(10).mean() * 3) | (df["5xx"] / df["req"] > 0.02) )

这样会更贴近运维真实需求:宁可先抓到可疑,再用业务规则压误报


5)可解释性:异常到底“异常在哪”?

无监督模型常被吐槽“黑箱”。工程上,我会加一个解释层: 对每条异常记录,计算它相对最近窗口平均值的偏离程度,输出前 3 个偏离最大的指标。

import numpy as np

window = 5 baseline = df.rolling(window).mean()

def explain_row(i): if i < window: return None diff = (df.loc[i, ["req","ip_cnt","4xx","5xx","avg_rt"]] - baseline.loc[i]).abs() top = diff.sort_values(ascending=False).head(3).index.tolist() return ",".join(top)

df["explain"] = [explain_row(i) for i in range(len(df))] print(df[df["is_anomaly"]][["req","ip_cnt","4xx","5xx","avg_rt","score","explain"]])

当你收到告警时,不是“模型说异常”,而是“异常主要来自 req + 4xx + ip_cnt”。这在定位问题时非常省时间。


6)上线后的关键:漂移监控(否则模型会慢慢失效)

运维指标会漂移,原因可能是:

  • 业务增长导致流量基线变大
  • 缓存策略改变导致响应时间分布变了
  • 新页面上线导致路径分布变了
  • 攻击者改变策略,慢慢“温水煮青蛙”

工程上至少要做两件事:

  1. 定期重训:比如每天/每周用最近 N 天数据重训
  2. 监控误报率:如果告警太多但都不是问题,阈值或规则要调整

最可怕的情况不是误报,而是“你以为没事,但模型已经钝化”。


7)总结:轻量 AI 的价值是“可维护”

异常检测不是炫技项目,它是稳定性工程的一部分。一个可落地的系统通常具备:

  • 输入可得:日志/指标能稳定收集
  • 特征简单:按分钟聚合即可
  • 模型轻量:Isolation Forest 足够好
  • 阈值合理:分位数 + 业务规则兜底
  • 可解释:告诉你异常来自哪些指标
  • 可迭代:重训与漂移监控能跑起来

如果你把这条链路跑通了,你会发现:AI 并不神秘,它更像一个“统计增强器”,帮你把原本靠经验盯图表的工作,变成可自动化的预警系统。

评论 0