Build a Python Request Runner with Rate Limiting, Concurrency Control, and Automatic Retries (for Batch API Jobs)

When you write batch scripts, the most common "invisible killer" is not your code logic but the API's policies: rate limits, occasional 5xx responses, network jitter, connection timeouts, and bans triggered by short bursts of concurrent requests. Many people start with a simple requests loop and patch it until it becomes a mess: sleep calls everywhere, try/except everywhere, and no consistent retry policy.

This post gives you a directly reusable "request runner" scaffold with these core capabilities:

  • Concurrency control (how many requests run at once)
  • Rate limiting (max requests per second, to avoid tripping risk controls)
  • Retry on failure (exponential backoff + jitter)
  • Timeout control
  • A unified result structure (success and failure both traceable)

I implement it with aiohttp (async HTTP), which suits scenarios like batch publishing, batch imports, list crawling, and general API calls. Runs on Python 3.10+.


1) Install the dependency

pip install aiohttp


2) Core: a token-bucket rate limiter (Token Bucket)

The advantage of a token bucket is that it smooths the request rate while still allowing short bursts, which is more stable than scattering plain sleep calls.

import asyncio
import time

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        """
        rate: tokens generated per second (e.g. 5 means 5 rps)
        capacity: bucket capacity (max burst size)
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated_at = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, cost: int = 1) -> None:
        async with self.lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.updated_at
                # Refill tokens based on elapsed time
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now

                if self.tokens >= cost:
                    self.tokens -= cost
                    return

                # Not enough tokens: wait a bit
                need = (cost - self.tokens) / self.rate
                await asyncio.sleep(max(0.01, need))
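A quick sanity check of the pacing behavior (the class is repeated here so the snippet runs standalone): with rate=5 and capacity=5, the first 5 acquires pass through the burst immediately, and the next 5 are paced at 5 per second, so 10 acquires take roughly one second.

```python
import asyncio
import time

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated_at = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, cost: int = 1) -> None:
        async with self.lock:
            while True:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
                self.updated_at = now
                if self.tokens >= cost:
                    self.tokens -= cost
                    return
                await asyncio.sleep(max(0.01, (cost - self.tokens) / self.rate))

async def pacing_demo() -> float:
    bucket = TokenBucket(rate=5, capacity=5)
    start = time.monotonic()
    for _ in range(10):  # 5 burst through, 5 are paced at 5 rps
        await bucket.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(pacing_demo())
print(f"10 acquires took {elapsed:.2f}s")
```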


3) Retry policy: exponential backoff + jitter

import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 12.0) -> float:
    raw = min(cap, base * (2 ** (attempt - 1)))
    jitter = random.uniform(0, raw * 0.25)
    return raw + jitter
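A quick look at the schedule this produces (the function is repeated so the snippet runs standalone): the raw delay doubles each attempt (0.5s, 1s, 2s, 4s, ...) until it hits the 12s cap, and each delay gets up to 25% random jitter on top so retries from many workers don't land at the same instant.

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 12.0) -> float:
    raw = min(cap, base * (2 ** (attempt - 1)))
    jitter = random.uniform(0, raw * 0.25)
    return raw + jitter

# Delays for attempts 1..6; with 25% jitter these are still strictly increasing
delays = [backoff_delay(a) for a in range(1, 7)]
print([f"{d:.2f}" for d in delays])
```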


4) A unified result structure: success and failure both traceable

from dataclasses import dataclass
from typing import Any

@dataclass
class FetchResult:
    ok: bool
    status: int
    data: Any = None
    error: str = ""
    attempts: int = 0
    url: str = ""
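Because every call returns the same shape, downstream aggregation stays trivial (the dataclass is repeated so the snippet runs standalone):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class FetchResult:
    ok: bool
    status: int
    data: Any = None
    error: str = ""
    attempts: int = 0
    url: str = ""

results = [
    FetchResult(True, 200, data={"id": 1}, attempts=1, url="https://example.com/a"),
    FetchResult(False, 404, error="non-retryable http error", attempts=1, url="https://example.com/b"),
]
# Filter failures for a report without any isinstance/try-except gymnastics
failed = [r for r in results if not r.ok]
print(f"ok={len(results) - len(failed)} fail={len(failed)}")
```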


5) The core fetcher: concurrency + rate limiting + retries + timeouts

import aiohttp
from typing import Any, Dict, Optional

class SmartFetcher:
    def __init__(
        self,
        concurrency: int = 8,
        rps: float = 5.0,
        burst: int = 10,
        timeout_sec: float = 20.0,
        max_attempts: int = 5,
    ):
        self.sem = asyncio.Semaphore(concurrency)
        self.bucket = TokenBucket(rate=rps, capacity=burst)
        self.timeout = aiohttp.ClientTimeout(total=timeout_sec)
        self.max_attempts = max_attempts

    async def fetch_json(
        self,
        session: aiohttp.ClientSession,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        json_body: Optional[Dict[str, Any]] = None,
    ) -> FetchResult:
        attempts = 0
        last_err = ""

        while attempts < self.max_attempts:
            attempts += 1
            await self.bucket.acquire(1)

            async with self.sem:
                try:
                    async with session.request(
                        method=method,
                        url=url,
                        headers=headers,
                        json=json_body,
                        timeout=self.timeout,
                    ) as resp:
                        status = resp.status

                        # Common policy: 429 and 5xx are retryable
                        if status == 429 or 500 <= status <= 599:
                            text = await resp.text()
                            last_err = f"retryable status={status} body={text[:200]}"
                            await asyncio.sleep(backoff_delay(attempts))
                            continue

                        # Other statuses: try to parse JSON, fall back to text
                        try:
                            data = await resp.json(content_type=None)
                        except Exception:
                            data = await resp.text()

                        if 200 <= status <= 299:
                            return FetchResult(True, status, data=data, attempts=attempts, url=url)

                        # Non-retryable errors (e.g. 400/401/403/404): return immediately
                        return FetchResult(False, status, data=data, error="non-retryable http error",
                                           attempts=attempts, url=url)

                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    last_err = f"{type(e).__name__}: {e}"
                    await asyncio.sleep(backoff_delay(attempts))
                    continue
                except Exception as e:
                    last_err = f"Unexpected: {type(e).__name__}: {e}"
                    await asyncio.sleep(backoff_delay(attempts))
                    continue

        return FetchResult(False, 0, error=last_err or "exhausted retries", attempts=attempts, url=url)


6) Example: batch GET requests (swap in your own endpoints)

async def run_demo():
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # simulate 5xx
        "https://httpbin.org/status/429",  # simulate rate limiting
        "https://httpbin.org/delay/2",
    ] * 5  # multiply the workload for the demo

    fetcher = SmartFetcher(concurrency=6, rps=4.0, burst=6, timeout_sec=10, max_attempts=5)

    async with aiohttp.ClientSession() as session:
        tasks = [fetcher.fetch_json(session, "GET", u) for u in urls]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r.ok)
    fail = len(results) - ok
    print(f"done: ok={ok} fail={fail}")

    # Print some failure reasons
    for r in results:
        if not r.ok:
            print(f"[FAIL] url={r.url} attempts={r.attempts} status={r.status} err={r.error}")

if __name__ == "__main__":
    asyncio.run(run_demo())


7) Plugging in your real workload (e.g. batch publishing to WordPress/Woo)

Just swap url / headers / json_body for your endpoint's parameters, for example:

  • POST /wp-json/wp/v2/posts to publish a post
  • POST /wc/v3/products to upload products
  • GET /wp-json/wp/v2/product?per_page=100&page=1 to fetch a list

You can also tighten the retry conditions for your use case, e.g. also retry on 408/409, or never retry on specific error codes.
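As a concrete sketch of wiring this up, here is how you might build the arguments for fetch_json when publishing a WordPress post. This assumes WordPress application passwords (HTTP Basic auth); the site URL, username, and password below are placeholders, not values from this article:

```python
import base64

# Placeholders: replace with your own site and credentials (assumptions, not real values)
WP_SITE = "https://example.com"
WP_USER = "admin"
WP_APP_PASSWORD = "xxxx xxxx xxxx xxxx"

def wp_auth_header(user: str, app_password: str) -> dict:
    # WordPress application passwords ride on standard HTTP Basic auth
    token = base64.b64encode(f"{user}:{app_password}".encode()).decode()
    return {"Authorization": f"Basic {token}", "Content-Type": "application/json"}

def build_post_request(title: str, content: str, status: str = "draft"):
    # Returns (method, url, headers, json_body), ready to pass to SmartFetcher.fetch_json
    url = f"{WP_SITE}/wp-json/wp/v2/posts"
    body = {"title": title, "content": content, "status": status}
    return "POST", url, wp_auth_header(WP_USER, WP_APP_PASSWORD), body

method, url, headers, body = build_post_request("Hello", "<p>First post</p>")
print(method, url)
```

With these in hand, the call inside your batch loop is just `await fetcher.fetch_json(session, method, url, headers=headers, json_body=body)`.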


"Practical enhancements" you could add (I can help with these too)

  1. Honor Retry-After automatically (on 429, wait as long as the server asks)
  2. Proxy pool (proxy rotation)
  3. Request signing / nonce (for APIs that require authentication)
  4. Per-endpoint concurrency groups (different rate limits for different endpoints)
  5. Request logging + trace_id (combine with the logging system from the previous post)
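Enhancement 1 is a small, self-contained change. A minimal sketch of a Retry-After parser: the header can be either delta-seconds or an HTTP date, and if it is absent or unparseable you fall back to your own backoff delay:

```python
import email.utils
import time

def retry_after_seconds(header_value, fallback: float) -> float:
    """Parse a Retry-After header: either delta-seconds ("120") or an HTTP date.
    Returns the fallback (e.g. your backoff_delay value) if absent or unparseable."""
    if not header_value:
        return fallback
    try:
        return max(0.0, float(header_value))  # delta-seconds form
    except ValueError:
        pass
    try:
        dt = email.utils.parsedate_to_datetime(header_value)  # HTTP-date form
        return max(0.0, dt.timestamp() - time.time())
    except (TypeError, ValueError):
        return fallback
```

In the 429/5xx branch of fetch_json, you would then replace the plain backoff sleep with `await asyncio.sleep(retry_after_seconds(resp.headers.get("Retry-After"), backoff_delay(attempts)))`.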
