用 Python 搭一个“轻量级 AI 异常检测”管道:从日志到告警(可落地)
用 Python 搭一个“轻量级 AI 异常检测”管道:从日志到告警(可落地)
很多“人工智能落地”并不是做炫酷的生成式模型,而是把一类非常实用的问题解决掉:异常检测。比如你做站群、做电商、做 API 服务,最头疼的往往不是功能开发,而是“系统什么时候开始变慢、什么时候开始出错、什么时候被打了异常流量”。 我更偏向用一种可维护的方法:不用深度学习、也不依赖 GPU,直接用 Python + 传统机器学习做一个可解释、可持续迭代的异常检测小系统。
下面这篇我会从工程视角讲清楚:数据怎么取、特征怎么做、模型怎么选、阈值怎么定、以及上线后的漂移怎么监控。
1)异常检测的真实难点:不是模型,是“指标定义”
很多人上来就问:Isolation Forest、One-Class SVM、还是 LSTM? 但真正决定效果的,是你定义什么叫“异常”。对网站运维来说,异常不是抽象的,它通常是:
- 请求量突然飙升(可能是爬虫/攻击)
- 5xx 比例突然升高(服务不稳或依赖故障)
- 平均响应时间上升(慢查询/缓存失效)
- 某个国家/ASN 流量异常集中(异常来源)
- 某个路径(如
/wp-login.php)被爆破扫描
所以第一步是:把“异常”变成可计算的指标。最常见的是按分钟或 5 分钟聚合:
- pv、uv、ip_count
- status_2xx、status_4xx、status_5xx
- avg_rt、p95_rt(如果有)
- top_path_entropy(路径分布是否异常)
- top_ip_ratio(头部 IP 是否过于集中)
2)数据输入:用最小成本从日志聚合成特征表
你不需要一开始就上 Kafka/ES。最小可行的做法是: 把 Nginx 访问日志按时间窗口聚合成 CSV(或直接 DataFrame),每行代表一个窗口。
假设你已经有结构化字段(时间、IP、状态码、请求路径、响应时间等),输出类似:
| ts_minute | req | ip_cnt | 4xx | 5xx | avg_rt |
|---|---|---|---|---|---|
| 10:01 | 120 | 80 | 12 | 1 | 0.23 |
| 10:02 | 980 | 410 | 300 | 20 | 0.91 |
接下来,我们用这些数值做异常检测。
3)模型选择:Isolation Forest(轻量、稳、工程友好)
在无监督异常检测里,Isolation Forest很适合运维指标:
- 不需要异常标签(现实里你也没有)
- 对非线性分布适应较好
- 输出一个 anomaly score,容易设阈值
- 速度快,部署简单
示例代码(直接可跑,假设你的数据已经在 DataFrame):
# pip install pandas scikit-learn
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
df = pd.DataFrame({
"req": [120, 130, 140, 150, 980, 160, 155],
"ip_cnt": [80, 85, 90, 95, 410, 98, 96 ],
"4xx": [12, 10, 11, 9, 300, 8, 10 ],
"5xx": [1, 1, 0, 1, 20, 1, 0 ],
"avg_rt": [0.23,0.24,0.25,0.23,0.91,0.22,0.24],
})
pipe = Pipeline([
("scaler", StandardScaler()),
("iso", IsolationForest(
n_estimators=300,
contamination=0.05, # 先用经验值,后续再校准
random_state=42
))
])
pipe.fit(df)
IsolationForest: -1 表示异常,1 表示正常
pred = pipe.predict(df)
score = pipe.decision_function(df) # 越小越异常
df["is_anomaly"] = (pred == -1)
df["score"] = score
print(df.sort_values("score").head(3))
你会看到请求暴涨那行的 score 显著更低。
4)阈值怎么定:别迷信 contamination
很多人用 contamination=0.01 或 0.05 就完事了,但上线后会遇到两个问题:
- 业务高峰被当异常(误报)
- 攻击逐渐升温时模型反应慢(漏报)
更稳的做法是:使用“分位数阈值 + 业务规则兜底”。
比如:
- score 低于历史 1% 分位数 → 可疑
- 同时 req 增幅 > 3 倍 或 5xx 比例 > 2% → 真异常
示例(伪代码逻辑):
import numpy as np
threshold = np.quantile(df["score"], 0.01)
df["suspicious"] = df["score"] < threshold
df["real_anomaly"] = df["suspicious"] & (
(df["req"] > df["req"].rolling(10).mean() * 3) |
(df["5xx"] / df["req"] > 0.02)
)
这样会更贴近运维真实需求:宁可先抓到可疑,再用业务规则压误报。
5)可解释性:异常到底“异常在哪”?
无监督模型常被吐槽“黑箱”。工程上,我会加一个解释层: 对每条异常记录,计算它相对最近窗口平均值的偏离程度,输出前 3 个偏离最大的指标。
import numpy as np
window = 5
baseline = df.rolling(window).mean()
def explain_row(i):
if i < window:
return None
diff = (df.loc[i, ["req","ip_cnt","4xx","5xx","avg_rt"]] - baseline.loc[i]).abs()
top = diff.sort_values(ascending=False).head(3).index.tolist()
return ",".join(top)
df["explain"] = [explain_row(i) for i in range(len(df))]
print(df[df["is_anomaly"]][["req","ip_cnt","4xx","5xx","avg_rt","score","explain"]])
当你收到告警时,不是“模型说异常”,而是“异常主要来自 req + 4xx + ip_cnt”。这在定位问题时非常省时间。
6)上线后的关键:漂移监控(否则模型会慢慢失效)
运维指标会漂移,原因可能是:
- 业务增长导致流量基线变大
- 缓存策略改变导致响应时间分布变了
- 新页面上线导致路径分布变了
- 攻击者改变策略,慢慢“温水煮青蛙”
工程上至少要做两件事:
- 定期重训:比如每天/每周用最近 N 天数据重训
- 监控误报率:如果告警太多但都不是问题,阈值或规则要调整
最可怕的情况不是误报,而是“你以为没事,但模型已经钝化”。
7)总结:轻量 AI 的价值是“可维护”
异常检测不是炫技项目,它是稳定性工程的一部分。一个可落地的系统通常具备:
- 输入可得:日志/指标能稳定收集
- 特征简单:按分钟聚合即可
- 模型轻量:Isolation Forest 足够好
- 阈值合理:分位数 + 业务规则兜底
- 可解释:告诉你异常来自哪些指标
- 可迭代:重训与漂移监控能跑起来
如果你把这条链路跑通了,你会发现:AI 并不神秘,它更像一个“统计增强器”,帮你把原本靠经验盯图表的工作,变成可自动化的预警系统。
评论 0