零拷贝 + 异步 I/O:把 Python 的 I/O 跑满网卡

零拷贝 + 异步 I/O:把 Python 的 I/O 跑满网卡

在高吞吐场景里,Python 的瓶颈常在拷贝次数上下文切换。这篇用可落地的小技巧,从对象模型与事件循环两头减压。

1) 零拷贝三件套:memoryview / bytearray / mmap

  • memoryview 对现有缓冲区做“视图”,子切片仍零拷贝。
  • bytearray 可原地修改,避免新分配。
  • mmap 让文件直接映射进内存,减少用户态/内核态来回搬运。
import mmap, os

with open("data.bin", "r+b") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE)
    mv = memoryview(mm)
    header = mv[:64]      # 仍零拷贝
    header[:4] = b"GPAL"  # 原地写
    mm.flush()

2) sendfile 与内核直传

内核支持时,用 socket.sendfile() 直接从文件描述符到网卡,绕过用户态缓冲区。

import socket

with socket.create_connection(("127.0.0.1", 9000)) as s, open("big.iso","rb") as f:
    s.sendfile(f)  # 内核态直传

3) asyncio 背压与流控

异步不是银弹,关键是背压——写缓冲快满就暂停生产者,避免 OOM 与抖动。

import asyncio

async def pump(reader, writer):
    try:
        while chunk := await reader.read(64 * 1024):
            writer.write(chunk)
            if writer.transport.is_closing():
                break
            await writer.drain()  # 背压点
    finally:
        await writer.drain()
        writer.close()
        await writer.wait_closed()

4) struct.pack_into:少分配、更亲 cache

相比 struct.pack() 返回新对象,pack_into() 直接写入已有缓冲区。

import struct
buf = bytearray(32)
struct.pack_into("!IIf", buf, 0, 0xDEADBEEF, 42, 3.14)  # 原地写入

5) 降低字节码开销的小手段

  • 绑定局部:把热路径的属性/方法先绑定到局部变量,少一次查找。
  • 避免临时对象:拼接用 bytearray.extend,列表构建用 .append 聚合后再一次性处理。
  • 批量系统调用:网络读写遵循“更大块、更少次”。
def hot_loop(items, out_extend=bytearray.extend):
    out = bytearray()
    append = out.append
    for b in items:  # b: int 0..255
        append(b ^ 0x5A)
    return out

6) 协程池与并发度

CPU 密集转向 multiprocessingconcurrent.futures.ProcessPoolExecutor;I/O 密集用 asyncio + 合理的并发度(例如同时活跃连接数 * 每连接窗口大小 ≈ 网卡带宽 / 单连接吞吐)。

import asyncio

SEM = asyncio.Semaphore(200)  # 根据 NIC/RTT 调参

async def bounded(coro):
    async with SEM:
        return await coro

7) 底层观测与基准

  • tracemalloc 看分配热点,dis 看字节码,perf_counter_ns() 做微基准。
  • tcp_nodelay/SO_SNDBUF/SO_RCVBUF 配合服务器端设置,配齐窗口。
import dis, time
from time import perf_counter_ns

def bench(fn, *a, n=10000):
    t0 = perf_counter_ns()
    for _ in range(n): fn(*a)
    return (perf_counter_ns()-t0)/n

dis.dis(hot_loop)
print("ns/op:", bench(lambda: hot_loop(range(256))))

8) 快速路线图(可直接套用)

  1. 文件传输:优先 sendfile;不行再 asyncio + writer.drain()
  2. 文本/二进制处理:bytearray + memoryview + pack_into
  3. 大文件:mmap 分块视图 + 异步写。
  4. 观测→定位→替换:从 distracemalloc 找到热点,再做最小替换,避免“过度工程化”。

结语:Python 的上限常被“拷贝”和“阻塞”束缚。把数据留在内核或原地缓冲里,再用可观测的背压把节奏稳住,你就能在不动 C 扩展的前提下,把吞吐稳定拉到硬件上限的合理区间。

评论 0