A Reusable Task Queue + Retry Mini-Framework in Python (with Complete Code)


Many script-style projects start out simple: loop over a list, send requests, write files. But once the task volume grows and flaky networks, API rate limits, and intermittent errors show up, the script slowly degrades into one that "dies halfway through, with no idea where to resume". Rather than piling on if/try blocks, a small, clear framework can cover the common needs in one pass:

  • A task queue (producer/consumer)
  • Concurrent execution (with a concurrency cap)
  • Retry on failure (exponential backoff)
  • Task deduplication (avoid re-running work)
  • Resumable runs (persisted state)
  • Unified logging (for easier troubleshooting)

Below is a lightweight task-queue executor for Python 3.10+, a good fit for crawlers, batch API calls, automated publishing, bulk imports, and similar jobs. The code is complete and runnable; copy it into task_runner.py to try it.


Core idea: make a "task" a serializable object

We describe each task with a Task dataclass carrying a unique key, a payload, and retry bookkeeping. The worker function only has to care about "what to do"; the queue framework takes care of "running it reliably".

from __future__ import annotations
import asyncio
import json
import random
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Set

@dataclass
class Task:
    key: str                      # unique task id, used for dedup and resume
    payload: Dict[str, Any]       # whatever the worker needs
    attempts: int = 0             # how many times this task has run so far
    max_attempts: int = 5         # give up after this many failures
    last_error: str = ""          # message from the most recent failure
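
Because Task is a plain dataclass, it serializes cleanly, which is what makes the failed-task dump and re-queue later on possible. A minimal round-trip sketch:

# round-trip a Task through JSON; asdict comes from dataclasses
t = Task(key="job-1", payload={"name": "task-1"})
restored = Task(**json.loads(json.dumps(asdict(t), ensure_ascii=False)))
assert restored.key == t.key and restored.payload == t.payload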


State store: resumable runs

Completed task keys are persisted to a local file, so the next run automatically skips anything already done.

class StateStore:
    def __init__(self, path: str = "state.json"):
        self.path = Path(path)
        self.done: Set[str] = set()

    def load(self) -> None:
        if self.path.exists():
            data = json.loads(self.path.read_text(encoding="utf-8"))
            self.done = set(data.get("done", []))

    def save(self) -> None:
        data = {"done": sorted(self.done), "updated_at": int(time.time())}
        self.path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

    def mark_done(self, key: str) -> None:
        self.done.add(key)
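
Typical usage, just to make the contract concrete:

store = StateStore("state.json")
store.load()                 # no-op on a first run (the file does not exist yet)
store.mark_done("job-1")
store.save()                 # state.json now holds {"done": ["job-1"], "updated_at": ...}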


Retry policy: exponential backoff + jitter

Exponential backoff eases the load on the target API; jitter keeps concurrent tasks from all retrying at the same instant and causing a "retry storm".

def backoff_delay(attempt: int, base: float = 0.6, cap: float = 10.0) -> float:
    # attempt starts at 1, which reads more naturally
    raw = min(cap, base * (2 ** (attempt - 1)))
    jitter = random.uniform(0, raw * 0.25)
    return raw + jitter
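
To get a feel for the curve, print a few sample delays (exact values vary run to run because of the jitter):

for attempt in range(1, 6):
    print(f"attempt {attempt}: ~{backoff_delay(attempt):.2f}s")
# roughly 0.6, 1.2, 2.4, 4.8, 9.6 seconds, each plus up to 25% jitter, capped at 10s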


The executor: concurrency, retries, logging, stats

The core is a TaskRunner:

  • asyncio.Queue as the task queue
  • A Semaphore to cap concurrency
  • Automatic retries on failure; tasks that exhaust their attempts land in a "failed list"
  • State is saved to disk after every success

class TaskRunner:
    def __init__(
        self,
        worker: Callable[[Task], Awaitable[None]],
        concurrency: int = 5,
        state_path: str = "state.json",
    ):
        self.worker = worker
        self.concurrency = concurrency
        self.state = StateStore(state_path)
        self.queue: asyncio.Queue[Task] = asyncio.Queue()
        self.sem = asyncio.Semaphore(concurrency)

        self.failed: List[Task] = []
        self.total = 0
        self.succeeded = 0
        self.skipped = 0

    async def add_tasks(self, tasks: List[Task]) -> None:
        self.state.load()
        for t in tasks:
            self.total += 1
            if t.key in self.state.done:
                self.skipped += 1
                continue
            await self.queue.put(t)

    async def _run_one(self, task: Task) -> None:
        async with self.sem:
            while True:
                try:
                    await self.worker(task)
                    self.state.mark_done(task.key)
                    self.state.save()
                    self.succeeded += 1
                    print(f"[OK] {task.key}")
                    return
                except Exception as e:
                    task.attempts += 1
                    task.last_error = str(e)
                    if task.attempts >= task.max_attempts:
                        self.failed.append(task)
                        print(f"[FAIL] {task.key} attempts={task.attempts} err={task.last_error}")
                        return
                    delay = backoff_delay(task.attempts)
                    print(f"[RETRY] {task.key} attempt={task.attempts} wait={delay:.2f}s err={task.last_error}")
                    await asyncio.sleep(delay)

    async def run(self) -> None:
        workers = []
        while not self.queue.empty():
            task = await self.queue.get()
            workers.append(asyncio.create_task(self._run_one(task)))

        if workers:
            await asyncio.gather(*workers)

        print("\n==== SUMMARY ====")
        print(f"total_input   : {self.total}")
        print(f"skipped_done  : {self.skipped}")
        print(f"succeeded     : {self.succeeded}")
        print(f"failed        : {len(self.failed)}")

        if self.failed:
            Path("failed.json").write_text(
                json.dumps([asdict(t) for t in self.failed], ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
            print("failed tasks saved to failed.json")
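
One design note: run() drains the whole queue up front, creating one asyncio task per item and letting the semaphore do the throttling. That is simple and works well when all tasks are added before run() is called, as here. If you ever want producers feeding the queue while it runs, the usual alternative is a fixed pool of long-lived workers; a sketch of a hypothetical run_pool method you could add to TaskRunner:

    async def run_pool(self) -> None:
        async def pool_worker() -> None:
            while True:
                task = await self.queue.get()   # waits for new items to arrive
                try:
                    await self._run_one(task)
                finally:
                    self.queue.task_done()

        workers = [asyncio.create_task(pool_worker()) for _ in range(self.concurrency)]
        await self.queue.join()                 # until every queued item is processed
        for w in workers:
            w.cancel()                          # workers are idle on get(); stop them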


Example worker: a simulated request (swap in your real API call)

Random failures simulate network flakiness here. When you plug in real business logic, just replace the body of the worker.

async def demo_worker(task: Task) -> None:
    # pretend to send a request, write to a database, publish an article, etc.
    await asyncio.sleep(random.uniform(0.05, 0.25))

    # fail 20% of the time to demonstrate retries
    if random.random() < 0.2:
        raise RuntimeError("temporary network error")

    # example: read a field from the payload
    _ = task.payload.get("name", "")
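
A real HTTP worker keeps the same shape: do the work, raise on failure, and let the runner handle retries. A sketch using aiohttp (an assumption, not part of the code above; pip install aiohttp and adapt the request to your API):

import aiohttp

async def http_worker(task: Task) -> None:   # hypothetical replacement for demo_worker
    url = task.payload["url"]
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            resp.raise_for_status()           # 4xx/5xx raises, which triggers a retry
            body = await resp.text()
    _ = body                                  # parse/store the response here

In production you would normally share one ClientSession across all tasks rather than opening a new one per request.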


Main function: build tasks and run

def build_tasks(n: int = 30) -> List[Task]:
    tasks = []
    for i in range(n):
        tasks.append(Task(
            key=f"job-{i}",
            payload={"name": f"task-{i}", "index": i},
            max_attempts=5
        ))
    return tasks

async def main():
    runner = TaskRunner(worker=demo_worker, concurrency=6, state_path="state.json")
    await runner.add_tasks(build_tasks(50))
    await runner.run()

if __name__ == "__main__":
    asyncio.run(main())
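
Since failed.json stores full Task dicts, the failures can be fed straight back in on a later run. A small sketch, with a hypothetical load_failed helper:

def load_failed(path: str = "failed.json") -> List[Task]:
    p = Path(path)
    if not p.exists():
        return []
    tasks = [Task(**d) for d in json.loads(p.read_text(encoding="utf-8"))]
    for t in tasks:
        t.attempts = 0            # fresh retry budget for the re-run
    return tasks

Pass the result to runner.add_tasks() instead of build_tasks() and everything else stays the same.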


How to use this in real projects

  • Batch scraping/crawlers: put url, headers, and parsing rules in the payload
  • Batch publishing: put title, content, tags, and target-site config in the payload
  • Batch product imports: put sku, price, download_url, and category mappings in the payload
  • Batch callbacks/syncs: put user IDs, order IDs, and the API endpoint in the payload

Replace demo_worker() with your real logic and the framework runs as-is: failures retry automatically, successes are persisted automatically, and a crash can resume from where it left off.
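
For instance, building crawler tasks (the URLs are placeholders) could look like:

urls = [f"https://example.com/page/{i}" for i in range(100)]
tasks = [Task(key=f"fetch-{i}", payload={"url": u}) for i, u in enumerate(urls)]
# then: await runner.add_tasks(tasks); await runner.run()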

