Danh sách bài viết

Bài 67: Scale WebSocket Multi-instance — Pub/Sub Backplane

Khi chạy nhiều instance backend, mỗi WebSocket client chỉ connect tới một instance cụ thể. Broadcast dùng in-memory sẽ chỉ tới client của instance đó, không lan sang instance khác. Bài này phân tích bài toán, xây backplane với Redis Pub/Sub, code cụ thể bằng Socket.IO Redis adapter (Node.js) và FastAPI asyncio (Python), thiết kế channel theo global/room/user, routing direct message tới đúng user, duy trì user→instance map với TTL heartbeat, và các failure scenario cần xử lý.

01/06/2026
0 lượt xem
1

Mục Tiêu Bài Học

  • Giải thích được tại sao in-memory broadcast thất bại khi backend chạy nhiều instance.
  • Mô tả kiến trúc Pub/Sub backplane: mỗi instance publish và subscribe cùng một Redis channel, Redis là cầu nối giữa các instance.
  • Viết được code Socket.IO với @socket.io/redis-adapter để handle backplane tự động.
  • Viết được code FastAPI asyncio subscriber nghe Redis channel và broadcast tới local WebSocket connections.
  • Thiết kế channel theo ba cấp: global broadcast, per-room, per-user direct message.
  • Implement user → instance map với TTL heartbeat và cleanup khi disconnect.
  • Nhận diện các failure scenario (instance crash, Redis drop, ghost connection) và cách xử lý.
  • Phân biệt khi nào kết hợp Pub/Sub + Streams (realtime + replay).
2

Bài Toán: Multi-instance & WebSocket

WebSocket là long-lived connection: client kết nối và giữ kênh liên tục, không phải request-response như HTTP. Điều này tạo ra vấn đề khi scale horizontal.

Xét kịch bản đơn giản nhất:

  • Load balancer phân phối request ngẫu nhiên (round-robin hoặc least-connections).
  • User A connect → load balancer → Instance 1.
  • User B connect → load balancer → Instance 2.
  • User A gửi message tới User B → Instance 1 nhận message.
  • Instance 1 cần đẩy message tới User B, nhưng User B đang ở Instance 2.
  • Instance 1 không biết Instance 2 tồn tại — chúng là hai process độc lập, không share memory.

Đây là bài toán inter-instance communication: các instance cần một kênh để đồng bộ dữ liệu realtime với nhau. Redis Pub/Sub là lựa chọn phổ biến cho vai trò này — được gọi là backplane (tầng trung gian nối các instance).

User A ──► Instance 1 ──► PUBLISH "chat:global" ──► Redis
                                                          │
                                              ┌───────────┴───────────┐
                                              ▼                       ▼
                                         Instance 1             Instance 2
                                       (broadcast local)    (broadcast local)
                                              │                       │
                                           User A                  User B
3

Vì Sao In-memory Broadcast Sai

Code sau hoạt động tốt khi chỉ có một instance:

# Single-instance: OK
connected_clients = set()  # in-memory, lưu websocket objects

async def on_message(sender_ws, message):
    for client in connected_clients:
        await client.send_text(message)
    # Broadcast tới mọi client đang kết nối

Vấn đề xảy ra khi scale lên nhiều instance:

# Multi-instance: SAI
# Instance 1: connected_clients = {ws_userA}
# Instance 2: connected_clients = {ws_userB}
#
# Instance 1 nhận message từ User A:
for client in connected_clients:
    await client.send_text(message)
# → Chỉ gửi tới ws_userA (local).
# → ws_userB ở Instance 2 không nhận được gì.
# → User B hoàn toàn bị bỏ qua.

Đây không phải lỗi code, mà là giới hạn kiến trúc: connected_clients là in-memory state riêng của mỗi process. Hai process không share state trừ khi dùng một lớp lưu trữ chung bên ngoài.

4

Pattern Pub/Sub Backplane

Thay vì broadcast trực tiếp từ in-memory, mỗi instance publish message vào một Redis channel. Mọi instance đều subscribe channel đó. Khi nhận được message qua subscription, mỗi instance broadcast tới local clients của mình.

Luồng:
1. User A gửi message tới Instance 1
2. Instance 1 → PUBLISH "chat:global" "{message}"
3. Redis gửi message tới mọi subscriber của "chat:global"
4. Instance 1 subscriber nhận → broadcast tới local clients (User A)
5. Instance 2 subscriber nhận → broadcast tới local clients (User B)
→ Cả User A và User B đều nhận message

Mỗi instance duy trì hai thành phần riêng biệt:

  • Local connection map: connection_id → websocket, chỉ chứa client đang kết nối với instance đó.
  • Redis subscriber: một connection tới Redis dùng chung cho mọi incoming message từ các channel đã subscribe.

Redis connection cho subscriber: 1 connection dùng chung cho cả instance, không phải 1 connection per client. Đây là điểm quan trọng — không tạo N Redis connection cho N WebSocket client.

5

Code Node.js — Socket.IO Redis Adapter

Socket.IO cung cấp @socket.io/redis-adapter (npm) xử lý toàn bộ logic backplane, không cần tự viết publish/subscribe.

npm install socket.io @socket.io/redis-adapter redis
import { createAdapter } from "@socket.io/redis-adapter";
import { Server } from "socket.io";
import { createClient } from "redis";

const io = new Server(3000);

// Cần 2 client riêng biệt: 1 pub, 1 sub
// Redis client ở chế độ subscriber không thể gửi lệnh thông thường
const pubClient = createClient({ url: "redis://localhost:6379" });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

// Gán adapter — Socket.IO tự handle backplane từ đây
io.adapter(createAdapter(pubClient, subClient));

io.on("connection", (socket) => {
  console.log("Client connected:", socket.id);

  socket.on("chat:send", (msg) => {
    // io.emit() tự broadcast qua backplane tới mọi instance
    io.emit("chat:broadcast", { from: socket.id, text: msg });
  });

  socket.on("room:join", (room) => {
    socket.join(room);
  });

  socket.on("room:send", ({ room, msg }) => {
    // io.to(room).emit() cũng được route đúng qua backplane
    io.to(room).emit("room:broadcast", { from: socket.id, text: msg });
  });
});

Adapter dùng hai client vì một Redis connection ở chế độ SUBSCRIBE bị block — không thể gửi lệnh khác. Một client để publish (pubClient), một client để listen (subClient).

Với Socket.IO adapter, không cần thay đổi logic application code. io.emit(), io.to(room).emit(), socket.to(userId).emit() đều được adapter tự route qua backplane.

6

Code Python — FastAPI + Redis Pub/Sub

Khi không dùng Socket.IO, cần tự implement subscriber loop. FastAPI với asyncio là lựa chọn phổ biến phía Python.

pip install fastapi uvicorn redis[asyncio]
import asyncio
import json
import uuid
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as aioredis

app = FastAPI()

# Local connection map: chỉ chứa client của instance này
connections: dict[str, WebSocket] = {}

# Redis client dùng cho publish (thông thường)
redis_pub = aioredis.from_url("redis://localhost:6379", decode_responses=True)


async def redis_subscriber():
    """Chạy background: lắng nghe channel và broadcast tới local clients."""
    r = aioredis.from_url("redis://localhost:6379")
    pubsub = r.pubsub()
    await pubsub.subscribe("chat:global")

    async for msg in pubsub.listen():
        if msg["type"] != "message":
            continue
        text = msg["data"].decode() if isinstance(msg["data"], bytes) else msg["data"]
        # Broadcast tới mọi WebSocket đang kết nối với instance này
        dead = []
        for conn_id, ws in connections.items():
            try:
                await ws.send_text(text)
            except Exception:
                dead.append(conn_id)
        for conn_id in dead:
            connections.pop(conn_id, None)


@app.on_event("startup")
async def startup():
    asyncio.create_task(redis_subscriber())


@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    conn_id = str(uuid.uuid4())
    connections[conn_id] = websocket
    try:
        while True:
            data = await websocket.receive_text()
            payload = json.dumps({"conn": conn_id, "text": data})
            # Publish tới Redis → lan tới mọi instance
            await redis_pub.publish("chat:global", payload)
    except WebSocketDisconnect:
        connections.pop(conn_id, None)

Điểm cần chú ý trong đoạn code trên:

  • redis_subscriber() chạy như một background task, dùng connection riêng ở chế độ pubsub.
  • redis_pub là connection thông thường, chỉ dùng để PUBLISH.
  • Vòng lặp subscriber xử lý các dead connection bằng cách collect và xóa sau — tránh modify dict trong vòng lặp.
  • Message được publish dưới dạng JSON string. Instance nhận được parse lại từ string.
7

Channel Design: Global, Room, User

Cấu trúc channel ảnh hưởng trực tiếp đến số lượng message mỗi instance phải xử lý. Ba mức thiết kế phổ biến:

Mức Channel Subscriber Dùng khi
Global chat:global Mọi instance Broadcast toàn server, thông báo hệ thống
Per-room chat:room:{room_id} Instance có user trong room đó Chat room, kênh thảo luận nhóm
Per-user notif:user:{user_id} Instance hosting user đó Direct message, thông báo cá nhân

Global channel: đơn giản nhất, mọi instance subscribe. Mọi message đều đến mọi instance kể cả instance không có client liên quan. Phù hợp cho broadcast tần suất thấp.

Per-room channel: instance chỉ subscribe khi có ít nhất một local client đang ở room đó. Khi user cuối cùng của room rời đi, instance unsubscribe. Giảm đáng kể số message mỗi instance phải xử lý trong hệ thống có nhiều room.

# Dynamic subscribe/unsubscribe theo room
async def user_join_room(user_id: str, room_id: str, pubsub):
    channel = f"chat:room:{room_id}"
    local_room_users[room_id].add(user_id)
    if len(local_room_users[room_id]) == 1:
        # User đầu tiên của room này trên instance → subscribe
        await pubsub.subscribe(channel)

async def user_leave_room(user_id: str, room_id: str, pubsub):
    channel = f"chat:room:{room_id}"
    local_room_users[room_id].discard(user_id)
    if len(local_room_users[room_id]) == 0:
        # Không còn ai → unsubscribe
        await pubsub.unsubscribe(channel)

Per-user channel: chỉ instance đang host user đó subscribe. Phù hợp cho direct message và notification. Khi user connect, instance subscribe notif:user:{user_id}. Khi disconnect, unsubscribe.

8

Routing Direct Message

Khi User A muốn gửi tin nhắn riêng tới User B, có hai cách xử lý:

Option A — Broadcast + filter local

# Publisher (Instance 1):
payload = json.dumps({"to": user_b_id, "from": user_a_id, "msg": "Hello"})
await redis.publish("dm:all", payload)

# Subscriber trên mỗi instance:
async for msg in pubsub.listen():
    if msg["type"] != "message":
        continue
    data = json.loads(msg["data"])
    recipient_id = data["to"]
    if recipient_id in local_connections:
        ws = local_connections[recipient_id]
        await ws.send_text(json.dumps(data))

Option A đơn giản, không cần biết user ở instance nào. Nhược điểm: mọi instance đều nhận message dù không liên quan — overhead tăng tuyến tính theo số instance.

Option B — Lookup instance, publish channel riêng

# Tìm instance đang host User B:
instance_id = await redis.hget("user:host", user_b_id)

if instance_id:
    # Publish tới channel riêng của instance đó
    channel = f"instance:{instance_id}"
    payload = json.dumps({"to": user_b_id, "from": user_a_id, "msg": "Hello"})
    await redis.publish(channel, payload)
else:
    # User B offline → lưu vào DB/Stream để sau này đọc lại
    await redis.xadd(f"inbox:{user_b_id}", {"msg": payload})

Option B hiệu quả hơn — chỉ instance có User B nhận message. Nhưng cần duy trì user:host map (xem mục 9) và xử lý trường hợp user offline.

Trong thực tế, hầu hết hệ thống dùng per-user channel (mục 7) thay vì Option B: mỗi user có channel riêng notif:user:{id}, instance hosting user đó subscribe. Publisher không cần biết user ở instance nào, chỉ cần publish đúng channel.

9

User → Instance Map với TTL Heartbeat

Nếu cần biết user đang ở instance nào (Option B hoặc load-aware routing), cần duy trì một map trong Redis:

INSTANCE_ID = os.environ.get("INSTANCE_ID", "instance-1")
HEARTBEAT_INTERVAL = 30   # seconds
USER_HOST_TTL = 60        # seconds — phải > HEARTBEAT_INTERVAL

async def on_user_connect(user_id: str):
    # Ghi vào hash map: user → instance
    await redis.hset("user:host", user_id, INSTANCE_ID)
    # Đặt TTL riêng cho key của user này (dùng string key phụ)
    await redis.set(f"user:alive:{user_id}", "1", ex=USER_HOST_TTL)

async def on_user_disconnect(user_id: str):
    await redis.hdel("user:host", user_id)
    await redis.delete(f"user:alive:{user_id}")

async def heartbeat_loop(user_id: str):
    """Chạy background khi user connect, refresh TTL định kỳ."""
    while user_id in local_connections:
        await redis.set(f"user:alive:{user_id}", "1", ex=USER_HOST_TTL)
        await asyncio.sleep(HEARTBEAT_INTERVAL)

Cách hoạt động của TTL heartbeat:

  • Khi user connect bình thường: on_user_connect ghi map, heartbeat_loop refresh TTL mỗi 30s.
  • Khi user disconnect bình thường: on_user_disconnect xóa ngay.
  • Khi connection drop đột ngột (mất mạng, instance crash): không có on_user_disconnect — TTL 60s sẽ tự expire. Ghost entry bị dọn sạch sau tối đa 60s.

Một vấn đề với HSET "user:host" là toàn bộ hash không có TTL per-field. Cách phổ biến là dùng hash cho lookup nhanh và string key phụ (user:alive:{id}) để track liveness. Khi TTL expire thì dùng keyspace notification (bài 66) để trigger cleanup HDEL.

10

Connection Limits & Scaling Tips

Một instance async (Node.js, asyncio Python) có thể xử lý 10k–50k WebSocket connection tùy memory và OS limits. Các tham số cần điều chỉnh khi scale:

# Linux: tăng giới hạn file descriptor (mỗi socket = 1 fd)
# /etc/sysctl.conf
net.core.somaxconn = 65535
fs.file-max = 1000000

# /etc/security/limits.conf
* soft nofile 100000
* hard nofile 100000

Redis Pub/Sub connection per instance: chỉ cần 1 subscriber connection dù có bao nhiêu WebSocket client. Đây là điểm khác biệt quan trọng — Redis không phải bottleneck ở đây.

Horizontal scaling với Kubernetes:

# HPA scale theo connection count (custom metric)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: websocket-gateway
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: websocket-gateway
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Pods
    pods:
      metric:
        name: websocket_connections_total
      target:
        type: AverageValue
        averageValue: "10000"  # scale khi avg vượt 10k conn/pod
11

Failure Scenarios

Ba kịch bản lỗi thường gặp và hành vi của hệ thống:

Instance crash

Mọi WebSocket connection tới instance đó bị đứt. Client phát hiện qua heartbeat timeout (thường 30s) và tự reconnect. Load balancer route tới instance khác còn sống. Vấn đề: ghost entry trong user:host map — TTL heartbeat dọn sạch sau 60s như mô tả ở mục 9.

Redis connection drop

Subscriber connection bị ngắt → instance không nhận được message từ backplane trong thời gian đó. Message publish trong khoảng này bị mất (Pub/Sub không có persistence). Client không biết có message bị miss. Giải pháp: kết hợp Pub/Sub với Streams (mục 12) để client có thể replay khi reconnect.

Redis client library (redis-py, ioredis) thường có auto-reconnect. Cần đảm bảo subscriber loop restart sau khi reconnect:

async def robust_subscriber():
    while True:
        try:
            r = aioredis.from_url("redis://localhost:6379")
            pubsub = r.pubsub()
            await pubsub.subscribe("chat:global")
            async for msg in pubsub.listen():
                if msg["type"] == "message":
                    await broadcast_to_local(msg["data"])
        except Exception as e:
            print(f"Subscriber error: {e}, reconnecting in 2s")
            await asyncio.sleep(2)

Network blip phía client

WebSocket client mất kết nối tạm thời rồi reconnect. Server-side: connection cũ vẫn trong connections dict cho tới khi lần gửi tiếp theo gặp lỗi. Dead connection cleanup: collect error khi send và xóa (đã có trong code mục 6).

12

Kết Hợp Pub/Sub + Streams

Pub/Sub là realtime — message đến ngay nhưng không lưu lại. Streams (bài 60–64) lưu message với ID có thể replay. Kết hợp hai cơ chế giải quyết bài toán user reconnect:

async def send_chat_message(room_id: str, user_id: str, text: str):
    payload = json.dumps({"user": user_id, "text": text, "ts": time.time()})
    stream_key = f"stream:room:{room_id}"
    channel = f"chat:room:{room_id}"

    # XADD vào Stream — lưu persistent, có thể replay
    msg_id = await redis.xadd(stream_key, {"data": payload}, maxlen=1000)

    # PUBLISH vào Pub/Sub — đẩy realtime tới user đang online
    await redis.publish(channel, json.dumps({"id": msg_id, "data": payload}))

Khi user reconnect sau khi mất kết nối, client gửi ID của message cuối cùng đã nhận:

@app.websocket("/ws/{room_id}")
async def ws_with_replay(websocket: WebSocket, room_id: str, last_id: str = "0"):
    await websocket.accept()

    # Replay message bị miss trong khi offline
    stream_key = f"stream:room:{room_id}"
    missed = await redis.xrange(stream_key, min=last_id, count=100)
    for msg_id, fields in missed:
        await websocket.send_text(fields["data"])

    # Tiếp tục nhận realtime qua Pub/Sub (đã có subscriber background task)

Lưu ý race condition nhỏ: giữa lúc XRANGE và lúc subscriber bắt đầu nhận, có thể có message trùng. Client cần deduplicate theo msg_id.

13

Sticky Session vs Stateless

Có hai cách cấu hình load balancer cho WebSocket:

Sticky session Stateless (random)
Cơ chế LB pin user về cùng instance (IP hash hoặc cookie) LB phân phối ngẫu nhiên
Ưu điểm Đơn giản, không cần backplane nếu không cần cross-instance broadcast Phân phối đều hơn, không lo hotspot một instance
Nhược điểm Instance chết → mọi user được pin sẽ mất connection, LB phải re-route, failover phức tạp Bắt buộc cần backplane cho cross-instance broadcast
Phù hợp Hệ thống nhỏ, không muốn thêm Redis backplane Production scale, K8s rolling update, multi-AZ

Stateless với backplane được khuyến nghị cho hệ thống cần rolling update và horizontal scale. Khi instance được replace (K8s pod restart), user chỉ đơn giản reconnect tới instance mới, không cần LB biết về sticky state.

14

Anti-patterns & Best Practice

Anti-patterns

  • In-memory broadcast multi-instance: vấn đề cốt lõi bài này. Chỉ đúng khi single instance.
  • 1 Redis connection per WebSocket client: với 10k client sẽ có 10k Redis connection — Redis và OS đều không chịu được. Dùng 1 subscriber connection dùng chung.
  • Channel quá granular: chat:user:{a}:friend:{b} tạo quá nhiều channel riêng lẻ, khó manage subscription và cleanup. Dùng per-user channel (notif:user:{id}) là đủ.
  • Không cleanup ghost connection: dead WebSocket trong connections dict sẽ tích tụ và gây memory leak. Cleanup khi gặp send error hoặc disconnect event.
  • Không cleanup user:host map: instance crash mà không dọn ghost entry — các instance khác publish tới user "offline" theo Option B mà không biết. TTL heartbeat là cơ chế an toàn.
  • Pub/Sub mà không có Streams cho critical message: nếu cần đảm bảo message không bị miss khi user reconnect, phải kết hợp Streams. Pub/Sub alone không đủ.

Best practice

  • Dùng Pub/Sub backplane cho realtime broadcast — đơn giản, latency thấp (<1ms Redis RTT).
  • Thiết kế channel theo tầng: global cho broadcast toàn server, per-room cho nhóm, per-user cho direct message.
  • Dynamic subscribe/unsubscribe per-room để tránh broadcast không cần thiết.
  • User → instance map với TTL heartbeat để ghost cleanup tự động.
  • Kết hợp Streams cho message quan trọng cần replay khi reconnect.
  • Async framework (Node.js event loop, Python asyncio): 1 instance xử lý hàng chục nghìn connection.
  • Stateless với backplane cho production — dễ scale, dễ rolling update hơn sticky session.
15

Tổng Kết & Quiz

Pub/Sub backplane giải quyết bài toán inter-instance communication cho WebSocket: mỗi instance publish message vào Redis channel, mọi instance subscribe và broadcast tới local clients của mình. Redis là lớp trung gian stateless — instance không cần biết về nhau, chỉ cần biết về channel.

Các điểm cốt lõi:

  • In-memory broadcast đúng với single instance, sai với multi-instance.
  • Pub/Sub backplane: 1 subscriber connection per instance, không phải per client.
  • Channel design ba tầng: global / per-room / per-user.
  • User → instance map với TTL heartbeat cho ghost cleanup.
  • Pub/Sub + Streams: realtime delivery + replay khi reconnect.

Quiz

  1. Backend có 3 instance. User A ở instance 1 broadcast một message. Nếu không có backplane, user nào ở instance 2 và 3 sẽ nhận được message? Tại sao?
  2. Vì sao cần hai Redis client riêng biệt (pubClientsubClient) trong Socket.IO Redis adapter?
  3. Mô tả sự khác biệt giữa global channel và per-room channel về số message mỗi instance phải xử lý trong một hệ thống có 1000 room, mỗi room 10 user, tổng 5 instance.
  4. User disconnect đột ngột (mất điện). on_user_disconnect không chạy. Cơ chế nào đảm bảo user:host map cuối cùng được cleanup?
  5. Tại sao kết hợp Pub/Sub + Streams tốt hơn chỉ dùng Pub/Sub cho chat application khi cần đảm bảo user reconnect không bị miss message?

Đáp án gợi ý

  1. Không ai ở instance 2 và 3 nhận được. Vì mỗi instance chỉ biết connected_clients local của mình. Instance 1 broadcast trong vòng lặp local, không có cơ chế gửi tới process khác.
  2. Redis connection ở chế độ SUBSCRIBE bị block — chỉ có thể nhận notification, không thể gửi lệnh khác như PUBLISH. Cần hai connection riêng: một cho publish, một cho subscribe.
  3. Global channel: mọi message đến cả 5 instance, kể cả instance không có user liên quan. Per-room: instance chỉ nhận message từ room có local user. Nếu mỗi room phân bố đều → mỗi instance xử lý ~200 room × traffic, nhưng chỉ nhận message từ room đang có user thay vì tất cả 1000 room.
  4. TTL heartbeat: background task refresh TTL của user:alive:{user_id} mỗi 30s. Khi user không còn online, không có gì refresh TTL → sau 60s key expire → keyspace notification trigger cleanup HDEL user:host {user_id}.
  5. Pub/Sub fire-and-forget: message publish khi user offline bị mất, không có cách lấy lại. Streams lưu message với ID có thể query (XRANGE). Khi user reconnect, gửi ID cuối cùng đã nhận → XRANGE trả về message bị miss → không bị gap trong lịch sử chat.

Bài tiếp theo

Bài 68 đi vào Sharded Pub/Sub (Redis 7): cách chia channel ra nhiều shard trong Redis Cluster để tránh bottleneck khi số lượng subscriber và message rất lớn.

Tham khảo