Django论坛实时通知系统:WebSocket、Redis 与 Celery 的协同实践
Django论坛实时通知系统:WebSocket、Redis 与 Celery 的协同实践
在技术社区中,@提及、私信、点赞、系统公告等都需要“低延迟+高可靠”的通知体验。本文给出一套在 django.cn/forum/ 场景可落地的通知系统方案:Django 后端负责鉴权与事件入队,Redis 充当发布/订阅与短期状态存储,Celery 承担异步分发,前端通过 WebSocket/SSE 实现秒级推送。
架构概览
- 事件源:帖子发布、回复、@提及、私信、审核结果变更、系统任务完成等。
- 写路径:业务视图写入事件(轻量化 payload)→ Redis 列表/流 → Celery Worker 消费 → 组装通知对象并持久化 → 通过 Channel Layer 推送。
- 读路径:前端建立 WS/SSE 连接订阅用户频道;离线期间的通知走轮询或首屏补发。
鉴权与会话
- 使用 Channels AuthMiddlewareStack 复用 Django 会话/Token;连接建立时校验用户态与租户/版块权限。
- 每个用户维护
user:{id}:conn_set,限制并发连接数,异常断开时定期清理。
# routing.py
websocket_urlpatterns = [
path("ws/notify/", consumers.NotifyConsumer.as_asgi()),
]
# consumer.py(简化)
class NotifyConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
if self.scope["user"].is_anonymous:
return await self.close()
self.group = f"user:{self.scope['user'].id}"
await self.channel_layer.group_add(self.group, self.channel_name)
await self.accept()
async def notify(self, event):
await self.send_json(event["data"])
事件建模与幂等
- 事件最小化:
type、actor_id、target_id、context、ts。 - 生成幂等键
idempotency_key = sha1(type+actor+target+ts_bucket);Worker 端用 RedisSETNX防重。 - 同类高频事件合并:例如 5 分钟内同一主题的多次点赞聚合为一条“X人点赞”。
Celery 分发与降级
- 任务队列分级:
notify_high(私信/提及)、notify_low(系统广播)。 - 延迟批处理:浏览量、热度信号通过
countdown或 beat 合并写库。 - 降级策略:WS 不可达时回退到 SSE,仍不可达时切换为 长轮询,并在用户下一次请求时批量吐出。
# tasks.py(简化)
@shared_task(queue="notify_high", autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
def fanout_notify(event):
# 幂等
if not redis.setnx(f"idemp:{event['key']}", 1):
return
redis.expire(f"idemp:{event['key']}", 600)
# 落库 + 推送
Notification.objects.create(**event["model"])
async_to_sync(channel_layer.group_send)(
f"user:{event['uid']}",
{"type": "notify", "data": event["payload"]}
)
前端订阅与可靠性
- 首屏附带
last_event_id,WS 建连后请求“补差”; - 前端心跳(30s)+ 服务器超时(60s)检测长连接健康度;
- 断线重连退避策略:
1s → 2s → 5s → 10s上限,并携带新的last_event_id。
指标与告警
- QPS、推送 P95、丢消息率(幂等冲突/重试次数)、WS 在线连接数、回退比例(WS→SSE→轮询)。
- 告警门槛:回退比例 > 20%,或重试耗尽 > 0.5% 即触发。
评论 0