Danh sách bài viết

Bài 55: Redis Streams — Log Primitive (XADD/XREAD)

Redis Streams (ra mắt Redis 5.0, 2018) là cấu trúc dữ liệu append-only log nằm trong Redis: persistent qua RDB/AOF, replay được, mỗi entry có ID dạng timestamp-sequence. Bài này tập trung vào các lệnh cơ bản — XADD để ghi entry, XREAD để đọc message mới (blocking hoặc non-blocking), XRANGE/XREVRANGE để truy vấn theo vị trí — cùng cách giới hạn kích thước stream với MAXLEN/XTRIM. Bài giải thích rõ tại sao Streams khác List về mô hình consume (entry không bị xóa khi đọc), fan-out pattern, và những giới hạn của XREAD đơn dẫn đến sự cần thiết của Consumer Groups (bài 56).

28/05/2026
0 lượt xem
1

Mục Tiêu Bài Học

  • Hiểu Redis Streams là append-only log — entry không bị xóa khi consume, có thể replay từ bất kỳ vị trí nào.
  • Nắm cấu trúc Stream entry ID (milliseconds-sequence) và tại sao ID đóng vai trò như con trỏ vị trí trong log.
  • Viết được XADD với auto-generate ID, field-value pairs, và MAXLEN approximate để giới hạn kích thước stream.
  • Dùng XRANGE/XREVRANGE để đọc theo phạm vi, XREAD để nhận message mới (blocking và non-blocking).
  • Phân biệt được fan-out (mỗi consumer đọc toàn bộ stream) với consumer group (chia message, bài 56).
  • Biết XTRIM/MINID để cap stream theo size hoặc theo thời gian, tránh OOM.
  • Nhận diện giới hạn của XREAD đơn và hiểu sự cần thiết của Consumer Groups.
2

Streams Là Gì & Tại Sao Cần

Bài 54 đã phân tích giới hạn của List queue: khi consumer RPOP hoặc LMOVE lấy message, message bị xóa khỏi List. Không có replay; nếu consumer crash sau khi lấy nhưng trước khi xử lý xong, message mất. Đây là mô hình at-most-once — đủ cho những tác vụ có thể bỏ qua, nhưng không đủ cho hệ thống cần đảm bảo xử lý.

Redis Streams giải quyết vấn đề này bằng mô hình append-only log: entry được ghi thêm vào cuối stream và không bị xóa khi consume. Consumer đọc bằng cách cung cấp vị trí bắt đầu (entry ID); stream trả về các entry từ vị trí đó trở đi. Consumer tự theo dõi vị trí đã đọc đến đâu.

So sánh với các hệ thống khác:

  • Kafka topic: Streams tương tự về mô hình log, nhưng nằm trong Redis — không cần hạ tầng riêng, phù hợp quy mô vừa.
  • Redis List: List là queue pop-and-delete; Streams là log read-and-keep.
  • Redis Pub/Sub: Pub/Sub fire-and-forget, không persist; Streams persist qua RDB/AOF, replay được.

Redis Streams ra mắt trong Redis 5.0 (tháng 10/2018). Các lệnh bổ sung như XAUTOCLAIM (Redis 6.2) và cú pháp MINID cho XTRIM (Redis 6.2) được thêm sau.

3

Stream Entry ID

Mỗi entry trong stream có một ID duy nhất theo format:

<millisecondsTime>-<sequenceNumber>
# Ví dụ:
1735689600000-0    # milliseconds Unix timestamp + sequence 0
1735689600000-1    # cùng millisecond, entry tiếp theo (sequence tăng)
1735689600123-0    # millisecond khác, sequence reset về 0

Hai quy tắc quan trọng:

  • Monotonically increasing: ID sau luôn lớn hơn ID trước. Redis từ chối XADD nếu ID bạn tự chỉ định nhỏ hơn hoặc bằng ID cuối cùng của stream.
  • ID = vị trí trong log: XRANGE và XREAD đều dùng ID để xác định điểm bắt đầu đọc. Lưu lại last ID = biết đã đọc đến đâu.

Ba cách chỉ định ID khi XADD:

  • * — auto-generate (dùng trong hầu hết trường hợp).
  • 1735689600000-* — tự chỉ định millisecond, để Redis tự tăng sequence.
  • 1735689600000-5 — chỉ định đầy đủ (dùng khi cần import log từ nguồn ngoài).

Khi dùng XRANGE, hai ký tự đặc biệt:

  • -: ID nhỏ nhất (đầu stream).
  • +: ID lớn nhất (cuối stream).

Khi dùng XREAD:

  • 0: đọc từ đầu stream (từ ID 0-0).
  • $: chỉ đọc entry mới xuất hiện sau thời điểm gọi lệnh.
4

XADD — Ghi Entry Vào Stream

Cú pháp cơ bản:

# Ghi entry với auto-generate ID
XADD stream:orders * order_id 123 amount 500 status pending
# Trả về ID đã được gán, ví dụ: "1735689600000-0"

# Ghi entry với MAXLEN để cap stream (approximate, hiệu quả hơn exact)
XADD stream:orders MAXLEN ~ 10000 * order_id 124 amount 300 status pending

# MAXLEN exact (không dùng ~) — ít dùng vì chậm hơn
XADD stream:orders MAXLEN 10000 * order_id 125 amount 200 status paid

Giải thích các thành phần:

  • stream:orders: tên stream (key Redis). Tự động tạo nếu chưa tồn tại.
  • *: auto-generate ID. Redis dùng Unix time milliseconds + sequence.
  • order_id 123 amount 500 status pending: các field-value pairs, giống Hash. Một entry có thể có nhiều cặp.
  • MAXLEN ~ 10000: giới hạn stream tối đa khoảng 10.000 entry. Dấu ~ cho phép Redis trim theo radix tree node (hiệu quả hơn so với exact), thực tế có thể giữ hơi nhiều hơn 10.000 một chút.

XADD trả về ID của entry vừa được ghi. Nếu stream đầy theo MAXLEN, các entry cũ nhất sẽ bị xóa trước khi ghi entry mới.

# Kiểm tra: stream tự tạo khi XADD lần đầu
EXISTS stream:orders        # 0
XADD stream:orders * msg hello
# "1735689600000-0"
EXISTS stream:orders        # 1
TYPE stream:orders          # stream
5

XLEN, XRANGE, XREVRANGE — Đọc Theo Vị Trí

Nhóm lệnh này đọc entry từ stream theo phạm vi ID — không blocking, không xóa entry sau khi đọc.

# Số entry trong stream
XLEN stream:orders
# (integer) 1500

# Đọc TẤT CẢ entry từ đầu đến cuối
XRANGE stream:orders - +

# Giới hạn số entry trả về
XRANGE stream:orders - + COUNT 10        # 10 entry đầu tiên

# Đọc từ ID cụ thể trở đi
XRANGE stream:orders 1735689600000-0 +   # từ entry này đến cuối

# Đọc theo khoảng ID
XRANGE stream:orders 1735689600000 1735689700000   # khoảng 100s

# XREVRANGE: đọc từ cuối về đầu (entry mới nhất trước)
XREVRANGE stream:orders + - COUNT 10     # 10 entry mới nhất

Format output của XRANGE:

XRANGE stream:orders - + COUNT 2
# 1) 1) "1735689600000-0"
#    2) 1) "order_id"
#       2) "123"
#       3) "amount"
#       4) "500"
#       5) "status"
#       6) "pending"
# 2) 1) "1735689600001-0"
#    2) 1) "order_id"
#       2) "124"
#       ...

Mỗi entry là một tuple gồm ID và danh sách field-value xen kẽ. Client library (redis-py, ioredis) thường tự parse thành dict/object.

XRANGE theo partial timestamp: có thể dùng chỉ phần millisecond (không cần sequence), Redis tự suy ra:

# Đọc entry từ timestamp 1735689600000 trở đi
# Redis tự chuyển thành 1735689600000-0 cho start
XRANGE stream:orders 1735689600000 +
6

XREAD — Đọc Message Mới (Consumer Đơn Giản)

XREAD thiết kế cho consumer đọc message mới — không cần biết ID cụ thể trước, chỉ cần biết "tôi đã đọc đến đâu" hoặc "chỉ đọc entry xuất hiện từ bây giờ trở đi".

# Đọc tối đa 10 entry từ đầu stream
XREAD COUNT 10 STREAMS stream:orders 0

# Chỉ đọc entry CHƯA tồn tại lúc gọi lệnh (entry hoàn toàn mới)
XREAD COUNT 10 STREAMS stream:orders $

# BLOCKING: chờ tối đa 5 giây (5000ms) để có entry mới
# Trả về ngay nếu có entry mới, timeout trả về nil
XREAD BLOCK 5000 COUNT 10 STREAMS stream:orders $

# BLOCK 0: chờ vô hạn (dùng như long-poll)
XREAD BLOCK 0 COUNT 10 STREAMS stream:orders $

XREAD có thể đọc nhiều stream cùng lúc:

# Đọc đồng thời từ 2 stream
XREAD COUNT 5 STREAMS stream:orders stream:payments 0 0
# Phần STREAMS: tên các stream, sau đó tương ứng là last ID cho mỗi stream
# "0 0" nghĩa là đọc từ đầu cả hai stream

Format output của XREAD:

XREAD COUNT 2 STREAMS stream:orders 0
# 1) 1) "stream:orders"
#    2) 1) 1) "1735689600000-0"
#             2) 1) "order_id"
#                2) "123"
#                ...
#          2) 1) "1735689600001-0"
#             2) ...

Output bọc theo stream name — quan trọng khi đọc nhiều stream cùng lúc.

Sự khác biệt giữa $0:

  • 0: đọc từ entry đầu tiên của stream. Phù hợp khi muốn replay toàn bộ.
  • $: chỉ nhận entry được thêm vào sau thời điểm gọi XREAD. Phù hợp cho consumer mới không cần history. Lưu ý: nếu XREAD BLOCK với $ và có entry mới vào trong lúc chờ, sẽ nhận được ngay; nhưng nếu không dùng BLOCK, $ hầu như luôn trả về nil (không có entry nào "xuất hiện sau khi lệnh chạy xong" trên connection đồng bộ).
7

Streams vs List — Bảng So Sánh

Tiêu chí List Streams
Consume Pop + delete (entry mất) Read, entry vẫn tồn tại
Replay Không Có (đọc lại từ bất kỳ ID nào)
Per-message ID Không Có (timestamp-sequence)
Consumer group Không Có (bài 56: XREADGROUP, XACK)
Acknowledgment Manual (LMOVE pattern) Built-in XACK (bài 56)
Persistence Có (RDB/AOF) Có (RDB/AOF)
Fan-out Không trực tiếp Có (mỗi consumer đọc toàn bộ)
Blocking read BRPOP / BLMOVE XREAD BLOCK
Cap size LTRIM MAXLEN / XTRIM / MINID
Use case chính Simple queue, at-most-once Reliable messaging, event log, audit

Không phải Streams luôn tốt hơn List. Nếu tác vụ đơn giản, at-most-once (mất message chấp nhận được), List nhẹ hơn và đủ dùng. Streams phù hợp khi cần: replay, audit trail, nhiều consumer độc lập, hoặc reliable processing với ack.

8

Use Case 1 — Event Log

Stream phù hợp tự nhiên với event sourcing và audit log: mỗi action được ghi thêm vào stream theo thứ tự thời gian, hệ thống có thể replay lại để dựng lại state hoặc debug.

import redis
import time

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Producer: ghi event user action
def log_event(user_id: str, action: str, metadata: dict = None):
    payload = {
        "user_id": user_id,
        "action": action,
        "ts": str(int(time.time() * 1000)),
    }
    if metadata:
        payload.update(metadata)
    entry_id = r.xadd(
        "events:user",
        payload,
        maxlen=1_000_000,   # giữ tối đa ~1M event
        approximate=True,   # dùng ~ để hiệu quả hơn
    )
    return entry_id

log_event("u123", "login", {"ip": "1.2.3.4", "device": "mobile"})
log_event("u123", "view_product", {"product_id": "p456"})
log_event("u456", "purchase", {"order_id": "o789", "amount": "500"})
# Consumer: đọc event mới theo blocking poll
def consume_events(last_id: str = "$"):
    while True:
        # block=5000: chờ tối đa 5 giây rồi loop lại
        result = r.xread({"events:user": last_id}, block=5000, count=50)
        if not result:
            # timeout, không có event mới — tiếp tục chờ
            continue
        for stream_name, entries in result:
            for entry_id, fields in entries:
                process_event(fields)
                last_id = entry_id   # cập nhật vị trí đã đọc

def process_event(fields: dict):
    print(f"[{fields['ts']}] user={fields['user_id']} action={fields['action']}")

Điểm quan trọng: last_id phải được persist (ví dụ lưu vào Redis bằng SET, hoặc database) nếu muốn consumer tiếp tục từ vị trí cũ sau khi restart. XREAD đơn không tự track vị trí phía server — đây là giới hạn sẽ được nói rõ ở mục 14.

9

Use Case 2 — Fan-out (Nhiều Consumer Độc Lập)

Fan-out là pattern trong đó một event được xử lý bởi nhiều consumer độc lập — mỗi consumer đọc toàn bộ stream từ vị trí riêng của nó. Đây khác với consumer group (bài 56) là chia nhau message.

# Cùng một stream "events:order"
# Consumer A (analytics service): đọc từ vị trí riêng của A
XREAD COUNT 100 STREAMS events:order <last_id_A>

# Consumer B (notification service): đọc từ vị trí riêng của B
XREAD COUNT 100 STREAMS events:order <last_id_B>

# Consumer C (audit service): đọc từ vị trí riêng của C
XREAD COUNT 100 STREAMS events:order <last_id_C>
# Cả ba consumer đều thấy mọi message — không consumer nào "ăn" của consumer kia

Mỗi consumer tự lưu last_id của riêng mình (thường trong Redis SET hoặc database). Tốc độ xử lý mỗi consumer độc lập — consumer chậm không ảnh hưởng consumer nhanh.

Use case thực tế:

  • Order service ghi event order.created vào stream.
  • Analytics service đọc để cập nhật dashboard.
  • Notification service đọc để gửi email xác nhận.
  • Audit service đọc để ghi vào audit log database.

Lưu ý: nếu một consumer bị chậm và stream bị trim (MAXLEN), consumer đó có thể mất entry đã bị trim trước khi kịp đọc. Với fan-out kiểu này, cần chú ý cân bằng giữa MAXLEN và tốc độ consumer chậm nhất.

10

Capped Stream — Quản Lý Memory

Stream không có TTL tự động. Nếu không giới hạn kích thước, stream grow vô hạn và gây OOM — đây là anti-pattern phổ biến nhất khi dùng Streams trong production.

Có ba cách giới hạn:

# 1. MAXLEN trong XADD — trim khi ghi (phổ biến nhất)
XADD stream:logs MAXLEN ~ 1000000 * level error msg "disk full"

# 2. XTRIM riêng — trim stream đã có
XTRIM stream:logs MAXLEN ~ 1000000

# 3. MINID (Redis 6.2+): xóa entry cũ hơn ID cụ thể
# Xóa mọi entry có ID < 1735603200000-0 (7 ngày trước)
XTRIM stream:logs MINID ~ 1735603200000

Sự khác biệt giữa exact và approximate (~):

  • Exact (MAXLEN 10000): Redis trim chính xác đến 10.000 entry. Phải duyệt tìm boundary chính xác — O(N) khi N lớn.
  • Approximate (MAXLEN ~ 10000): Redis trim theo radix tree node (macro node). Nhanh hơn O(1), có thể giữ hơi nhiều hơn 10.000 một chút nhưng không đáng kể. Dùng trong hầu hết production case.

MINID hữu ích khi muốn retention theo thời gian (giữ 7 ngày) thay vì theo số lượng. Cần chạy XTRIM định kỳ (cron hoặc background job):

import time

def trim_old_entries(stream_name: str, retention_days: int = 7):
    cutoff_ms = int((time.time() - retention_days * 86400) * 1000)
    # Xóa mọi entry cũ hơn retention_days ngày
    r.xtrim(stream_name, minid=str(cutoff_ms), approximate=True)

XDEL xóa entry theo ID cụ thể (đánh dấu deleted trong radix tree, memory thực sự được thu hồi khi node được trim):

XDEL stream:orders 1735689600000-0
# (integer) 1   — số entry thực sự bị xóa

XDEL ít dùng trong production; thường XTRIM/MAXLEN đủ.

11

Persistence & Durability

Streams persist qua RDB và AOF giống các data structure khác. Lựa chọn cấu hình AOF ảnh hưởng đến độ bền message khi crash:

  • appendfsync everysec (default): flush mỗi giây → mất tối đa ~1 giây message nếu Redis crash. Đây là thiết lập cân bằng performance/durability cho hầu hết trường hợp.
  • appendfsync always: flush mỗi write → không mất message nhưng throughput giảm đáng kể (~10–20x chậm hơn). Chỉ dùng khi yêu cầu durability cao nhất.
  • appendfsync no: để OS quyết định → nhanh nhất nhưng có thể mất nhiều giây message.

So sánh với List: List cũng persist qua AOF, nhưng mô hình RPOP-then-process của List có vấn đề: message bị pop khỏi List trước khi xử lý xong — nếu consumer crash giữa chừng, message đã mất khỏi List (không còn để replay). Streams giải quyết điều này bằng Consumer Groups với XACK (bài 56).

Nếu chỉ dùng XREAD đơn (không consumer group), persistence bảo vệ stream entries nhưng không bảo vệ vị trí đọc của consumer — vị trí đó phải được persist riêng.

12

XINFO — Inspect Stream

# Thông tin tổng quan về stream
XINFO STREAM stream:orders
# - length: số entry hiện tại
# - radix-tree-keys: số key trong radix tree nội bộ
# - radix-tree-nodes: số node
# - last-generated-id: ID của entry mới nhất
# - first-entry: entry đầu tiên (ID + fields)
# - last-entry: entry cuối cùng (ID + fields)
# - groups: số consumer group đang attach (bài 56)

# Xem danh sách consumer groups (sẽ có nội dung sau bài 56)
XINFO GROUPS stream:orders
# - name: tên group
# - consumers: số consumer trong group
# - pending: số message chưa XACK (PEL)
# - last-delivered-id: ID cuối đã giao cho group

# Thông tin full (Redis 7.0: XINFO STREAM ... FULL)
XINFO STREAM stream:orders FULL COUNT 10

XINFO STREAM hữu ích để:

  • Kiểm tra stream có đang grow không kiểm soát không (theo dõi length).
  • Xem last-generated-id để biết hoạt động gần nhất.
  • Debug khi cần kiểm tra entry đầu/cuối stream.
13

Code Đầy Đủ (Python redis-py)

Code dưới đây dùng redis-py (phiên bản 5.x). API xadd, xread, xrange tương ứng trực tiếp với Redis command.

import redis
import time
import json

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM = "stream:orders"
CONSUMER_POSITION_KEY = "consumer:orders:last_id"


# ─── PRODUCER ────────────────────────────────────────────────────────────────

def publish_order(order_id: str, amount: float, status: str = "pending") -> str:
    """Ghi order event vào stream, trả về entry ID."""
    entry_id = r.xadd(
        STREAM,
        {
            "order_id": order_id,
            "amount": str(amount),
            "status": status,
        },
        maxlen=10_000,
        approximate=True,
    )
    print(f"[PRODUCER] order={order_id} → entry_id={entry_id}")
    return entry_id


# ─── CONSUMER (simple XREAD, không consumer group) ──────────────────────────

def load_position() -> str:
    """Đọc vị trí đã đọc lần trước. Trả về '0' nếu chưa có."""
    saved = r.get(CONSUMER_POSITION_KEY)
    return saved if saved else "0"


def save_position(entry_id: str):
    """Persist vị trí vừa xử lý xong."""
    r.set(CONSUMER_POSITION_KEY, entry_id)


def process_order(fields: dict):
    """Xử lý một order entry."""
    print(f"[CONSUMER] order={fields['order_id']} amount={fields['amount']} status={fields['status']}")


def run_consumer():
    last_id = load_position()
    print(f"[CONSUMER] bắt đầu từ ID: {last_id}")

    while True:
        # Đọc tối đa 20 entry, block tối đa 5 giây chờ message mới
        result = r.xread({STREAM: last_id}, block=5000, count=20)
        if not result:
            # Hết timeout, không có message mới — loop lại
            continue

        for _stream_name, entries in result:
            for entry_id, fields in entries:
                try:
                    process_order(fields)
                    last_id = entry_id
                    save_position(last_id)   # persist sau mỗi message
                except Exception as e:
                    print(f"[CONSUMER] lỗi xử lý entry {entry_id}: {e}")
                    # với XREAD đơn: không có cơ chế retry tự động
                    # → cần consumer group + XACK (bài 56)


# ─── UTILITIES ───────────────────────────────────────────────────────────────

def inspect_stream():
    info = r.xinfo_stream(STREAM)
    print(f"length={info['length']}, last_id={info['last-generated-id']}")

def read_range(start: str = "-", end: str = "+", count: int = 10):
    entries = r.xrange(STREAM, min=start, max=end, count=count)
    for entry_id, fields in entries:
        print(f"  [{entry_id}] {fields}")

def trim_by_age(retention_days: int = 7):
    cutoff_ms = int((time.time() - retention_days * 86400) * 1000)
    trimmed = r.xtrim(STREAM, minid=str(cutoff_ms), approximate=True)
    print(f"Đã trim {trimmed} entries cũ hơn {retention_days} ngày")


# ─── DEMO ────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Ghi 3 order
    publish_order("o001", 500.0, "pending")
    publish_order("o002", 300.0, "paid")
    publish_order("o003", 750.0, "pending")

    # Đọc tất cả từ đầu
    print("\nToàn bộ stream:")
    read_range()

    inspect_stream()
14

Giới Hạn XREAD Đơn & Tại Sao Cần Consumer Group

XREAD đơn (không consumer group) có những giới hạn nhất định khi dùng cho reliable processing:

  • Vị trí consumer nằm phía client: last_id phải tự lưu và tự load. Nếu consumer crash trước khi persist last_id, lần restart tiếp theo không biết đã xử lý đến đâu — có thể xử lý lại (duplicate) hoặc bỏ sót tùy cách implement.
  • Không load balance tự động: mỗi instance consumer độc lập đọc toàn bộ stream. Không có cơ chế tự động chia message giữa nhiều worker.
  • Không có ack phía server: Redis không biết consumer nào đã xử lý message nào. Không thể phát hiện message "đang xử lý dở" khi consumer crash.
  • Không có retry tự động: nếu xử lý lỗi, XREAD không có cơ chế requeue hay claim lại message.

Bảng tóm tắt:

Khả năng XREAD đơn Consumer Group (bài 56)
Track vị trí phía server Không Có (per group)
Load balance nhiều worker Không Có (mỗi message giao 1 worker)
Ack sau xử lý Không Có (XACK)
Pending Entry List (PEL) Không Có (biết message nào chưa ack)
Claim lại message bị stuck Không Có (XCLAIM / XAUTOCLAIM)
Fan-out (mọi consumer thấy message) Không (chia nhau)

XREAD đơn phù hợp cho: event log đọc audit trail, fan-out nhiều service độc lập, replay history. Consumer Group phù hợp cho: background job processing, worker pool cần đảm bảo mỗi message xử lý đúng một lần.

15

Anti-patterns & Best Practices

Anti-patterns

  • Stream không có MAXLEN: stream grow vô hạn, dùng toàn bộ RAM của Redis instance. Đây là nguyên nhân phổ biến nhất gây OOM với Streams trong production. Luôn đặt MAXLEN ~ hoặc chạy XTRIM định kỳ.
  • XREAD đơn cho reliable job processing: khi consumer crash, vị trí đọc mất (trừ khi persist riêng). Dùng consumer group (bài 56) cho processing cần đảm bảo.
  • MAXLEN exact thay vì approximate: XADD stream MAXLEN 10000 * chạy O(N) trim mỗi lần ghi. Dùng MAXLEN ~ 10000 để trim O(1) theo radix node.
  • Lưu payload lớn trong field: mỗi field-value được lưu inline trong stream entry. Payload vài KB/entry nhân với hàng triệu entry = OOM nhanh. Lưu pointer (ID hoặc key tham chiếu), fetch dữ liệu lớn từ nơi khác.
  • Dùng Streams khi List đủ: nếu tác vụ đơn giản, at-most-once, không cần replay, List nhẹ hơn và đủ. Over-engineer không cần thiết tăng độ phức tạp.
  • Đọc XRANGE - + không giới hạn: trên stream triệu entry, XRANGE không COUNT trả về toàn bộ, block Redis event loop. Luôn dùng COUNT.

Best Practices

  • Luôn đặt MAXLEN ~ <size> trong XADD hoặc chạy XTRIM định kỳ.
  • Dùng XTRIM MINID ~ khi muốn retention theo thời gian.
  • Persist last_id sau mỗi message (hoặc theo batch nhỏ) khi dùng XREAD đơn.
  • Với fan-out nhiều service: mỗi service tự track last_id độc lập.
  • Consumer Group (bài 56) cho background job cần at-least-once với ack.
  • Field nhỏ gọn: lưu pointer thay vì payload lớn.
  • Bật AOF everysec cho durability cơ bản; always nếu không được mất bất kỳ message nào.
  • Monitor XINFO STREAM để phát hiện stream grow bất thường.
16

Quiz

  1. Stream entry ID 1735689600000-3 có ý nghĩa gì? Tại sao ID này không thể nhỏ hơn ID của entry trước?
  2. Sự khác biệt giữa XREAD STREAMS mystream 0XREAD BLOCK 5000 STREAMS mystream $?
  3. MAXLEN exact và MAXLEN approximate (~) khác nhau ở điểm gì về hiệu năng?
  4. Mô tả fan-out pattern dùng Streams. Tại sao hai consumer đọc cùng stream lại nhận được cùng tập message?
  5. Kể ba giới hạn của XREAD đơn khiến nó không phù hợp cho reliable background job processing.

Đáp án gợi ý

  1. 1735689600000 là Unix timestamp tính bằng milliseconds; 3 là sequence number (entry thứ 4 trong cùng millisecond). ID monotonically increasing là bất biến của stream — Redis từ chối XADD nếu ID mới nhỏ hơn hoặc bằng ID cuối, đảm bảo thứ tự log không bị phá vỡ.
  2. XREAD ... 0: đọc non-blocking từ đầu stream (ID 0-0 trở đi), trả về ngay kể cả khi không có entry. XREAD BLOCK 5000 ... $: chỉ nhận entry xuất hiện sau thời điểm gọi, block connection tối đa 5 giây nếu chưa có entry mới, trả về nil nếu hết timeout.
  3. MAXLEN exact: Redis trim đến chính xác N entry sau mỗi XADD, tốn O(N) vì phải tìm boundary chính xác. MAXLEN ~: trim theo radix tree macro node, O(1), có thể giữ hơi nhiều hơn N nhưng chênh lệch nhỏ và không đáng kể trong thực tế.
  4. Mỗi consumer dùng XREAD với last_id của riêng mình. Stream không xóa entry khi được đọc — entry tồn tại cho đến khi bị trim/XDEL. Vì thế consumer A và consumer B đều có thể đọc cùng entry, mỗi người từ vị trí riêng → cùng tập message.
  5. (1) Vị trí consumer (last_id) phải tự persist phía client — crash mất vị trí; (2) không load balance tự động giữa nhiều worker instance; (3) không có ack phía server — Redis không biết message nào đã xử lý thành công, không thể recover message "đang xử lý dở" khi consumer crash.

Bài tiếp theo

Bài 56 đi vào Consumer Groups: XREADGROUP, XACK, Pending Entry List (PEL), và XAUTOCLAIM để xử lý message bị stuck khi consumer crash.

Tham khảo