Building a "rate limiting + concurrency control + automatic retry" request client in Python (for batch API jobs)
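In batch scripts, the most common "silent killer" is not the code logic but the API's policies: rate limits, sporadic 5xx errors, network jitter, connection timeouts, and bans triggered by sudden bursts of concurrent requests. Many people start with a plain loop of requests calls and keep patching it until it becomes a mess: sleep calls everywhere, try/except everywhere, and no consistent retry strategy.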
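This post gives you a reusable "request client" scaffold with these core capabilities:
- Concurrency control (how many requests run at the same time)
- Rate limiting (at most N requests per second, to avoid tripping anti-abuse controls)
- Retries on failure (exponential backoff + jitter)
- Timeout control
- A unified result structure (successes and failures are both traceable)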
It is built on aiohttp (asynchronous HTTP) and suits jobs such as batch publishing, batch importing, crawling listings, or calling APIs in bulk. It runs on Python 3.10+.
1) Install dependencies
```bash
pip install aiohttp
```
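2) Core: a token bucket rate limiter
The advantage of a token bucket is that it smooths the request rate while still allowing short bursts, which is more stable than scattering plain sleep calls. Tokens refill continuously at `rate` per second up to `capacity`; each request spends one token and waits when none is left.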
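```python
import asyncio
import time


class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        """
        rate: tokens generated per second (e.g. 5 means 5 rps)
        capacity: bucket size (the largest burst of requests allowed)
        """
        if rate <= 0 or capacity < 1:
            raise ValueError("rate must be > 0 and capacity >= 1")
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full so the first burst goes out immediately
        self.updated_at = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, cost: int = 1) -> None:
        if cost > self.capacity:
            # Otherwise the loop below could never collect enough tokens
            raise ValueError("cost cannot exceed bucket capacity")
        # The lock ensures only one waiter at a time refills and consumes tokens
        async with self.lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.updated_at
                # Refill tokens for the elapsed time, capped at capacity
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now
                if self.tokens >= cost:
                    self.tokens -= cost
                    return
                # Not enough tokens: sleep roughly until enough have accumulated
                need = (cost - self.tokens) / self.rate
                await asyncio.sleep(max(0.01, need))
```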
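To see what `rate` and `capacity` mean together, here is a virtual-clock model of the same refill rule (no real sleeping). It assumes every request arrives at t=0 and the bucket starts full; `simulate_token_bucket` is a hypothetical helper for illustration only:

```python
def simulate_token_bucket(n_requests: int, rate: float, capacity: int) -> list[float]:
    """Virtual-clock model of the TokenBucket refill rule (no real sleeping).

    Assumes every request arrives at t=0 and the bucket starts full, and returns
    the time at which each request would be admitted.
    """
    tokens = float(capacity)
    now = 0.0
    admitted: list[float] = []
    for _ in range(n_requests):
        if tokens < 1:
            # Advance the clock just far enough for one token to refill
            wait = (1 - tokens) / rate
            now += wait
            tokens = min(capacity, tokens + wait * rate)
        tokens -= 1
        admitted.append(now)
    return admitted


times = simulate_token_bucket(15, rate=5.0, capacity=10)
print(times)
```

With rate=5 and capacity=10, the first 10 requests are admitted at t=0 and the rest are spaced 0.2 s apart, so 15 requests take about (15 - 10) / 5 = 1 s: a short burst, then a smooth rate.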
3) Retry strategy: exponential backoff + jitter
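```python
import random


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 12.0) -> float:
    # Exponential growth: 0.5s, 1s, 2s, 4s, ... capped at `cap` seconds
    raw = min(cap, base * (2 ** (attempt - 1)))
    # Add up to 25% random jitter so concurrent retries don't fire in lockstep
    jitter = random.uniform(0, raw * 0.25)
    return raw + jitter
```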
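Written out, the delay for attempt n is min(cap, base * 2^(n-1)) plus between 0% and 25% random jitter. The hypothetical helper below (not used by the fetcher) only computes that range per attempt, so you can see the schedule the defaults produce:

```python
def backoff_bounds(attempt: int, base: float = 0.5, cap: float = 12.0) -> tuple[float, float]:
    """Smallest and largest delay backoff_delay() can return for a given attempt.

    raw = min(cap, base * 2**(attempt - 1)); jitter adds between 0 and 25% of raw.
    """
    raw = min(cap, base * (2 ** (attempt - 1)))
    return raw, raw * 1.25


# Delay range per attempt with the defaults base=0.5, cap=12.0
for attempt in range(1, 8):
    low, high = backoff_bounds(attempt)
    print(f"attempt {attempt}: {low:.3f}s to {high:.3f}s")
```

The cap takes over from attempt 6 onward (0.5 * 2^5 = 16 > 12), so no single wait exceeds 15 s.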
4) Unified result structure: successes and failures are both traceable
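```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FetchResult:
    ok: bool           # True only for a 2xx response
    status: int        # HTTP status; 0 when all retries were exhausted
    data: Any = None   # parsed JSON, or raw text if the body is not JSON
    error: str = ""    # reason for the failure
    attempts: int = 0  # how many attempts were made
    url: str = ""
```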
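Because every outcome is a dataclass, the failures can be dumped for a later re-run. A minimal sketch using `dataclasses.asdict` and JSON Lines (one object per line); `failures_to_jsonl` and the sample URLs are hypothetical, and `FetchResult` is repeated so the snippet runs on its own:

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Iterable


@dataclass
class FetchResult:  # same fields as above, repeated so this snippet is self-contained
    ok: bool
    status: int
    data: Any = None
    error: str = ""
    attempts: int = 0
    url: str = ""


def failures_to_jsonl(results: Iterable[FetchResult]) -> str:
    """Serialize only the failed results as JSON Lines.
    `data` is dropped because response bodies may be large or not JSON-serializable."""
    lines = []
    for r in results:
        if not r.ok:
            record = asdict(r)
            record.pop("data")
            lines.append(json.dumps(record, ensure_ascii=False))
    return "\n".join(lines)


results = [
    FetchResult(True, 200, data={"id": 1}, attempts=1, url="https://example.com/a"),
    FetchResult(False, 0, error="exhausted retries", attempts=5, url="https://example.com/b"),
]
print(failures_to_jsonl(results))
```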
5) The core client: concurrency + rate limiting + retries + timeout
```python
import asyncio
from typing import Any, Dict, Optional

import aiohttp


class SmartFetcher:
    def __init__(
        self,
        concurrency: int = 8,
        rps: float = 5.0,
        burst: int = 10,
        timeout_sec: float = 20.0,
        max_attempts: int = 5,
    ):
        self.sem = asyncio.Semaphore(concurrency)            # max requests in flight
        self.bucket = TokenBucket(rate=rps, capacity=burst)  # max request rate
        self.timeout = aiohttp.ClientTimeout(total=timeout_sec)
        self.max_attempts = max_attempts

    async def fetch_json(
        self,
        session: aiohttp.ClientSession,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        json_body: Optional[Dict[str, Any]] = None,
    ) -> FetchResult:
        last_err = ""
        for attempt in range(1, self.max_attempts + 1):
            await self.bucket.acquire(1)
            # Hold a concurrency slot only while the request is actually in flight
            async with self.sem:
                try:
                    async with session.request(
                        method=method,
                        url=url,
                        headers=headers,
                        json=json_body,
                        timeout=self.timeout,
                    ) as resp:
                        status = resp.status
                        if status == 429 or 500 <= status <= 599:
                            # Common policy: 429 and 5xx are retried (after the backoff below)
                            text = await resp.text()
                            last_err = f"retryable status={status} body={text[:200]}"
                        else:
                            # Any other status: try to parse JSON, fall back to plain text
                            try:
                                data = await resp.json(content_type=None)
                            except Exception:
                                data = await resp.text()
                            if 200 <= status <= 299:
                                return FetchResult(True, status, data=data, attempts=attempt, url=url)
                            # Non-retryable errors (e.g. 400/401/403/404): return immediately
                            return FetchResult(False, status, data=data, error="non-retryable http error",
                                               attempts=attempt, url=url)
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    last_err = f"{type(e).__name__}: {e}"
                except Exception as e:
                    last_err = f"Unexpected: {type(e).__name__}: {e}"
            # Back off outside the semaphore so a sleeping retry does not block other
            # requests; there is no point sleeping after the final attempt
            if attempt < self.max_attempts:
                await asyncio.sleep(backoff_delay(attempt))
        return FetchResult(False, 0, error=last_err or "exhausted retries",
                           attempts=self.max_attempts, url=url)
```
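The semaphore and the token bucket cap different things: the bucket limits how many requests start per second, while the semaphore limits how many are in flight at once, which matters when an endpoint responds slowly. A quick self-contained check of the semaphore half, with a fake request that only sleeps (no network; `peak_in_flight` is a hypothetical helper):

```python
import asyncio


async def peak_in_flight(n_tasks: int, concurrency: int, work_sec: float = 0.01) -> int:
    """Run n_tasks fake 'requests' behind a Semaphore and report the highest
    number that were ever in flight at once (never more than `concurrency`)."""
    sem = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(work_sec)  # stands in for network I/O
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


print(asyncio.run(peak_in_flight(n_tasks=50, concurrency=6)))
```

Note that fetch_json takes a token before it waits for a semaphore slot, so both limits apply to every attempt, including retries.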
6) Example: batch GET requests (swap in your own endpoints)
```python
async def run_demo():
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # simulated 5xx
        "https://httpbin.org/status/429",  # simulated rate limiting
        "https://httpbin.org/delay/2",
    ] * 5  # repeat the list to enlarge the demo workload

    fetcher = SmartFetcher(concurrency=6, rps=4.0, burst=6, timeout_sec=10, max_attempts=5)
    async with aiohttp.ClientSession() as session:
        tasks = [fetcher.fetch_json(session, "GET", u) for u in urls]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r.ok)
    fail = len(results) - ok
    print(f"done: ok={ok} fail={fail}")

    # Print the reason for each failure
    for r in results:
        if not r.ok:
            print(f"[FAIL] url={r.url} attempts={r.attempts} status={r.status} err={r.error}")


if __name__ == "__main__":
    asyncio.run(run_demo())
```
7) Plugging in your real workload (e.g. batch publishing to WordPress/WooCommerce)
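Just replace url, headers, and json_body with your own endpoint's parameters, for example:
- POST /wp-json/wp/v2/posts to publish posts
- POST /wp-json/wc/v3/products to upload products
- GET /wp-json/wp/v2/product?per_page=100&page=1 to fetch a listing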
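As a sketch of the WordPress case, assuming the site uses WordPress Application Passwords (sent as HTTP Basic auth), the jobs can be built as (method, url, headers, json_body) tuples that map one-to-one onto fetch_json's parameters. The site URL, user name, password, and helper names below are all placeholders:

```python
import base64
from typing import Any, Dict, List, Tuple

# Hypothetical site and credentials: replace with your own
SITE = "https://example.com"
WP_USER = "editor"
WP_APP_PASSWORD = "xxxx xxxx xxxx xxxx xxxx xxxx"

Job = Tuple[str, str, Dict[str, str], Dict[str, Any]]


def basic_auth_header(user: str, password: str) -> Dict[str, str]:
    # HTTP Basic auth: base64("user:password")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def build_post_jobs(titles: List[str]) -> List[Job]:
    """One (method, url, headers, json_body) tuple per article, saved as drafts."""
    headers = basic_auth_header(WP_USER, WP_APP_PASSWORD)
    url = f"{SITE}/wp-json/wp/v2/posts"
    return [
        ("POST", url, headers, {"title": t, "content": f"<p>{t}</p>", "status": "draft"})
        for t in titles
    ]


jobs = build_post_jobs(["Post A", "Post B"])
# Inside an async function, with a SmartFetcher and an open aiohttp session:
# results = await asyncio.gather(*(fetcher.fetch_json(session, m, u, h, b) for m, u, h, b in jobs))
```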
You can also adjust the retry conditions to fit your workload, for example also retrying 408/409, or never retrying specific error codes.
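One way to make that configurable is to pull the inline 429/5xx check in fetch_json out into a predicate. A sketch with a hypothetical `should_retry` helper; with no extra arguments it reproduces the original behavior:

```python
from typing import FrozenSet


def should_retry(status: int,
                 extra: FrozenSet[int] = frozenset(),
                 never: FrozenSet[int] = frozenset()) -> bool:
    """Decide whether an HTTP status should be retried.

    extra: additional codes to retry (e.g. 408, 409)
    never: codes never retried, even if the default rule matches (e.g. 501)
    """
    if status in never:
        return False
    if status in extra:
        return True
    # Default rule: 429 and any 5xx
    return status == 429 or 500 <= status <= 599


print(should_retry(408), should_retry(408, extra=frozenset({408, 409})))
```

Inside fetch_json you would then call should_retry(status, ...) in place of the inline condition.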
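Practical enhancements you can add on top (I can help with these too):
- Read Retry-After automatically (on a 429, wait as long as the server asks)
- A proxy pool (proxy rotation)
- Request signing / nonces (for authenticated endpoints)
- Per-endpoint grouping (different rate limits for different endpoints)
- Request logging + trace_id (combined with the logging system from the previous post)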
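For the first item: the Retry-After header (defined in RFC 9110) carries either a number of seconds or an HTTP date. A minimal parser, written as a hypothetical helper the fetcher does not have yet:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Turn a Retry-After header into seconds to wait, or None if absent/unparseable.

    The header is either delta-seconds ("120") or an HTTP-date
    ("Wed, 21 Oct 2015 07:28:00 GMT").
    """
    if not value:
        return None
    value = value.strip()
    try:
        return float(max(0, int(value)))  # delta-seconds form
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        # parsedate_to_datetime returns a naive datetime for "-0000"; treat it as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())
```

In the 429 branch you could then prefer the server's value, e.g. `wait = parse_retry_after(resp.headers.get("Retry-After"))`, and fall back to `backoff_delay(...)` when it returns None.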