用 Python 写一个“高可维护日志系统 + 运行追踪 ID”脚手架

用 Python 写一个“高可维护日志系统 + 运行追踪 ID”脚手架(含完整代码)

脚本跑得久、任务跑得多的时候,最难受的不是报错,而是:你不知道它到底在干什么。控制台刷屏、信息杂乱、错误找不到上下文、不同任务混在一起,排查成本非常高。一个实用的做法是给每次运行生成一个 Run ID,给每个任务生成 Trace ID,并把日志同时输出到控制台和文件,格式统一、可检索、可回放。

这篇我给你做一个“可直接复用”的 Python 日志脚手架,适合:

  • 批量发布/批量导入/爬虫
  • 多线程/异步任务
  • 需要定位某个任务为什么失败
  • 需要把日志发给客户或自己留档

下面代码 Python 3.10+ 可运行,复制为 logger_bootstrap.py 即可。


目标:结构化日志 + 追踪 ID + 文件落盘 + 错误堆栈

我们实现:

  • 每次启动生成 run_id
  • 每个任务有 trace_id
  • 日志格式:时间 + 级别 + run_id + trace_id + message + extra(JSON)
  • 控制台输出简洁
  • 文件输出详细(含堆栈)
  • 自动按日期/运行归档

1)生成 Run ID 与 Trace ID

from future import annotations
import os
import uuid
import time
from dataclasses import dataclass
from datetime import datetime

def new_run_id() -> str: # 例:20260107_185533_8f2a1c ts = datetime.now().strftime("%Y%m%d_%H%M%S") short = uuid.uuid4().hex[:6] return f"{ts}_{short}"

def new_trace_id() -> str: return uuid.uuid4().hex[:10]


2)统一日志格式(console 简洁、file 详细)

这里用 Python 标准库 logging,无需额外依赖。

import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional

class JsonFormatter(logging.Formatter): """把日志变成 JSON 行,便于检索/回放。"""

def format(self, record: logging.LogRecord) -> str:
    base: Dict[str, Any] = {
        "time": datetime.fromtimestamp(record.created).isoformat(timespec="seconds"),
        "level": record.levelname,
        "message": record.getMessage(),
        "logger": record.name,
    }
    # 追加自定义字段
    for k in ("run_id", "trace_id"):
        if hasattr(record, k):
            base[k] = getattr(record, k)

    if hasattr(record, "extra_data"):
        base["extra"] = getattr(record, "extra_data")

    if record.exc_info:
        base["exc"] = self.formatException(record.exc_info)

    return json.dumps(base, ensure_ascii=False)</code></pre>


3)Logger 工厂:一次配置,全局复用

@dataclass
class LoggerBundle:
    logger: logging.Logger
    run_id: str
    log_file: Path

def setup_logger(app_name: str = "runner", log_dir: str = "logs") -&gt; LoggerBundle: run_id = new_run_id() root = logging.getLogger(app_name) root.setLevel(logging.INFO) root.handlers.clear() root.propagate = False

Path(log_dir).mkdir(parents=True, exist_ok=True)
log_file = Path(log_dir) / f&quot;{app_name}_{run_id}.jsonl&quot;

# Console handler(简洁)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter(
    fmt=&quot;%(asctime)s | %(levelname)s | run=%(run_id)s | trace=%(trace_id)s | %(message)s&quot;,
    datefmt=&quot;%H:%M:%S&quot;
))

# File handler(JSON 行)
fh = logging.FileHandler(log_file, encoding=&quot;utf-8&quot;)
fh.setLevel(logging.INFO)
fh.setFormatter(JsonFormatter())

root.addHandler(ch)
root.addHandler(fh)

# 默认 trace_id 先给一个占位
root = logging.LoggerAdapter(root, {&quot;run_id&quot;: run_id, &quot;trace_id&quot;: &quot;-&quot;})

return LoggerBundle(logger=root, run_id=run_id, log_file=log_file)</code></pre>


4)任务上下文:让 trace_id 自动注入

我们用一个小工具,让每个任务执行时自动带上 trace_id,并支持额外字段。

from contextlib import contextmanager

@contextmanager def task_context(logger: logging.LoggerAdapter, trace_id: str): # 临时注入 trace_id old = logger.extra.get("trace_id", "-") logger.extra["trace_id"] = trace_id try: yield finally: logger.extra["trace_id"] = old

def log_with_extra(logger: logging.LoggerAdapter, msg: str, **extra): # 兼容 console 与 file(file 会输出 extra_data) logger.info(msg, extra={"extra_data": extra, "run_id": logger.extra["run_id"], "trace_id": logger.extra["trace_id"]})


5)示例:模拟批量任务(含失败堆栈)

import random

def do_one_job(i: int) -&gt; None: # 模拟业务:20% 概率失败 if random.random() &lt; 0.2: raise RuntimeError(f"job {i} temporary failure") time.sleep(random.uniform(0.05, 0.15))


6)主程序:跑一批任务,并输出可回放日志

def main():
    bundle = setup_logger(app_name="batch_runner", log_dir="logs")
    logger = bundle.logger

logger.info(f&quot;start run, log_file={bundle.log_file}&quot;, extra={&quot;run_id&quot;: bundle.run_id, &quot;trace_id&quot;: &quot;-&quot;})

total = 30
ok = 0
failed = 0

for i in range(total):
    trace_id = new_trace_id()
    with task_context(logger, trace_id):
        try:
            log_with_extra(logger, &quot;job start&quot;, job_index=i)
            do_one_job(i)
            log_with_extra(logger, &quot;job success&quot;, job_index=i)
            ok += 1
        except Exception:
            failed += 1
            # console 显示简洁,file 自动带堆栈
            logger.exception(&quot;job failed&quot;, extra={&quot;run_id&quot;: bundle.run_id, &quot;trace_id&quot;: trace_id})

logger.info(
    f&quot;run finished total={total} ok={ok} failed={failed}&quot;,
    extra={&quot;run_id&quot;: bundle.run_id, &quot;trace_id&quot;: &quot;-&quot;}
)
print(f&quot;\n日志已写入:{bundle.log_file}&quot;)

if name == "main": main()


你用它能解决什么痛点

  1. 定位某次运行:用 run_id 精确查一轮任务
  2. 定位某个任务:用 trace_id 找这个任务的完整生命周期
  3. 可回放:JSONL 每行一个事件,后面做统计/可视化很方便
  4. 可发给客户/团队:日志干净统一,不用截图控制台
  5. 与任务队列结合:你上一篇的队列框架里,worker 直接注入 trace_id 就行

评论 0