Danh sách bài viết

Bài 57: XCLAIM & XAUTOCLAIM — Xử Lý Message Fail

Bài 56 đã trình bày cơ chế Consumer Groups và PEL (Pending Entry List): mỗi message được XREADGROUP sẽ vào PEL của consumer đó và chỉ rời PEL khi consumer gọi XACK. Vấn đề xảy ra khi consumer crash trước khi XACK — message kẹt trong PEL, không consumer nào tự nhận, hệ thống không tự phục hồi. XCLAIM cho phép claim thủ công message từ consumer dead sang consumer live. XAUTOCLAIM (Redis 6.2+) tự động hóa quá trình đó với cursor-based sweep. Bài này đi qua cả hai lệnh, giải thích min-idle-time, delivery count tracking để phát hiện poison message, pattern recovery loop trong worker, và cách monitor PEL trong production.

01/06/2026
0 lượt xem

Mục Tiêu Bài Học

  • Hiểu tại sao message kẹt trong PEL khi consumer crash và tại sao Redis không tự giải quyết.
  • Dùng XPENDING để tìm message đã idle quá lâu.
  • Dùng XCLAIM để claim thủ công message từ consumer dead sang consumer live.
  • Dùng XAUTOCLAIM (Redis 6.2+) để tự động hóa quá trình recovery.
  • Track delivery count để phát hiện poison message và tránh infinite retry loop.
  • Thiết kế worker self-healing cho môi trường có consumer crash định kỳ.

Vấn Đề: Consumer Crash & Message Kẹt PEL

Khi consumer A gọi XREADGROUP, Redis ghi message vào PEL của A với timestamp "delivered at". Consumer A phải gọi XACK để xóa message khỏi PEL. Đây là cơ chế at-least-once delivery.

Sự cố điển hình:

  1. Consumer A nhận message 1700000001000-0.
  2. A bắt đầu xử lý (gọi downstream API, ghi DB...).
  3. A crash do OOM, network partition, hoặc rolling deploy.
  4. Message vẫn trong PEL của A với idle time tăng dần.
  5. Consumer B và C tiếp tục chạy nhưng không tự nhận message từ PEL của A.
  6. Nếu không có cơ chế recovery: message không được xử lý vĩnh viễn.

Redis không tự recover vì thiết kế cố ý: Redis không biết consumer crash hay chỉ xử lý chậm. Quyết định "idle bao lâu là đủ để coi là stale" phải do application xác định. XCLAIM và XAUTOCLAIM là công cụ để thực hiện quyết định đó.

Incident thực tế

Hệ thống notification không có recovery loop. Một consumer node crash do OOM trong đợt traffic cao. 1 tuần sau, PEL tích lũy 10.000 message pending, user report "không nhận được email xác nhận đơn hàng". Team phát hiện qua metric PEL size tăng dần nhưng không có alert. Fix: thêm XAUTOCLAIM + manual recover toàn bộ PEL cũ.

XPENDING — Xem Message Stale

XPENDING cho biết message đang chờ trong PEL, kèm thông tin idle time và delivery count.

Summary mode

XPENDING stream:orders mygroup

Trả về: tổng pending count, ID min/max, breakdown per consumer.

Range mode với IDLE filter (Redis 7.0+)

XPENDING stream:orders mygroup IDLE 60000 - + 100
  • IDLE 60000: chỉ trả về message đã idle > 60.000ms (60 giây).
  • - +: từ ID nhỏ nhất đến lớn nhất (toàn bộ stream).
  • 100: giới hạn kết quả.

Mỗi entry trả về có: message ID, consumer name, idle time (ms), delivery count.

Dùng redis-py

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM = "stream:orders"
GROUP = "mygroup"

# Lấy message đã idle > 60s
stale = r.xpending_range(
    STREAM, GROUP,
    min="-", max="+",
    count=100,
    idle=60000,  # ms, yêu cầu redis-py >= 4.3.4
)

for entry in stale:
    print(
        entry["message_id"],
        entry["consumer"],
        entry["time_since_delivered"],  # ms
        entry["times_delivered"],
    )

Trước Redis 7.0 không có IDLE filter trong XPENDING; phải tự filter theo time_since_delivered trong application code.

XCLAIM — Claim Thủ Công

Cú pháp:

XCLAIM stream:orders mygroup consumer2 60000 1700000001000-0 1700000002000-0
  • mygroup: consumer group.
  • consumer2: consumer muốn nhận message.
  • 60000: min-idle-time (ms) — chỉ claim message đã idle > 60s. Nếu message đang được xử lý (idle thấp), lệnh bỏ qua message đó (không báo lỗi).
  • Các ID theo sau: danh sách message cần claim.

Sau khi XCLAIM thành công:

  • Message được chuyển sang PEL của consumer2.
  • Idle time của message được reset về 0.
  • Delivery count tăng thêm 1.
  • Consumer2 có thể xử lý rồi XACK bình thường.

Nếu min-idle-time không thỏa (message vẫn đang xử lý), XCLAIM trả về empty list cho message đó — không lỗi, không claim.

Ví dụ với redis-py

msg_id = "1700000001000-0"

claimed = r.xclaim(
    STREAM,
    GROUP,
    "consumer2",
    min_idle_time=60000,
    message_ids=[msg_id],
)

for cid, fields in claimed:
    print(f"Claimed: {cid}, fields: {fields}")

min-idle-time — Chọn Bao Lâu

min-idle-time là tham số quan trọng nhất khi dùng XCLAIM/XAUTOCLAIM. Chọn sai dẫn đến hai loại lỗi:

min-idle-time Hậu quả
Quá ngắn (vd 5s) Claim nhầm message đang được consumer khác xử lý chậm (DB transaction chậm, downstream API timeout). Kết quả: duplicate processing.
Quá dài (vd 30 phút) Message kẹt lâu trước khi được recover. Với hệ thống real-time (payment, notification), 30 phút là không chấp nhận được.

Công thức chọn:

min_idle_time = P99(processing_time) + safety_buffer

Ví dụ: P99 processing time là 30s, đặt buffer 2x → min-idle = 60s. Nếu P99 chỉ 5s thì min-idle = 15–30s là hợp lý.

Để biết P99, track histogram processing time trong metric system (Prometheus, Datadog). Nếu chưa có metric, dùng con số bảo thủ 2–3 phút ban đầu rồi giảm dần.

Recovery Loop Thủ Công (Pre-6.2)

Trước Redis 6.2, không có XAUTOCLAIM. Cần tự viết recovery loop kết hợp XPENDING + XCLAIM:

import redis
import time

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM = "stream:orders"
GROUP = "mygroup"
CONSUMER = "consumer-1"
MIN_IDLE_MS = 60_000


def process(fields: dict) -> None:
    # logic xử lý message
    print(f"Processing: {fields}")


def recover_stale() -> None:
    """Scan PEL tìm message stale, claim và xử lý."""
    stale = r.xpending_range(
        STREAM, GROUP,
        min="-", max="+",
        count=100,
    )

    for entry in stale:
        # Filter thủ công vì Redis < 7.0 không có IDLE filter
        if entry["time_since_delivered"] < MIN_IDLE_MS:
            continue

        msg_id = entry["message_id"]
        claimed = r.xclaim(
            STREAM, GROUP, CONSUMER,
            min_idle_time=MIN_IDLE_MS,
            message_ids=[msg_id],
        )

        for cid, fields in claimed:
            try:
                process(fields)
                r.xack(STREAM, GROUP, cid)
            except Exception as exc:
                # Không XACK, message sẽ quay lại PEL
                # consumer khác (hoặc chính mình) sẽ claim lại sau
                print(f"Process failed for {cid}: {exc}")


# Chạy recovery định kỳ mỗi 30s
while True:
    recover_stale()
    time.sleep(30)

Nhược điểm của pattern này: cần viết thêm loop riêng, xử lý cursor thủ công nếu PEL lớn hơn count, và gọi 2 round-trip (XPENDING + XCLAIM) thay vì 1.

XAUTOCLAIM (Redis 6.2+) — Cursor-based Sweep

XAUTOCLAIM kết hợp XPENDING + XCLAIM vào một lệnh duy nhất, với cursor để xử lý PEL lớn mà không block server:

XAUTOCLAIM stream:orders mygroup consumer2 60000 0 COUNT 100
  • 60000: min-idle-time (ms).
  • 0: cursor bắt đầu (giống SCAN, dùng 0 để scan từ đầu).
  • COUNT 100: số message tối đa mỗi lần sweep (optional, default không giới hạn nhưng nên đặt).

Response gồm 3 phần:

  1. Next cursor: dùng cho lần gọi tiếp theo. Khi cursor trả về là "0-0" (hoặc chuỗi rỗng tùy version), toàn bộ PEL đã được scan.
  2. Claimed messages: danh sách [msg_id, fields] đã được claim thành công.
  3. Deleted IDs (Redis 7.0+): message ID đã bị xóa khỏi stream nhưng vẫn còn trong PEL (edge case khi XTRIM xảy ra trong lúc consumer đang xử lý).

Dùng redis-py

next_cursor, claimed_msgs, deleted_ids = r.xautoclaim(
    STREAM,
    GROUP,
    CONSUMER,
    min_idle_time=60_000,
    start_id="0",    # cursor
    count=100,
)

print(f"Next cursor: {next_cursor}")
print(f"Claimed: {claimed_msgs}")
print(f"Deleted (gone from stream): {deleted_ids}")

Khi cursor trả về là "0-0", toàn bộ PEL đã được scan trong vòng sweep này. Reset về "0" cho vòng tiếp theo.

Pattern Worker Tích Hợp XAUTOCLAIM

Worker thực tế kết hợp XAUTOCLAIM (recover stale) và XREADGROUP (đọc message mới) trong cùng một vòng lặp:

import redis
import time
import logging

log = logging.getLogger(__name__)

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM = "stream:orders"
GROUP = "mygroup"
CONSUMER = "consumer-1"
MIN_IDLE_MS = 60_000
AUTOCLAIM_INTERVAL = 30  # giây


def process(msg_id: str, fields: dict) -> None:
    """Xử lý message. Phải idempotent vì có thể bị claim lại."""
    # Ví dụ: ghi DB, gọi API...
    print(f"[{CONSUMER}] Processing {msg_id}: {fields}")


def handle_messages(entries: list) -> None:
    for msg_id, fields in entries:
        try:
            process(msg_id, fields)
            r.xack(STREAM, GROUP, msg_id)
        except Exception as exc:
            # Không XACK → message ở lại PEL
            # Sau min_idle_time, XAUTOCLAIM sẽ claim lại
            log.warning("Failed %s: %s", msg_id, exc)


def run_worker():
    autoclaim_cursor = "0"
    last_autoclaim = 0.0

    while True:
        now = time.time()

        # Chạy XAUTOCLAIM định kỳ (mỗi AUTOCLAIM_INTERVAL giây)
        if now - last_autoclaim >= AUTOCLAIM_INTERVAL:
            next_cursor, claimed, _ = r.xautoclaim(
                STREAM, GROUP, CONSUMER,
                min_idle_time=MIN_IDLE_MS,
                start_id=autoclaim_cursor,
                count=20,
            )

            if claimed:
                log.info("Autoclaimed %d stale messages", len(claimed))
                handle_messages(claimed)

            # Nếu cursor về "0-0", reset cho sweep tiếp theo
            if next_cursor == "0-0":
                autoclaim_cursor = "0"
                last_autoclaim = now  # chờ interval trước khi sweep lại
            else:
                autoclaim_cursor = next_cursor
                # Còn message trong PEL, tiếp tục sweep nhanh hơn
                last_autoclaim = now - (AUTOCLAIM_INTERVAL - 1)

        # Đọc message mới từ stream
        result = r.xreadgroup(
            GROUP, CONSUMER,
            {STREAM: ">"},
            count=10,
            block=5000,  # block tối đa 5s
        )

        if result:
            for _stream_name, entries in result:
                handle_messages(entries)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_worker()

Điểm quan trọng trong pattern này:

  • XAUTOCLAIM chỉ chạy mỗi 30s, không phải mỗi vòng lặp — tránh lãng phí CPU.
  • Cursor được giữ giữa các lần gọi để sweep toàn bộ PEL kể cả khi PEL lớn.
  • Handler phải idempotent: message có thể bị claim và xử lý nhiều lần nếu crash xảy ra đúng lúc claim nhưng trước XACK.
  • Exception không bị nuốt hoàn toàn — ghi log để monitor.

Delivery Count & Poison Message Detection

Mỗi lần XCLAIM hoặc XAUTOCLAIM, delivery count của message tăng thêm 1. Delivery count cũng tăng khi lần đầu deliver qua XREADGROUP.

Dùng XPENDING để xem delivery count:

XPENDING stream:orders mygroup - + 100
# Output per entry:
# 1) message_id
# 2) consumer_name
# 3) idle_time_ms
# 4) delivery_count  ← đây

Khi delivery count vượt ngưỡng, message rất có thể là poison message — message mà application không xử lý được do dữ liệu lỗi, schema mismatch, hoặc bug code. Tiếp tục retry sẽ loop mãi.

Detect poison message trong Python

MAX_RETRY = 3


def get_delivery_count(msg_id: str) -> int:
    """Tra delivery count của một message cụ thể."""
    pending = r.xpending_range(
        STREAM, GROUP,
        min=msg_id, max=msg_id,
        count=1,
    )
    if not pending:
        return 0
    return pending[0]["times_delivered"]


def handle_messages_with_dlq(entries: list) -> None:
    for msg_id, fields in entries:
        count = get_delivery_count(msg_id)

        if count > MAX_RETRY:
            # Poison message: đẩy sang Dead Letter Queue (bài 60)
            r.xadd("stream:orders:dlq", {"original_id": msg_id, **fields})
            r.xack(STREAM, GROUP, msg_id)
            log.warning("Poison message %s after %d attempts → DLQ", msg_id, count)
            continue

        try:
            process(msg_id, fields)
            r.xack(STREAM, GROUP, msg_id)
        except Exception as exc:
            log.warning("Failed %s (attempt %d): %s", msg_id, count, exc)

Lưu ý: mỗi lần gọi get_delivery_count là một XPENDING round-trip. Với volume cao, nên batch hoặc cache kết quả. Dead Letter Queue sẽ được đào sâu ở bài 60.

XCLAIM Options: JUSTID, IDLE, RETRYCOUNT

JUSTID — chỉ claim, không trả data

XCLAIM stream:orders mygroup consumer2 60000 1700000001000-0 JUSTID

Trả về chỉ danh sách message ID đã claim (không bao gồm fields). Dùng khi cần bulk claim rồi đọc data sau, hoặc đơn giản là tiết kiệm bandwidth khi fields lớn.

Lưu ý: JUSTID claim xong thì message đã thuộc PEL của consumer2 — phải xử lý và XACK, không được bỏ qua.

IDLE — đặt lại idle time sau claim

XCLAIM stream:orders mygroup consumer2 60000 1700000001000-0 IDLE 5000

Sau khi claim, idle time của message được set thành 5000ms thay vì reset về 0. Use case: cố ý trì hoãn retry sau một khoảng thời gian cụ thể (retry backoff thủ công).

RETRYCOUNT — đặt delivery count

XCLAIM stream:orders mygroup consumer2 60000 1700000001000-0 RETRYCOUNT 1

Override delivery count. Thường dùng khi manually reset message để retry từ đầu, hoặc trong testing. Trong production ít dùng — thay đổi delivery count có thể làm sai logic poison message detection.

Dedicated Reclaimer vs Inline Reclaim

Có hai cách tổ chức recovery trong hệ thống nhiều worker:

Inline reclaim (bài 8 đã dùng)

Mỗi worker tự chạy XAUTOCLAIM định kỳ trong vòng lặp chính của mình. Đơn giản, không cần deploy thêm process. Phù hợp khi số worker nhỏ (2–5).

Dedicated reclaimer

def reclaimer_process():
    """Process riêng chỉ làm nhiệm vụ sweep PEL."""
    cursor = "0"
    while True:
        next_cursor, claimed, _ = r.xautoclaim(
            STREAM, GROUP, "reclaimer",
            min_idle_time=MIN_IDLE_MS,
            start_id=cursor,
            count=50,
        )

        if claimed:
            log.info("Reclaimer swept %d messages", len(claimed))
            handle_messages(claimed)

        cursor = "0" if next_cursor == "0-0" else next_cursor

        if not claimed:
            time.sleep(15)  # Không có gì để claim, nghỉ

Ưu điểm: tách concern rõ ràng — worker chính chỉ lo xử lý message mới, reclaimer lo recovery. Dễ monitor riêng (metric reclaimer_claimed_total). Phù hợp khi worker pool lớn (20+ instance).

Nhược điểm: thêm một process cần deploy và monitor. Nếu reclaimer crash, cần tự recover (hoặc dùng supervisor).

Race Condition Giữa Nhiều Worker Reclaim

Nếu nhiều worker cùng chạy XAUTOCLAIM trên cùng stream/group, có race condition không?

Câu trả lời: không gây duplicate processing. XAUTOCLAIM là atomic: khi worker A claim message X, message X thuộc PEL của A. Worker B chạy XAUTOCLAIM tiếp theo sẽ không thấy X (hoặc thấy nhưng không thể claim vì idle time đã reset).

Tuy nhiên, nên tránh nhiều worker cùng chạy XAUTOCLAIM vì:

  • Mỗi XAUTOCLAIM là một round-trip tới Redis — nhiều worker = nhiều round-trip thừa.
  • Cursor không shared giữa worker — mỗi worker có cursor riêng, scan lại từ đầu.
  • Khó debug khi nhiều worker cùng claim và log.

Thực tế: 1 worker chạy XAUTOCLAIM là đủ cho hầu hết hệ thống. Nếu volume PEL lớn, dùng dedicated reclaimer.

Monitoring PEL

PEL size là metric quan trọng nhất để phát hiện sự cố consumer crash:

# Lấy pending count per consumer
info = r.xpending(STREAM, GROUP)
# info = {
#   'pending': 10000,       ← tổng
#   'min': '170000...-0',
#   'max': '170000...-9',
#   'consumers': [
#     {'name': 'consumer-1', 'pending': 10000},
#   ]
# }

total_pending = info["pending"]
per_consumer = {c["name"]: c["pending"] for c in info["consumers"]}

Export metric này ra Prometheus/Datadog. Alert khi:

  • PEL size tăng liên tục (consumer crash, không ai recover).
  • Một consumer có PEL lớn bất thường so với consumer khác (consumer đó bị treo hoặc crash).
  • Delivery count histogram cho thấy nhiều message bị retry > N lần (poison message tích lũy).

Dùng XINFO GROUPS để monitor group-level

groups = r.xinfo_groups(STREAM)
for g in groups:
    print(g["name"], "pending:", g["pel-count"])

pel-count trong XINFO GROUPS là tổng PEL size cho group đó. Đây là con số nên theo dõi thường xuyên.

Anti-patterns

  • Không có recovery: không dùng XCLAIM/XAUTOCLAIM sau khi triển khai consumer group. Message kẹt vĩnh viễn khi bất kỳ consumer nào crash.
  • min-idle-time quá nhỏ: vd 5s trong khi P99 processing là 20s. Kết quả: claim nhầm message đang xử lý → duplicate, data corruption nếu handler không idempotent.
  • XAUTOCLAIM mỗi 100ms: CPU waste, Redis phải scan PEL liên tục. XAUTOCLAIM 10–30s là đủ.
  • Không track delivery count: poison message bị retry vô hạn, chiếm PEL, làm sai metric "pending count".
  • XCLAIM JUSTID rồi không follow-up: claim message vào PEL của mình nhưng không xử lý, không XACK. Message kẹt trong PEL của mình thay vì consumer cũ.
  • Handler không idempotent: khi recovery claim lại message đã xử lý một phần, chạy lại handler có thể gây duplicate charge, duplicate email...

Best Practices

  • Đặt min-idle-time = P99(processing time) + buffer (thường x2). Đo P99 bằng histogram metric thực tế.
  • Chạy XAUTOCLAIM định kỳ mỗi 10–30s, không liên tục.
  • Track delivery count. Khi vượt ngưỡng (vd 3–5 lần), đẩy sang Dead Letter Queue thay vì retry.
  • Luôn viết handler idempotent — XAUTOCLAIM có thể deliver lại message đã xử lý một phần.
  • Monitor PEL size per consumer và alert khi tăng bất thường.
  • Với hệ thống nhiều worker, cân nhắc dedicated reclaimer process thay vì inline reclaim để tách concern và dễ monitor.
  • Đọc và handle deleted_ids trả về từ XAUTOCLAIM (Redis 7.0+): message đã xóa khỏi stream nhưng còn PEL cần XACK để clean up.

Tổng Kết & Quiz

XCLAIM và XAUTOCLAIM là cơ chế bắt buộc khi dùng Consumer Groups trong môi trường production. Consumer crash là chuyện bình thường (rolling deploy, OOM, network). Không có recovery loop đồng nghĩa với message mất.

XAUTOCLAIM (Redis 6.2+) đơn giản hóa đáng kể so với XPENDING + XCLAIM thủ công: một lệnh, cursor-based, atomic. Kết hợp với delivery count tracking, bạn có đủ công cụ để xây self-healing consumer group.

Quiz

  1. Consumer A nhận message lúc 10:00:00, process mất 45s, hoàn thành lúc 10:00:45 và XACK. Consumer B chạy XAUTOCLAIM với min-idle-time=30000 lúc 10:00:20. Điều gì xảy ra với message đó?
  2. Tại sao XCLAIM với min-idle-time nhỏ hơn thời gian xử lý thực tế lại nguy hiểm hơn đối với hệ thống không idempotent so với hệ thống idempotent?
  3. Delivery count của một message là 7, MAX_RETRY = 3. Bạn muốn retry thêm 3 lần từ đầu. Làm thế nào?
  4. Khi XAUTOCLAIM trả về cursor "0-0", điều đó có nghĩa là gì? Bước tiếp theo nên làm gì?
  5. XAUTOCLAIM claim được 0 message sau mỗi lần sweep. PEL size vẫn tăng dần. Nguyên nhân có thể là gì?

Đáp án gợi ý

  1. Message đã idle 20s lúc XAUTOCLAIM chạy, nhỏ hơn min-idle-time 30s → XAUTOCLAIM bỏ qua, không claim. Consumer A vẫn xử lý và XACK bình thường. min-idle-time bảo vệ message đang được xử lý đúng cách này.
  2. Hệ thống không idempotent: nếu claim nhầm message đang xử lý, hai consumer cùng xử lý một message → duplicate action (charge 2 lần, gửi email 2 lần, etc.). Hệ thống idempotent: cùng kịch bản, lần xử lý thứ hai phát hiện đã làm rồi → skip → chỉ tốn thêm latency, không gây lỗi.
  3. Dùng XCLAIM ... RETRYCOUNT 0 (hoặc 1) để reset delivery count về 0 (hoặc 1). Sau đó xử lý bình thường. Lưu ý: override delivery count nên dùng cẩn thận — nếu bug vẫn chưa fix, message sẽ fail lại và loop tiếp.
  4. Cursor "0-0" nghĩa là đã scan hết toàn bộ PEL trong sweep này. Reset cursor về "0" và chờ đến lần sweep tiếp theo (sau AUTOCLAIM_INTERVAL giây). Không cần gọi lại ngay lập tức.
  5. Nhiều khả năng: (a) consumer mới liên tục nhận và fail message mà không XACK — PEL tích lũy từ consumer đang chạy, không phải consumer dead, idle time còn thấp nên XAUTOCLAIM bỏ qua; (b) min-idle-time quá lớn so với thực tế; (c) reclaimer bản thân bị crash; (d) có poison message delivery count cao nhưng không được đẩy DLQ — liên tục được claim rồi fail, không accumulated nhưng PEL không giảm.

Bài tiếp theo

Bài 58 chuyển sang Delayed Jobs — cách dùng Sorted Set để schedule job chạy sau một khoảng thời gian, thay thế cho cron job hoặc external scheduler trong nhiều use case đơn giản.

Tham khảo