Mục lục
- Mục Tiêu Bài Học
- Streams Là Gì & Tại Sao Cần
- Stream Entry ID
- XADD — Ghi Entry Vào Stream
- XLEN, XRANGE, XREVRANGE — Đọc Theo Vị Trí
- XREAD — Đọc Message Mới (Consumer Đơn Giản)
- Streams vs List — Bảng So Sánh
- Use Case 1 — Event Log
- Use Case 2 — Fan-out (Nhiều Consumer Độc Lập)
- Capped Stream — Quản Lý Memory
- Persistence & Durability
- XINFO — Inspect Stream
- Code Đầy Đủ (Python redis-py)
- Giới Hạn XREAD Đơn & Tại Sao Cần Consumer Group
- Anti-patterns & Best Practices
- Quiz
Mục Tiêu Bài Học
- Hiểu Redis Streams là append-only log — entry không bị xóa khi consume, có thể replay từ bất kỳ vị trí nào.
- Nắm cấu trúc Stream entry ID (milliseconds-sequence) và tại sao ID đóng vai trò như con trỏ vị trí trong log.
- Viết được XADD với auto-generate ID, field-value pairs, và MAXLEN approximate để giới hạn kích thước stream.
- Dùng XRANGE/XREVRANGE để đọc theo phạm vi, XREAD để nhận message mới (blocking và non-blocking).
- Phân biệt được fan-out (mỗi consumer đọc toàn bộ stream) với consumer group (chia message, bài 56).
- Biết XTRIM/MINID để cap stream theo size hoặc theo thời gian, tránh OOM.
- Nhận diện giới hạn của XREAD đơn và hiểu sự cần thiết của Consumer Groups.
Streams Là Gì & Tại Sao Cần
Bài 54 đã phân tích giới hạn của List queue: khi consumer RPOP hoặc LMOVE lấy message, message bị xóa khỏi List. Không có replay; nếu consumer crash sau khi lấy nhưng trước khi xử lý xong, message mất. Đây là mô hình at-most-once — đủ cho những tác vụ có thể bỏ qua, nhưng không đủ cho hệ thống cần đảm bảo xử lý.
Redis Streams giải quyết vấn đề này bằng mô hình append-only log: entry được ghi thêm vào cuối stream và không bị xóa khi consume. Consumer đọc bằng cách cung cấp vị trí bắt đầu (entry ID); stream trả về các entry từ vị trí đó trở đi. Consumer tự theo dõi vị trí đã đọc đến đâu.
So sánh với các hệ thống khác:
- Kafka topic: Streams tương tự về mô hình log, nhưng nằm trong Redis — không cần hạ tầng riêng, phù hợp quy mô vừa.
- Redis List: List là queue pop-and-delete; Streams là log read-and-keep.
- Redis Pub/Sub: Pub/Sub fire-and-forget, không persist; Streams persist qua RDB/AOF, replay được.
Redis Streams ra mắt trong Redis 5.0 (tháng 10/2018). Các lệnh bổ sung như XAUTOCLAIM (Redis 6.2) và cú pháp MINID cho XTRIM (Redis 6.2) được thêm sau.
Stream Entry ID
Mỗi entry trong stream có một ID duy nhất theo format:
<millisecondsTime>-<sequenceNumber>
# Ví dụ:
1735689600000-0 # milliseconds Unix timestamp + sequence 0
1735689600000-1 # cùng millisecond, entry tiếp theo (sequence tăng)
1735689600123-0 # millisecond khác, sequence reset về 0
Hai quy tắc quan trọng:
- Monotonically increasing: ID sau luôn lớn hơn ID trước. Redis từ chối XADD nếu ID bạn tự chỉ định nhỏ hơn hoặc bằng ID cuối cùng của stream.
- ID = vị trí trong log: XRANGE và XREAD đều dùng ID để xác định điểm bắt đầu đọc. Lưu lại last ID = biết đã đọc đến đâu.
Ba cách chỉ định ID khi XADD:
*— auto-generate (dùng trong hầu hết trường hợp).1735689600000-*— tự chỉ định millisecond, để Redis tự tăng sequence.1735689600000-5— chỉ định đầy đủ (dùng khi cần import log từ nguồn ngoài).
Khi dùng XRANGE, hai ký tự đặc biệt:
-: ID nhỏ nhất (đầu stream).+: ID lớn nhất (cuối stream).
Khi dùng XREAD:
0: đọc từ đầu stream (từ ID0-0).$: chỉ đọc entry mới xuất hiện sau thời điểm gọi lệnh.
XADD — Ghi Entry Vào Stream
Cú pháp cơ bản:
# Ghi entry với auto-generate ID
XADD stream:orders * order_id 123 amount 500 status pending
# Trả về ID đã được gán, ví dụ: "1735689600000-0"
# Ghi entry với MAXLEN để cap stream (approximate, hiệu quả hơn exact)
XADD stream:orders MAXLEN ~ 10000 * order_id 124 amount 300 status pending
# MAXLEN exact (không dùng ~) — ít dùng vì chậm hơn
XADD stream:orders MAXLEN 10000 * order_id 125 amount 200 status paid
Giải thích các thành phần:
stream:orders: tên stream (key Redis). Tự động tạo nếu chưa tồn tại.*: auto-generate ID. Redis dùng Unix time milliseconds + sequence.order_id 123 amount 500 status pending: các field-value pairs, giống Hash. Một entry có thể có nhiều cặp.MAXLEN ~ 10000: giới hạn stream tối đa khoảng 10.000 entry. Dấu~cho phép Redis trim theo radix tree node (hiệu quả hơn so với exact), thực tế có thể giữ hơi nhiều hơn 10.000 một chút.
XADD trả về ID của entry vừa được ghi. Nếu stream đầy theo MAXLEN, các entry cũ nhất sẽ bị xóa trước khi ghi entry mới.
# Kiểm tra: stream tự tạo khi XADD lần đầu
EXISTS stream:orders # 0
XADD stream:orders * msg hello
# "1735689600000-0"
EXISTS stream:orders # 1
TYPE stream:orders # stream
XLEN, XRANGE, XREVRANGE — Đọc Theo Vị Trí
Nhóm lệnh này đọc entry từ stream theo phạm vi ID — không blocking, không xóa entry sau khi đọc.
# Số entry trong stream
XLEN stream:orders
# (integer) 1500
# Đọc TẤT CẢ entry từ đầu đến cuối
XRANGE stream:orders - +
# Giới hạn số entry trả về
XRANGE stream:orders - + COUNT 10 # 10 entry đầu tiên
# Đọc từ ID cụ thể trở đi
XRANGE stream:orders 1735689600000-0 + # từ entry này đến cuối
# Đọc theo khoảng ID
XRANGE stream:orders 1735689600000 1735689700000 # khoảng 100s
# XREVRANGE: đọc từ cuối về đầu (entry mới nhất trước)
XREVRANGE stream:orders + - COUNT 10 # 10 entry mới nhất
Format output của XRANGE:
XRANGE stream:orders - + COUNT 2
# 1) 1) "1735689600000-0"
# 2) 1) "order_id"
# 2) "123"
# 3) "amount"
# 4) "500"
# 5) "status"
# 6) "pending"
# 2) 1) "1735689600001-0"
# 2) 1) "order_id"
# 2) "124"
# ...
Mỗi entry là một tuple gồm ID và danh sách field-value xen kẽ. Client library (redis-py, ioredis) thường tự parse thành dict/object.
XRANGE theo partial timestamp: có thể dùng chỉ phần millisecond (không cần sequence), Redis tự suy ra:
# Đọc entry từ timestamp 1735689600000 trở đi
# Redis tự chuyển thành 1735689600000-0 cho start
XRANGE stream:orders 1735689600000 +
XREAD — Đọc Message Mới (Consumer Đơn Giản)
XREAD thiết kế cho consumer đọc message mới — không cần biết ID cụ thể trước, chỉ cần biết "tôi đã đọc đến đâu" hoặc "chỉ đọc entry xuất hiện từ bây giờ trở đi".
# Đọc tối đa 10 entry từ đầu stream
XREAD COUNT 10 STREAMS stream:orders 0
# Chỉ đọc entry CHƯA tồn tại lúc gọi lệnh (entry hoàn toàn mới)
XREAD COUNT 10 STREAMS stream:orders $
# BLOCKING: chờ tối đa 5 giây (5000ms) để có entry mới
# Trả về ngay nếu có entry mới, timeout trả về nil
XREAD BLOCK 5000 COUNT 10 STREAMS stream:orders $
# BLOCK 0: chờ vô hạn (dùng như long-poll)
XREAD BLOCK 0 COUNT 10 STREAMS stream:orders $
XREAD có thể đọc nhiều stream cùng lúc:
# Đọc đồng thời từ 2 stream
XREAD COUNT 5 STREAMS stream:orders stream:payments 0 0
# Phần STREAMS: tên các stream, sau đó tương ứng là last ID cho mỗi stream
# "0 0" nghĩa là đọc từ đầu cả hai stream
Format output của XREAD:
XREAD COUNT 2 STREAMS stream:orders 0
# 1) 1) "stream:orders"
# 2) 1) 1) "1735689600000-0"
# 2) 1) "order_id"
# 2) "123"
# ...
# 2) 1) "1735689600001-0"
# 2) ...
Output bọc theo stream name — quan trọng khi đọc nhiều stream cùng lúc.
Sự khác biệt giữa $ và 0:
0: đọc từ entry đầu tiên của stream. Phù hợp khi muốn replay toàn bộ.$: chỉ nhận entry được thêm vào sau thời điểm gọi XREAD. Phù hợp cho consumer mới không cần history. Lưu ý: nếu XREAD BLOCK với$và có entry mới vào trong lúc chờ, sẽ nhận được ngay; nhưng nếu không dùng BLOCK,$hầu như luôn trả về nil (không có entry nào "xuất hiện sau khi lệnh chạy xong" trên connection đồng bộ).
Streams vs List — Bảng So Sánh
| Tiêu chí | List | Streams |
|---|---|---|
| Consume | Pop + delete (entry mất) | Read, entry vẫn tồn tại |
| Replay | Không | Có (đọc lại từ bất kỳ ID nào) |
| Per-message ID | Không | Có (timestamp-sequence) |
| Consumer group | Không | Có (bài 56: XREADGROUP, XACK) |
| Acknowledgment | Manual (LMOVE pattern) | Built-in XACK (bài 56) |
| Persistence | Có (RDB/AOF) | Có (RDB/AOF) |
| Fan-out | Không trực tiếp | Có (mỗi consumer đọc toàn bộ) |
| Blocking read | BRPOP / BLMOVE | XREAD BLOCK |
| Cap size | LTRIM | MAXLEN / XTRIM / MINID |
| Use case chính | Simple queue, at-most-once | Reliable messaging, event log, audit |
Không phải Streams luôn tốt hơn List. Nếu tác vụ đơn giản, at-most-once (mất message chấp nhận được), List nhẹ hơn và đủ dùng. Streams phù hợp khi cần: replay, audit trail, nhiều consumer độc lập, hoặc reliable processing với ack.
Use Case 1 — Event Log
Stream phù hợp tự nhiên với event sourcing và audit log: mỗi action được ghi thêm vào stream theo thứ tự thời gian, hệ thống có thể replay lại để dựng lại state hoặc debug.
import redis
import time
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
# Producer: ghi event user action
def log_event(user_id: str, action: str, metadata: dict = None):
payload = {
"user_id": user_id,
"action": action,
"ts": str(int(time.time() * 1000)),
}
if metadata:
payload.update(metadata)
entry_id = r.xadd(
"events:user",
payload,
maxlen=1_000_000, # giữ tối đa ~1M event
approximate=True, # dùng ~ để hiệu quả hơn
)
return entry_id
log_event("u123", "login", {"ip": "1.2.3.4", "device": "mobile"})
log_event("u123", "view_product", {"product_id": "p456"})
log_event("u456", "purchase", {"order_id": "o789", "amount": "500"})
# Consumer: đọc event mới theo blocking poll
def consume_events(last_id: str = "$"):
while True:
# block=5000: chờ tối đa 5 giây rồi loop lại
result = r.xread({"events:user": last_id}, block=5000, count=50)
if not result:
# timeout, không có event mới — tiếp tục chờ
continue
for stream_name, entries in result:
for entry_id, fields in entries:
process_event(fields)
last_id = entry_id # cập nhật vị trí đã đọc
def process_event(fields: dict):
print(f"[{fields['ts']}] user={fields['user_id']} action={fields['action']}")
Điểm quan trọng: last_id phải được persist (ví dụ lưu vào Redis bằng SET, hoặc database) nếu muốn consumer tiếp tục từ vị trí cũ sau khi restart. XREAD đơn không tự track vị trí phía server — đây là giới hạn sẽ được nói rõ ở mục 14.
Use Case 2 — Fan-out (Nhiều Consumer Độc Lập)
Fan-out là pattern trong đó một event được xử lý bởi nhiều consumer độc lập — mỗi consumer đọc toàn bộ stream từ vị trí riêng của nó. Đây khác với consumer group (bài 56) là chia nhau message.
# Cùng một stream "events:order"
# Consumer A (analytics service): đọc từ vị trí riêng của A
XREAD COUNT 100 STREAMS events:order <last_id_A>
# Consumer B (notification service): đọc từ vị trí riêng của B
XREAD COUNT 100 STREAMS events:order <last_id_B>
# Consumer C (audit service): đọc từ vị trí riêng của C
XREAD COUNT 100 STREAMS events:order <last_id_C>
# Cả ba consumer đều thấy mọi message — không consumer nào "ăn" của consumer kia
Mỗi consumer tự lưu last_id của riêng mình (thường trong Redis SET hoặc database). Tốc độ xử lý mỗi consumer độc lập — consumer chậm không ảnh hưởng consumer nhanh.
Use case thực tế:
- Order service ghi event
order.createdvào stream. - Analytics service đọc để cập nhật dashboard.
- Notification service đọc để gửi email xác nhận.
- Audit service đọc để ghi vào audit log database.
Lưu ý: nếu một consumer bị chậm và stream bị trim (MAXLEN), consumer đó có thể mất entry đã bị trim trước khi kịp đọc. Với fan-out kiểu này, cần chú ý cân bằng giữa MAXLEN và tốc độ consumer chậm nhất.
Capped Stream — Quản Lý Memory
Stream không có TTL tự động. Nếu không giới hạn kích thước, stream grow vô hạn và gây OOM — đây là anti-pattern phổ biến nhất khi dùng Streams trong production.
Có ba cách giới hạn:
# 1. MAXLEN trong XADD — trim khi ghi (phổ biến nhất)
XADD stream:logs MAXLEN ~ 1000000 * level error msg "disk full"
# 2. XTRIM riêng — trim stream đã có
XTRIM stream:logs MAXLEN ~ 1000000
# 3. MINID (Redis 6.2+): xóa entry cũ hơn ID cụ thể
# Xóa mọi entry có ID < 1735603200000-0 (7 ngày trước)
XTRIM stream:logs MINID ~ 1735603200000
Sự khác biệt giữa exact và approximate (~):
- Exact (
MAXLEN 10000): Redis trim chính xác đến 10.000 entry. Phải duyệt tìm boundary chính xác — O(N) khi N lớn. - Approximate (
MAXLEN ~ 10000): Redis trim theo radix tree node (macro node). Nhanh hơn O(1), có thể giữ hơi nhiều hơn 10.000 một chút nhưng không đáng kể. Dùng trong hầu hết production case.
MINID hữu ích khi muốn retention theo thời gian (giữ 7 ngày) thay vì theo số lượng. Cần chạy XTRIM định kỳ (cron hoặc background job):
import time
def trim_old_entries(stream_name: str, retention_days: int = 7):
cutoff_ms = int((time.time() - retention_days * 86400) * 1000)
# Xóa mọi entry cũ hơn retention_days ngày
r.xtrim(stream_name, minid=str(cutoff_ms), approximate=True)
XDEL xóa entry theo ID cụ thể (đánh dấu deleted trong radix tree, memory thực sự được thu hồi khi node được trim):
XDEL stream:orders 1735689600000-0
# (integer) 1 — số entry thực sự bị xóa
XDEL ít dùng trong production; thường XTRIM/MAXLEN đủ.
Persistence & Durability
Streams persist qua RDB và AOF giống các data structure khác. Lựa chọn cấu hình AOF ảnh hưởng đến độ bền message khi crash:
appendfsync everysec(default): flush mỗi giây → mất tối đa ~1 giây message nếu Redis crash. Đây là thiết lập cân bằng performance/durability cho hầu hết trường hợp.appendfsync always: flush mỗi write → không mất message nhưng throughput giảm đáng kể (~10–20x chậm hơn). Chỉ dùng khi yêu cầu durability cao nhất.appendfsync no: để OS quyết định → nhanh nhất nhưng có thể mất nhiều giây message.
So sánh với List: List cũng persist qua AOF, nhưng mô hình RPOP-then-process của List có vấn đề: message bị pop khỏi List trước khi xử lý xong — nếu consumer crash giữa chừng, message đã mất khỏi List (không còn để replay). Streams giải quyết điều này bằng Consumer Groups với XACK (bài 56).
Nếu chỉ dùng XREAD đơn (không consumer group), persistence bảo vệ stream entries nhưng không bảo vệ vị trí đọc của consumer — vị trí đó phải được persist riêng.
XINFO — Inspect Stream
# Thông tin tổng quan về stream
XINFO STREAM stream:orders
# - length: số entry hiện tại
# - radix-tree-keys: số key trong radix tree nội bộ
# - radix-tree-nodes: số node
# - last-generated-id: ID của entry mới nhất
# - first-entry: entry đầu tiên (ID + fields)
# - last-entry: entry cuối cùng (ID + fields)
# - groups: số consumer group đang attach (bài 56)
# Xem danh sách consumer groups (sẽ có nội dung sau bài 56)
XINFO GROUPS stream:orders
# - name: tên group
# - consumers: số consumer trong group
# - pending: số message chưa XACK (PEL)
# - last-delivered-id: ID cuối đã giao cho group
# Thông tin full (Redis 7.0: XINFO STREAM ... FULL)
XINFO STREAM stream:orders FULL COUNT 10
XINFO STREAM hữu ích để:
- Kiểm tra stream có đang grow không kiểm soát không (theo dõi
length). - Xem
last-generated-idđể biết hoạt động gần nhất. - Debug khi cần kiểm tra entry đầu/cuối stream.
Code Đầy Đủ (Python redis-py)
Code dưới đây dùng redis-py (phiên bản 5.x). API xadd, xread, xrange tương ứng trực tiếp với Redis command.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM = "stream:orders"
CONSUMER_POSITION_KEY = "consumer:orders:last_id"
# ─── PRODUCER ────────────────────────────────────────────────────────────────
def publish_order(order_id: str, amount: float, status: str = "pending") -> str:
"""Ghi order event vào stream, trả về entry ID."""
entry_id = r.xadd(
STREAM,
{
"order_id": order_id,
"amount": str(amount),
"status": status,
},
maxlen=10_000,
approximate=True,
)
print(f"[PRODUCER] order={order_id} → entry_id={entry_id}")
return entry_id
# ─── CONSUMER (simple XREAD, không consumer group) ──────────────────────────
def load_position() -> str:
"""Đọc vị trí đã đọc lần trước. Trả về '0' nếu chưa có."""
saved = r.get(CONSUMER_POSITION_KEY)
return saved if saved else "0"
def save_position(entry_id: str):
"""Persist vị trí vừa xử lý xong."""
r.set(CONSUMER_POSITION_KEY, entry_id)
def process_order(fields: dict):
"""Xử lý một order entry."""
print(f"[CONSUMER] order={fields['order_id']} amount={fields['amount']} status={fields['status']}")
def run_consumer():
last_id = load_position()
print(f"[CONSUMER] bắt đầu từ ID: {last_id}")
while True:
# Đọc tối đa 20 entry, block tối đa 5 giây chờ message mới
result = r.xread({STREAM: last_id}, block=5000, count=20)
if not result:
# Hết timeout, không có message mới — loop lại
continue
for _stream_name, entries in result:
for entry_id, fields in entries:
try:
process_order(fields)
last_id = entry_id
save_position(last_id) # persist sau mỗi message
except Exception as e:
print(f"[CONSUMER] lỗi xử lý entry {entry_id}: {e}")
# với XREAD đơn: không có cơ chế retry tự động
# → cần consumer group + XACK (bài 56)
# ─── UTILITIES ───────────────────────────────────────────────────────────────
def inspect_stream():
info = r.xinfo_stream(STREAM)
print(f"length={info['length']}, last_id={info['last-generated-id']}")
def read_range(start: str = "-", end: str = "+", count: int = 10):
entries = r.xrange(STREAM, min=start, max=end, count=count)
for entry_id, fields in entries:
print(f" [{entry_id}] {fields}")
def trim_by_age(retention_days: int = 7):
cutoff_ms = int((time.time() - retention_days * 86400) * 1000)
trimmed = r.xtrim(STREAM, minid=str(cutoff_ms), approximate=True)
print(f"Đã trim {trimmed} entries cũ hơn {retention_days} ngày")
# ─── DEMO ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
# Ghi 3 order
publish_order("o001", 500.0, "pending")
publish_order("o002", 300.0, "paid")
publish_order("o003", 750.0, "pending")
# Đọc tất cả từ đầu
print("\nToàn bộ stream:")
read_range()
inspect_stream()
Giới Hạn XREAD Đơn & Tại Sao Cần Consumer Group
XREAD đơn (không consumer group) có những giới hạn nhất định khi dùng cho reliable processing:
- Vị trí consumer nằm phía client:
last_idphải tự lưu và tự load. Nếu consumer crash trước khi persistlast_id, lần restart tiếp theo không biết đã xử lý đến đâu — có thể xử lý lại (duplicate) hoặc bỏ sót tùy cách implement. - Không load balance tự động: mỗi instance consumer độc lập đọc toàn bộ stream. Không có cơ chế tự động chia message giữa nhiều worker.
- Không có ack phía server: Redis không biết consumer nào đã xử lý message nào. Không thể phát hiện message "đang xử lý dở" khi consumer crash.
- Không có retry tự động: nếu xử lý lỗi, XREAD không có cơ chế requeue hay claim lại message.
Bảng tóm tắt:
| Khả năng | XREAD đơn | Consumer Group (bài 56) |
|---|---|---|
| Track vị trí phía server | Không | Có (per group) |
| Load balance nhiều worker | Không | Có (mỗi message giao 1 worker) |
| Ack sau xử lý | Không | Có (XACK) |
| Pending Entry List (PEL) | Không | Có (biết message nào chưa ack) |
| Claim lại message bị stuck | Không | Có (XCLAIM / XAUTOCLAIM) |
| Fan-out (mọi consumer thấy message) | Có | Không (chia nhau) |
XREAD đơn phù hợp cho: event log đọc audit trail, fan-out nhiều service độc lập, replay history. Consumer Group phù hợp cho: background job processing, worker pool cần đảm bảo mỗi message xử lý đúng một lần.
Anti-patterns & Best Practices
Anti-patterns
- Stream không có MAXLEN: stream grow vô hạn, dùng toàn bộ RAM của Redis instance. Đây là nguyên nhân phổ biến nhất gây OOM với Streams trong production. Luôn đặt
MAXLEN ~hoặc chạy XTRIM định kỳ. - XREAD đơn cho reliable job processing: khi consumer crash, vị trí đọc mất (trừ khi persist riêng). Dùng consumer group (bài 56) cho processing cần đảm bảo.
- MAXLEN exact thay vì approximate:
XADD stream MAXLEN 10000 *chạy O(N) trim mỗi lần ghi. DùngMAXLEN ~ 10000để trim O(1) theo radix node. - Lưu payload lớn trong field: mỗi field-value được lưu inline trong stream entry. Payload vài KB/entry nhân với hàng triệu entry = OOM nhanh. Lưu pointer (ID hoặc key tham chiếu), fetch dữ liệu lớn từ nơi khác.
- Dùng Streams khi List đủ: nếu tác vụ đơn giản, at-most-once, không cần replay, List nhẹ hơn và đủ. Over-engineer không cần thiết tăng độ phức tạp.
- Đọc XRANGE - + không giới hạn: trên stream triệu entry, XRANGE không COUNT trả về toàn bộ, block Redis event loop. Luôn dùng COUNT.
Best Practices
- Luôn đặt
MAXLEN ~ <size>trong XADD hoặc chạy XTRIM định kỳ. - Dùng
XTRIM MINID ~khi muốn retention theo thời gian. - Persist
last_idsau mỗi message (hoặc theo batch nhỏ) khi dùng XREAD đơn. - Với fan-out nhiều service: mỗi service tự track
last_idđộc lập. - Consumer Group (bài 56) cho background job cần at-least-once với ack.
- Field nhỏ gọn: lưu pointer thay vì payload lớn.
- Bật AOF
everyseccho durability cơ bản;alwaysnếu không được mất bất kỳ message nào. - Monitor
XINFO STREAMđể phát hiện stream grow bất thường.
Quiz
- Stream entry ID
1735689600000-3có ý nghĩa gì? Tại sao ID này không thể nhỏ hơn ID của entry trước? - Sự khác biệt giữa
XREAD STREAMS mystream 0vàXREAD BLOCK 5000 STREAMS mystream $? - MAXLEN exact và MAXLEN approximate (
~) khác nhau ở điểm gì về hiệu năng? - Mô tả fan-out pattern dùng Streams. Tại sao hai consumer đọc cùng stream lại nhận được cùng tập message?
- Kể ba giới hạn của XREAD đơn khiến nó không phù hợp cho reliable background job processing.
Đáp án gợi ý
1735689600000là Unix timestamp tính bằng milliseconds;3là sequence number (entry thứ 4 trong cùng millisecond). ID monotonically increasing là bất biến của stream — Redis từ chối XADD nếu ID mới nhỏ hơn hoặc bằng ID cuối, đảm bảo thứ tự log không bị phá vỡ.XREAD ... 0: đọc non-blocking từ đầu stream (ID 0-0 trở đi), trả về ngay kể cả khi không có entry.XREAD BLOCK 5000 ... $: chỉ nhận entry xuất hiện sau thời điểm gọi, block connection tối đa 5 giây nếu chưa có entry mới, trả về nil nếu hết timeout.- MAXLEN exact: Redis trim đến chính xác N entry sau mỗi XADD, tốn O(N) vì phải tìm boundary chính xác. MAXLEN ~: trim theo radix tree macro node, O(1), có thể giữ hơi nhiều hơn N nhưng chênh lệch nhỏ và không đáng kể trong thực tế.
- Mỗi consumer dùng XREAD với
last_idcủa riêng mình. Stream không xóa entry khi được đọc — entry tồn tại cho đến khi bị trim/XDEL. Vì thế consumer A và consumer B đều có thể đọc cùng entry, mỗi người từ vị trí riêng → cùng tập message. - (1) Vị trí consumer (
last_id) phải tự persist phía client — crash mất vị trí; (2) không load balance tự động giữa nhiều worker instance; (3) không có ack phía server — Redis không biết message nào đã xử lý thành công, không thể recover message "đang xử lý dở" khi consumer crash.
Bài tiếp theo
Bài 56 đi vào Consumer Groups: XREADGROUP, XACK, Pending Entry List (PEL), và XAUTOCLAIM để xử lý message bị stuck khi consumer crash.
