Danh sách bài viết

Bài 65: Pub/Sub Cơ Bản — Fire-and-forget & Cảnh Báo Mất Message

Redis Pub/Sub là cơ chế publish/subscribe đơn giản nhất trong Redis: publisher gửi message tới channel, mọi subscriber đang lắng nghe channel đó nhận được ngay. Không có queue, không có persistence, không có replay. Bài này giải thích cơ chế hoạt động, các lệnh cốt lõi (PUBLISH, SUBSCRIBE, PSUBSCRIBE), hai code mẫu Python cho publisher và subscriber, và điểm quan trọng nhất cần nhớ: Pub/Sub là fire-and-forget — subscriber offline sẽ mất message không có cách nào lấy lại. Cuối bài có bảng so sánh Pub/Sub với Streams để biết khi nào dùng cái nào.

01/06/2026
14 phút đọc
0 lượt xem
1

Mục Tiêu Bài Học

  • Hiểu cơ chế Pub/Sub: channel, publisher, subscriber, many-to-many.
  • Nắm được fire-and-forget là gì và hệ quả: message mất khi subscriber offline.
  • Dùng được PUBLISH, SUBSCRIBE, PSUBSCRIBE trong terminal và Python.
  • Biết subscriber connection model — vì sao phải dùng connection riêng.
  • Phân biệt Pub/Sub với Streams để chọn đúng công cụ.
2

Pub/Sub Là Gì

Pub/Sub (Publish/Subscribe) là mô hình truyền message qua channel (kênh). Có ba thành phần:

  • Publisher: gửi message tới một channel bằng lệnh PUBLISH. Publisher không biết subscriber là ai, có bao nhiêu, hay có ai online hay không.
  • Channel: tên định danh trung gian, ví dụ notifications:user:123. Redis không lưu message vào channel — channel chỉ là địa chỉ routing.
  • Subscriber: đăng ký nhận message từ channel bằng SUBSCRIBE. Khi publisher gửi, Redis đẩy (push) message tới mọi subscriber đang kết nối.

Mô hình là many-to-many: nhiều publisher có thể publish lên cùng một channel, nhiều subscriber có thể lắng nghe cùng một channel. Redis đẩy từng message tới tất cả subscriber — đây là fanout (lan truyền rộng), khác hoàn toàn với queue (chia việc giữa consumer).

Điểm then chốt của toàn bài: Redis không lưu message Pub/Sub ở bất kỳ đâu. Message chỉ tồn tại trong thời điểm publish và chuyển tiếp ngay đến subscriber. Nếu không có subscriber nào online, message biến mất. Đây là lý do Pub/Sub được gọi là fire-and-forget.

3

Lệnh Cốt Lõi: PUBLISH & SUBSCRIBE

PUBLISH

PUBLISH channel:notifications "Hello"
# (integer) 3   ← số subscriber nhận được message này

PUBLISH channel:notifications "{\"user\": 123, \"event\": \"order_placed\"}"
# (integer) 1

Giá trị trả về là số subscriber nhận được message tại thời điểm publish. Nếu trả về 0, không có ai đang lắng nghe — message không được lưu lại.

SUBSCRIBE

# Subscribe một channel
SUBSCRIBE channel:notifications

# Subscribe nhiều channel cùng lúc
SUBSCRIBE channel:a channel:b channel:c

Sau khi chạy SUBSCRIBE, kết nối chuyển sang "subscriber mode" — Redis bắt đầu đẩy message về khi có publisher gửi lên channel đó.

Để dừng lắng nghe:

UNSUBSCRIBE channel:notifications
# Hoặc unsubscribe tất cả:
UNSUBSCRIBE

Demo: hai terminal

Mở terminal 1 (subscriber):

redis-cli
> SUBSCRIBE channel:notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:notifications"
3) (integer) 1

Mở terminal 2 (publisher):

redis-cli PUBLISH channel:notifications "Hello from publisher"
# (integer) 1

Terminal 1 nhận ngay:

1) "message"
2) "channel:notifications"
3) "Hello from publisher"

Nếu đóng terminal 1 rồi mới publish — không có gì nhận được, message mất.

4

Cảnh Báo: Fire-and-forget & Mất Message

Đây là điểm quan trọng nhất của Pub/Sub Redis. Hầu hết lỗi thiết kế khi dùng Pub/Sub đều xuất phát từ việc bỏ qua hành vi này.

Những trường hợp mất message

  • Subscriber chưa connect: message publish trước khi subscriber gọi SUBSCRIBE sẽ mất hoàn toàn. Redis không buffer chờ.
  • Subscriber disconnect (network blip): nếu subscriber mất kết nối dù chỉ 2 giây, toàn bộ message publish trong 2 giây đó không được nhận.
  • Subscriber reconnect: sau khi kết nối lại và gọi SUBSCRIBE, chỉ nhận message từ thời điểm đó trở đi. Không có cơ chế "nhận lại từ vị trí X".
  • Redis restart: mọi message đang trong hành trình đều mất — Pub/Sub không có persistence.

Không có ack, không có replay

Redis không xác nhận subscriber đã xử lý thành công. Không có cơ chế retry nếu subscriber nhận message nhưng crash giữa chừng. Không có consumer group để chia tải. Không có ID để đọc lại từ vị trí cụ thể.

Kết luận thực tế

Pub/Sub phù hợp khi mất một số message là chấp nhận được. Ví dụ: push notification UI (user không online thì không cần nhận), live dashboard (refresh tiếp theo sẽ cập nhật), cache invalidation broadcast (TTL sẽ tự hết hạn sau cùng). Với task không được mất — thanh toán, xử lý đơn hàng, event log — dùng Streams (bài 55).

Incident thực tế (Module 6): Subscriber reconnect sau 2 giây network blip, mất toàn bộ message Pub/Sub trong khoảng đó. Không có replay. Hệ thống phải thiết kế lại để chấp nhận mất dữ liệu ngắn, hoặc chuyển sang Streams với consumer group.
5

Code Python — Subscriber

Dùng redis-py (pip install redis). Subscriber phải chạy trên connection riêng — không dùng chung connection pool thông thường.

import json
import redis

# decode_responses=True để nhận str thay vì bytes
r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

# Tạo đối tượng pubsub từ connection riêng
pubsub = r.pubsub()
pubsub.subscribe("channel:notifications")


def process(data: str) -> None:
    """Xử lý message nhận được."""
    try:
        payload = json.loads(data)
        print(f"Received: {payload}")
        # business logic ở đây...
    except json.JSONDecodeError:
        print(f"Raw message: {data}")


# listen() là generator blocking — chạy vô hạn cho tới khi ngắt
for message in pubsub.listen():
    # Lọc chỉ lấy message thật (bỏ qua subscribe/unsubscribe confirmation)
    if message["type"] == "message":
        process(message["data"])

Cấu trúc của mỗi message dict từ listen():

  • type: "message" (message thật), "subscribe" / "unsubscribe" (sự kiện quản lý), "pmessage" (pattern match).
  • channel: tên channel message đến từ.
  • data: nội dung message (string).
  • pattern: pattern đã match (chỉ có khi dùng PSUBSCRIBE, giá trị None với SUBSCRIBE thường).

Subscriber trên thread riêng

Trong ứng dụng thực tế, subscriber thường chạy trên thread hoặc asyncio task riêng để không block main thread:

import threading
import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)


def run_subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe("channel:notifications")
    for message in pubsub.listen():
        if message["type"] == "message":
            handle_notification(message["data"])


thread = threading.Thread(target=run_subscriber, daemon=True)
thread.start()
6

Code Python — Publisher

Publisher dùng connection bình thường — r.publish() không cần connection riêng, không chuyển sang subscriber mode.

import json
import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)


def notify_user(user_id: int, event: str, payload: dict) -> int:
    """
    Publish notification tới channel của user.
    Trả về số subscriber nhận được (0 = không ai online).
    """
    channel = f"notifications:user:{user_id}"
    message = json.dumps({"event": event, "data": payload})
    receiver_count = r.publish(channel, message)
    return receiver_count


# Ví dụ sử dụng
count = notify_user(
    user_id=123,
    event="order_placed",
    payload={"order_id": "ORD-456", "total": 299000}
)

if count == 0:
    # Không ai đang subscribe — message đã mất
    # Cân nhắc fallback: lưu DB, push notification qua FCM, v.v.
    pass

Lưu ý: khi r.publish() trả về 0, message không đến được ai. Tùy use case, bạn cần quyết định có cần fallback không — ví dụ lưu vào database để user đọc sau khi vào lại ứng dụng.

7

Pattern Subscription với PSUBSCRIBE

PSUBSCRIBE cho phép subscribe theo pattern thay vì tên channel cố định. Hữu ích khi channel name được tạo động (theo user ID, room ID, tenant...).

# Subscribe tất cả channel bắt đầu bằng "notifications:user:"
PSUBSCRIBE notifications:user:*
# Match: notifications:user:123, notifications:user:456, notifications:user:abc

# Subscribe theo suffix
PSUBSCRIBE *.urgent
# Match: orders.urgent, payments.urgent, alerts.urgent

# Wildcard đơn ký tự với ?
PSUBSCRIBE session:????
# Match: session:abcd, session:1234 (đúng 4 ký tự)
PUNSUBSCRIBE notifications:user:*

Ký tự wildcard trong PSUBSCRIBE:

  • *: khớp nhiều ký tự bất kỳ (kể cả 0 ký tự).
  • ?: khớp đúng một ký tự bất kỳ.
  • [abc]: khớp một ký tự trong tập hợp.

Code Python với PSUBSCRIBE

import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
pubsub = r.pubsub()

# Subscribe theo pattern
pubsub.psubscribe("notifications:user:*")

for message in pubsub.listen():
    if message["type"] == "pmessage":
        # message["pattern"] = "notifications:user:*"
        # message["channel"] = "notifications:user:123"  (channel thật)
        # message["data"] = nội dung
        channel = message["channel"]
        user_id = channel.split(":")[-1]  # lấy "123" từ "notifications:user:123"
        print(f"User {user_id}: {message['data']}")

Type "pmessage" (thay vì "message") xuất hiện khi message đến qua PSUBSCRIBE. Message dict có thêm key "pattern" chứa pattern đã match.

Lưu ý hiệu năng với PSUBSCRIBE

Redis phải khớp pattern cho mỗi message publish, chi phí tỉ lệ với số pattern đang active và tần suất publish. Với ít chục pattern, không đáng kể. Nếu có hàng nghìn pattern phức tạp trên hàng triệu message/s, cần đo và cân nhắc thiết kế lại channel naming.

8

Subscriber Connection Model

Khi kết nối chạy SUBSCRIBE hoặc PSUBSCRIBE, connection chuyển sang subscriber mode. Trong mode này, connection chỉ chấp nhận một tập lệnh giới hạn:

  • SUBSCRIBE, UNSUBSCRIBE
  • PSUBSCRIBE, PUNSUBSCRIBE
  • SSUBSCRIBE, SUNSUBSCRIBE (Sharded Pub/Sub — Redis 7)
  • PING, RESET, QUIT

Mọi lệnh khác (GET, SET, HGET...) sẽ trả về lỗi:

> SUBSCRIBE channel:notifications
# Bây giờ connection ở subscriber mode
> GET some-key
# (error) ERR Command not allowed inside a subscription context

Hệ quả quan trọng cho application code:

  • Subscriber phải dùng connection riêng, không lấy từ connection pool chung.
  • Trong redis-py, đối tượng pubsub = r.pubsub() tự tạo connection riêng.
  • Với ioredis (Node.js): const sub = client.duplicate() — nhân đôi client với cùng config để tạo connection mới.

Implication cho scale

Nếu mỗi user browser mở WebSocket cần 1 subscriber Redis riêng, 10.000 user = 10.000 connection Redis. Redis có thể xử lý số lượng lớn connection nhưng đây vẫn là overhead đáng kể. Pattern chuẩn trong production: backend gateway — 1 hoặc vài connection subscriber Redis, forward message ra nhiều WebSocket client (phần này chi tiết ở bài 67).

9

Use Case: Cache Invalidation Broadcast

Bài 16 (multi-layer cache) đã đề cập Pub/Sub làm invalidation signal. Đây là một trong những use case phù hợp nhất với Pub/Sub vì mất message không gây hậu quả nghiêm trọng — cache sẽ tự hết hạn qua TTL.

import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

# --- Producer (chạy khi có thay đổi DB) ---
def update_user(user_id: int, data: dict) -> None:
    db.update_user(user_id, data)                     # ghi DB trước
    r.publish("invalidate", f"user:{user_id}")        # broadcast invalidation signal


# --- Subscriber (chạy trên mỗi app instance) ---
l1_cache: dict = {}  # local in-process cache

def run_invalidation_subscriber() -> None:
    pubsub = r.pubsub()
    pubsub.subscribe("invalidate")

    for msg in pubsub.listen():
        if msg["type"] == "message":
            key = msg["data"]            # ví dụ "user:123"
            l1_cache.pop(key, None)      # xóa entry khỏi local cache
            # Lần đọc kế tiếp sẽ miss L1, load lại từ Redis L2 hoặc DB

Nếu một instance mất kết nối vài giây và bỏ lỡ invalidation signal — local cache của nó sẽ phục vụ dữ liệu cũ tới khi TTL hết. Đây là hành vi chấp nhận được cho cache invalidation, không phải bug.

10

Use Case: WebSocket Backplane

Khi chạy nhiều instance WebSocket server (horizontal scaling), mỗi user kết nối tới một instance cụ thể. Nếu user A gửi message tới user B nhưng B đang kết nối ở instance khác, cần cơ chế để message đi qua. Pub/Sub Redis làm backplane — lớp trung gian broadcast giữa các instance.

# Mô hình đơn giản hóa — chi tiết ở bài 67

# Instance 1 (user A kết nối ở đây)
def on_user_a_sends(message: str, target_user_id: int) -> None:
    channel = f"ws:user:{target_user_id}"
    r.publish(channel, message)   # broadcast qua Redis


# Instance 2 (user B kết nối ở đây)
# Subscriber Redis trên instance 2
def ws_subscriber() -> None:
    pubsub = r.pubsub()
    # Subscribe channel của mọi user kết nối tới instance này
    for uid in connected_user_ids:
        pubsub.subscribe(f"ws:user:{uid}")

    for msg in pubsub.listen():
        if msg["type"] == "message":
            uid = msg["channel"].split(":")[-1]
            ws_connections[uid].send(msg["data"])  # forward tới WebSocket client

Mô hình này giữ số connection Redis ở mức thấp (1 subscriber/instance) thay vì 1 subscriber/user. Bài 67 đi chi tiết hơn về quản lý subscribe/unsubscribe động khi user connect/disconnect.

11

Pub/Sub vs Streams — Bảng So Sánh

Streams (bài 55) và Pub/Sub đều dùng để truyền message qua Redis, nhưng đặc tính hoàn toàn khác nhau.

Tiêu chí Pub/Sub Streams
Delivery Fire-and-forget At-least-once (với consumer group)
Mất message Có (subscriber offline) Không (persist đến khi xác nhận)
Replay / read lại Không Có (query theo ID range)
Persistence Không RDB / AOF
Consumer group Không (mọi subscriber nhận) Có (chia tải giữa consumer)
Fan-out behavior Mặc định (mỗi subscriber nhận bản copy) Chia việc trong group, fanout qua nhiều group
Acknowledgement Không Có (XACK)
Latency Rất thấp (< 1ms, push) Thấp (poll hoặc blocking read)
Use case chính Realtime broadcast, cache invalidation, WebSocket backplane Reliable queue, event log, task queue

Khi nào dùng Pub/Sub

  • Real-time UI notification — user không online thì không cần nhận.
  • Cache invalidation broadcast giữa các instance (có TTL làm safety net).
  • Live dashboard update — mất vài update không ảnh hưởng.
  • Chat ephemeral (không cần history).
  • WebSocket backplane giữa các server instance.

Khi nào phải dùng Streams thay Pub/Sub

  • Task không được mất (thanh toán, xử lý đơn hàng, email transactional).
  • Cần replay hoặc đọc lại event từ vị trí cụ thể.
  • Cần consumer group để chia tải (nhiều worker xử lý song song).
  • Cần audit trail hay event log persistent.

Streams có thể thay Pub/Sub không?

Streams thay được Pub/Sub khi cần reliable. Ngược lại không đúng: Pub/Sub không thể thay Streams cho reliable messaging. Về latency, Pub/Sub thường thấp hơn vì là push thuần — Streams XREAD blocking gần tương đương nhưng có thêm overhead của persistence.

12

Anti-patterns & Best Practices

Anti-patterns

  • Dùng Pub/Sub như queue cho task quan trọng: message mất khi subscriber offline không phải edge case — đây là hành vi bình thường của Pub/Sub. Reliable task cần Streams.
  • Dùng subscriber connection để chạy command khác: connection ở subscriber mode không chấp nhận GET, SET... Dùng connection riêng cho subscriber, connection pool riêng cho command.
  • 1 subscriber Redis / 1 user: với 10.000 user, đây là 10.000 connection Redis. Dùng backend gateway pattern thay thế.
  • Không handle disconnect: subscriber có thể mất kết nối bất cứ lúc nào. Nếu không có logic reconnect, listener sẽ dừng mà không báo lỗi rõ ràng.
  • Pub/Sub heavy trên Redis Cluster không dùng Sharded Pub/Sub: Pub/Sub mặc định broadcast tới tất cả node trong cluster. Redis 7 có Sharded Pub/Sub (SSUBSCRIBE, SPUBLISH) — chi tiết ở bài 68.

Best practices

  • Pub/Sub chỉ cho fire-and-forget realtime. Task cần đảm bảo delivery → Streams.
  • Backend gateway: 1 subscriber Redis per server instance, forward message tới nhiều WebSocket client.
  • Implement reconnect logic trong subscriber với backoff. Chấp nhận mất message ngắn khi reconnect.
  • Trên Redis Cluster, dùng Sharded Pub/Sub nếu volume cao (bài 68).
  • Dùng decode_responses=True với redis-py để tránh xử lý bytes thủ công.
  • Với cache invalidation: Pub/Sub là signal, TTL là safety net. Đừng bỏ TTL vì "đã có invalidation".
13

Tổng Kết & Quiz

  • Pub/Sub Redis là cơ chế publish/subscribe theo channel. Publisher gửi, Redis push tới mọi subscriber đang kết nối.
  • Fire-and-forget: message không được lưu. Subscriber offline → mất message, không có replay, không có ack.
  • PSUBSCRIBE cho phép subscribe theo pattern wildcard (*, ?, [...]).
  • Subscriber mode là exclusive: connection không chấp nhận lệnh Redis thông thường sau khi SUBSCRIBE.
  • Use case phù hợp: realtime broadcast chấp nhận mất, cache invalidation, WebSocket backplane.
  • Reliable messaging → Streams, không phải Pub/Sub.

Quiz

  1. Publisher gọi PUBLISH channel:events "hello" và nhận về (integer) 0. Điều này có nghĩa gì? Message có được Redis lưu lại để chờ subscriber tới sau không?
  2. Tại sao subscriber phải dùng connection riêng, không thể lấy từ connection pool thông thường?
  3. Ứng dụng có 50 server instance, mỗi instance phục vụ 5.000 WebSocket client. Nếu mỗi user cần 1 subscriber Redis riêng, cần bao nhiêu connection Redis tổng cộng? Đề xuất pattern tốt hơn.
  4. Pub/Sub phù hợp cho cache invalidation vì sao, trong khi lại không phù hợp cho reliable task queue?
  5. Một subscriber dùng PSUBSCRIBE notifications:user:*. Message type trong listen() là gì, và lấy channel thật từ key nào trong dict?

Đáp án gợi ý

  1. Trả về 0 nghĩa là không có subscriber nào đang online và kết nối tới channel đó tại thời điểm publish. Message biến mất — Redis không buffer hay lưu lại. Đây là fire-and-forget.
  2. Sau khi gọi SUBSCRIBE, connection vào subscriber mode và chỉ chấp nhận SUBSCRIBE/UNSUBSCRIBE/PING/QUIT. Lấy connection từ pool để subscribe sẽ khiến connection đó không dùng được cho lệnh thông thường nữa, làm hỏng pool.
  3. 50 × 5.000 = 250.000 connection Redis — không hợp lý. Pattern tốt hơn: backend gateway — mỗi server instance chỉ dùng 1 subscriber Redis, forward message tới WebSocket client tương ứng theo user ID. Tổng chỉ cần ~50 subscriber connection.
  4. Cache invalidation chấp nhận mất signal vì có TTL làm safety net — dữ liệu sẽ tự làm mới khi cache miss. Task queue không được mất message vì mỗi task là một đơn vị công việc cụ thể (ví dụ gửi email, trừ điểm), mất task nghĩa là công việc không được thực hiện.
  5. Type là "pmessage". Channel thật lấy từ key message["channel"]. Pattern đã match lấy từ message["pattern"].

Bài tiếp theo

Bài 66 — Keyspace Notifications: Redis có thể tự phát sự kiện khi key bị set, expire, delete. Subscriber lắng nghe các sự kiện này để phản ứng mà không cần ứng dụng tự publish.

Tham khảo