用 Python 写一个“高可维护日志系统 + 运行追踪 ID”脚手架
用 Python 写一个“高可维护日志系统 + 运行追踪 ID”脚手架(含完整代码)
脚本跑得久、任务跑得多的时候,最难受的不是报错,而是:你不知道它到底在干什么。控制台刷屏、信息杂乱、错误找不到上下文、不同任务混在一起,排查成本非常高。一个实用的做法是给每次运行生成一个 Run ID,给每个任务生成 Trace ID,并把日志同时输出到控制台和文件,格式统一、可检索、可回放。
这篇我给你做一个“可直接复用”的 Python 日志脚手架,适合:
- 批量发布/批量导入/爬虫
- 多线程/异步任务
- 需要定位某个任务为什么失败
- 需要把日志发给客户或自己留档
下面代码 Python 3.10+ 可运行,复制为 logger_bootstrap.py 即可。
目标:结构化日志 + 追踪 ID + 文件落盘 + 错误堆栈
我们实现:
- 每次启动生成
run_id - 每个任务有
trace_id - 日志格式:时间 + 级别 + run_id + trace_id + message + extra(JSON)
- 控制台输出简洁
- 文件输出详细(含堆栈)
- 自动按日期/运行归档
1)生成 Run ID 与 Trace ID
from future import annotations
import os
import uuid
import time
from dataclasses import dataclass
from datetime import datetime
def new_run_id() -> str:
# 例:20260107_185533_8f2a1c
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
short = uuid.uuid4().hex[:6]
return f"{ts}_{short}"
def new_trace_id() -> str:
return uuid.uuid4().hex[:10]
2)统一日志格式(console 简洁、file 详细)
这里用 Python 标准库 logging,无需额外依赖。
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional
class JsonFormatter(logging.Formatter):
"""把日志变成 JSON 行,便于检索/回放。"""
def format(self, record: logging.LogRecord) -> str:
base: Dict[str, Any] = {
"time": datetime.fromtimestamp(record.created).isoformat(timespec="seconds"),
"level": record.levelname,
"message": record.getMessage(),
"logger": record.name,
}
# 追加自定义字段
for k in ("run_id", "trace_id"):
if hasattr(record, k):
base[k] = getattr(record, k)
if hasattr(record, "extra_data"):
base["extra"] = getattr(record, "extra_data")
if record.exc_info:
base["exc"] = self.formatException(record.exc_info)
return json.dumps(base, ensure_ascii=False)</code></pre>
3)Logger 工厂:一次配置,全局复用
@dataclass
class LoggerBundle:
logger: logging.Logger
run_id: str
log_file: Path
def setup_logger(app_name: str = "runner", log_dir: str = "logs") -> LoggerBundle:
run_id = new_run_id()
root = logging.getLogger(app_name)
root.setLevel(logging.INFO)
root.handlers.clear()
root.propagate = False
Path(log_dir).mkdir(parents=True, exist_ok=True)
log_file = Path(log_dir) / f"{app_name}_{run_id}.jsonl"
# Console handler(简洁)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter(
fmt="%(asctime)s | %(levelname)s | run=%(run_id)s | trace=%(trace_id)s | %(message)s",
datefmt="%H:%M:%S"
))
# File handler(JSON 行)
fh = logging.FileHandler(log_file, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(JsonFormatter())
root.addHandler(ch)
root.addHandler(fh)
# 默认 trace_id 先给一个占位
root = logging.LoggerAdapter(root, {"run_id": run_id, "trace_id": "-"})
return LoggerBundle(logger=root, run_id=run_id, log_file=log_file)</code></pre>
4)任务上下文:让 trace_id 自动注入
我们用一个小工具,让每个任务执行时自动带上 trace_id,并支持额外字段。
from contextlib import contextmanager
@contextmanager
def task_context(logger: logging.LoggerAdapter, trace_id: str):
# 临时注入 trace_id
old = logger.extra.get("trace_id", "-")
logger.extra["trace_id"] = trace_id
try:
yield
finally:
logger.extra["trace_id"] = old
def log_with_extra(logger: logging.LoggerAdapter, msg: str, **extra):
# 兼容 console 与 file(file 会输出 extra_data)
logger.info(msg, extra={"extra_data": extra, "run_id": logger.extra["run_id"], "trace_id": logger.extra["trace_id"]})
5)示例:模拟批量任务(含失败堆栈)
import random
def do_one_job(i: int) -> None:
# 模拟业务:20% 概率失败
if random.random() < 0.2:
raise RuntimeError(f"job {i} temporary failure")
time.sleep(random.uniform(0.05, 0.15))
6)主程序:跑一批任务,并输出可回放日志
def main():
bundle = setup_logger(app_name="batch_runner", log_dir="logs")
logger = bundle.logger
logger.info(f"start run, log_file={bundle.log_file}", extra={"run_id": bundle.run_id, "trace_id": "-"})
total = 30
ok = 0
failed = 0
for i in range(total):
trace_id = new_trace_id()
with task_context(logger, trace_id):
try:
log_with_extra(logger, "job start", job_index=i)
do_one_job(i)
log_with_extra(logger, "job success", job_index=i)
ok += 1
except Exception:
failed += 1
# console 显示简洁,file 自动带堆栈
logger.exception("job failed", extra={"run_id": bundle.run_id, "trace_id": trace_id})
logger.info(
f"run finished total={total} ok={ok} failed={failed}",
extra={"run_id": bundle.run_id, "trace_id": "-"}
)
print(f"\n日志已写入:{bundle.log_file}")
if name == "main":
main()
你用它能解决什么痛点
- 定位某次运行:用 run_id 精确查一轮任务
- 定位某个任务:用 trace_id 找这个任务的完整生命周期
- 可回放:JSONL 每行一个事件,后面做统计/可视化很方便
- 可发给客户/团队:日志干净统一,不用截图控制台
- 与任务队列结合:你上一篇的队列框架里,worker 直接注入 trace_id 就行
评论 0