Mục lục
- Mục Tiêu Bài Học
- Bài Toán Priority Queue
- Pattern 1 — Multiple Streams
- Pattern 2 — Weighted Polling (Chống Starvation)
- Pattern 3 — Sorted Set Score
- So Sánh Ba Pattern
- Bài Toán Idempotent Jobs
- Kỹ Thuật 1 — Idempotency Key (SET NX)
- Kỹ Thuật 2 — DB Unique Constraint
- Kỹ Thuật 3 — Upsert & SET Thay INCR
- Kỹ Thuật 4 — Check Side-effect State
- Side-effect Ordering Trong Job Nhiều Bước
- Idempotent Wrapper — Context Manager
- Kết Hợp Priority + Idempotent Trong Production
- Anti-patterns & Best Practices
- Tổng Kết & Quiz
Mục Tiêu Bài Học
- Hiểu tại sao một single queue không đủ khi job có mức ưu tiên khác nhau.
- Triển khai được ba pattern priority queue: multiple streams, weighted polling, Sorted Set score — và biết khi nào dùng cái nào.
- Giải thích starvation là gì và tại sao weighted polling giải quyết được.
- Nắm bốn kỹ thuật idempotent job: SET NX, DB unique constraint, upsert/SET, check state.
- Viết được idempotent wrapper (context manager) bảo vệ side-effect, xử lý đúng trường hợp job fail.
- Biết quy tắc ordering side-effect trong job nhiều bước để retry không bị state lệch.
- Nhận diện các anti-pattern: idem key TTL ngắn hơn retry window, INCR thay SET, không delete idem key khi fail.
Bài Toán Priority Queue
Trong một hệ thống async processing thực tế, các job không có giá trị như nhau với người dùng và với SLA:
- Critical: xử lý thanh toán, gửi OTP, hoàn tiền. Delay hàng giây là vấn đề.
- Normal: gửi email xác nhận đơn hàng, thông báo push. Delay vài chục giây chấp nhận được.
- Low: tính toán analytics, tổng hợp báo cáo, index search. Delay vài phút không ảnh hưởng.
Nếu dùng một queue duy nhất theo FIFO, một đợt analytics job khổng lồ (low priority) có thể chặn thanh toán job phía sau trong hàng đợi. Priority queue giải quyết: worker luôn xử lý job critical trước, chỉ chuyển sang normal hoặc low khi không còn critical.
Redis cung cấp nhiều cách triển khai. Ba pattern phổ biến nhất phân tích dưới đây có trade-off khác nhau về FIFO, reliability (XACK), và starvation protection.
Pattern 1 — Multiple Streams
Mỗi priority level có một Redis Stream riêng: stream:critical, stream:normal, stream:low. Worker check critical trước bằng XREADGROUP, nếu có message thì xử lý và tiếp tục vòng lặp từ đầu (vẫn check critical). Chỉ khi critical rỗng mới fallback xuống normal, rồi low.
import redis
r = redis.Redis()
GROUP = "workers"
CONSUMER = "worker-1"
def worker_loop():
while True:
# Critical trước — block ngắn để không trì hoãn khi có message mới
msgs = r.xreadgroup(GROUP, CONSUMER, {"stream:critical": ">"}, count=10, block=100)
if msgs:
process(msgs)
continue
# Normal kế tiếp
msgs = r.xreadgroup(GROUP, CONSUMER, {"stream:normal": ">"}, count=10, block=100)
if msgs:
process(msgs)
continue
# Low cuối cùng — block dài hơn vì ít khẩn cấp
msgs = r.xreadgroup(GROUP, CONSUMER, {"stream:low": ">"}, count=10, block=5000)
if msgs:
process(msgs)
Điểm quan trọng: sau mỗi lần xử lý critical hoặc normal, vòng lặp continue về đầu — worker không bao giờ "lọt" xuống low khi còn critical. Low chỉ được phục vụ khi cả critical lẫn normal đều rỗng.
Trade-off: nếu luồng critical liên tục không dừng, low priority có thể starve — không được xử lý trong thời gian dài. Đây là điểm yếu cần giải quyết bằng pattern tiếp theo.
Pattern 2 — Weighted Polling (Chống Starvation)
Thay vì ưu tiên tuyệt đối (critical chặn toàn bộ), weighted polling phân bổ tỷ lệ xử lý: mỗi vòng lặp lấy tối đa 10 message critical, 5 normal, 1 low. Low priority vẫn luôn được xử lý — chỉ ít hơn — dù critical đông.
def weighted_worker():
# weight = số message tối đa lấy mỗi vòng
weights = [
("stream:critical", 10),
("stream:normal", 5),
("stream:low", 1),
]
while True:
for stream, weight in weights:
msgs = r.xreadgroup(
GROUP, CONSUMER,
{stream: ">"},
count=weight,
block=100,
)
if msgs:
process(msgs)
Với config trên, trong tình huống tất cả ba stream đều có message, tỷ lệ throughput là 10:5:1. Low vẫn được xử lý 1 message mỗi vòng — starvation không xảy ra. Điều chỉnh weight tuỳ SLA thực tế của hệ thống.
Weighted polling vẫn dùng Redis Streams và consumer group, nên giữ được tất cả tính chất reliability của XREADGROUP/XACK (bài 55–57): message không mất khi worker crash, PEL theo dõi message chưa ack, XAUTOCLAIM reclaim khi worker chết.
Pattern 3 — Sorted Set Score
Redis Sorted Set cho phép gán score cho mỗi member và pop theo thứ tự score. Ý tưởng: encode priority vào score, ZPOPMAX luôn lấy job có score cao nhất.
# Producer thêm job
ZADD jobs 100 '{"type":"payment","order_id":"A1"}' # critical
ZADD jobs 50 '{"type":"email","user_id":"U5"}' # normal
ZADD jobs 10 '{"type":"analytics","date":"2026-06-01"}' # low
# Worker lấy job priority cao nhất
ZPOPMAX jobs
def sorted_set_worker():
while True:
result = r.zpopmax("jobs", count=1)
if result:
payload_str, score = result[0]
payload = json.loads(payload_str)
process_job(payload)
else:
time.sleep(0.5) # không có block như XREADGROUP
Giới hạn của pattern này:
- Không FIFO trong cùng priority: hai job có cùng score 100 không có thứ tự đảm bảo — Redis lấy ngẫu nhiên trong tập score bằng nhau.
- Không có PEL / XACK:
ZPOPMAXxoá luôn message khỏi set. Nếu worker crash sau khi pop nhưng trước khi xử lý xong, message mất vĩnh viễn. Cần tự implement reliable delivery layer bên ngoài. - Không có blocking pop tích hợp: phải poll bằng sleep loop khi queue rỗng — tốn CPU và tăng latency.
Sorted Set phù hợp với use case đơn giản, ít yêu cầu reliability, hoặc khi priority score cần thay đổi động (ví dụ: job chờ lâu tự tăng score để tránh starvation — aging pattern).
So Sánh Ba Pattern
| Pattern | FIFO trong priority | Reliable (XACK/PEL) | Starvation protection | Phù hợp |
|---|---|---|---|---|
| Multiple streams (strict) | Có | Có | Không | Low priority không quan trọng lắm; traffic critical không liên tục |
| Weighted polling | Có | Có | Có | Production mặc định — cân bằng priority và fairness |
| Sorted Set score | Không (trong cùng score) | Hạn chế (không có PEL) | Không (trừ khi tự impl aging) | Use case đơn giản, priority score động |
Default cho production: multiple streams + weighted polling. Sorted Set chỉ dùng khi không cần FIFO nghiêm ngặt và có thể chấp nhận message mất khi worker crash.
Bài Toán Idempotent Jobs
Reliable queue (consumer group + PEL + XAUTOCLAIM) đảm bảo message được xử lý ít nhất một lần (at-least-once). Retry strategy (bài 59) và XAUTOCLAIM (bài 57) có thể deliver cùng một message nhiều lần — ví dụ worker A crash sau khi bắt đầu xử lý nhưng trước khi XACK, worker B reclaim và xử lý lại.
Nếu job có side-effect (gửi email, charge thẻ, gọi API bên thứ ba), at-least-once delivery đồng nghĩa side-effect có thể xảy ra nhiều lần:
- Payment job chạy 2 lần → charge 2 lần.
- Email job chạy 3 lần → người dùng nhận 3 email giống nhau.
- API call chạy 2 lần → tạo 2 resource trên third-party service.
Giải pháp: thiết kế job idempotent — chạy N lần có kết quả giống chạy 1 lần. Có bốn kỹ thuật chính, không loại trừ nhau.
Kỹ Thuật 1 — Idempotency Key (SET NX)
Trước khi thực hiện side-effect, dùng SET NX EX để "claim" quyền xử lý. Nếu key đã tồn tại (worker khác đã xử lý hoặc đang xử lý), skip. SET NX là atomic — không có race condition giữa check và set.
def send_email(message_id: str, recipient: str, body: str) -> None:
idem_key = f"job:done:{message_id}"
# SET NX: chỉ set nếu key chưa tồn tại
# EX 86400: TTL 24 giờ (phải > retry window)
claimed = r.set(idem_key, "1", nx=True, ex=86400)
if not claimed:
# Đã xử lý trước đó, skip
return
smtp.send(recipient, body)
Ba yếu tố cần đúng:
- Key đủ unique: dùng
message_idtừ payload, không dùng timestamp hay random — phải stable qua retry. - TTL > retry window: nếu retry window là 2 giờ, TTL phải ít nhất 3–4 giờ. TTL ngắn hơn → key expire trước khi hết retry → retry xử lý lại.
- SET NX trước side-effect: claim trước, execute sau. Đảo ngược (execute rồi mới set key) có race condition: execute xong nhưng crash trước khi set key → retry sẽ execute lại.
Điểm cần lưu ý thêm ở phần idempotent wrapper (mục 13): khi job fail sau khi đã SET key, cần delete key để cho phép retry.
Kỹ Thuật 2 — DB Unique Constraint
Database unique constraint là source of truth cho idempotency — bền hơn Redis key vì không expire, không mất khi Redis restart. Phù hợp cho các operation quan trọng như ghi nhận đơn hàng, transaction tài chính.
def process_order(order_id: str) -> None:
# ON CONFLICT DO NOTHING → idempotent tự nhiên
rows_inserted = db.execute("""
INSERT INTO processed_orders (order_id, processed_at)
VALUES (%s, NOW())
ON CONFLICT (order_id) DO NOTHING
RETURNING order_id
""", [order_id])
if not rows_inserted:
# order_id đã tồn tại → đã xử lý trước đó
return
# Lần đầu xử lý
send_confirmation_email(order_id)
update_inventory(order_id)
ON CONFLICT DO NOTHING RETURNING: nếu insert thành công → rows_inserted có dữ liệu; nếu conflict → empty. Đây là check nguyên tử trong DB transaction.
Có thể kết hợp cả hai: DB constraint là source of truth, Redis idem key là cache để tránh hit DB mỗi lần check. Redis check trước (nhanh); nếu miss (key expire hoặc cold cache), fall through xuống DB constraint.
Kỹ Thuật 3 — Upsert & SET Thay INCR
Một số operation tự nhiên idempotent nếu chọn đúng primitive. So sánh:
| Operation | Idempotent? | Ghi chú |
|---|---|---|
SET user:123:status "active" |
Có | Chạy 5 lần → kết quả vẫn là "active" |
INCR view_count |
Không | Retry 3 lần → count tăng 3 |
INSERT ... ON CONFLICT DO UPDATE SET status='active' |
Có | Upsert — chạy N lần cùng kết quả |
INSERT ... ON CONFLICT DO NOTHING + INCR |
Không | INCR vẫn không idempotent |
# Không idempotent: retry double-count
def track_view_bad(article_id: str) -> None:
r.incr(f"views:{article_id}") # retry → +2, +3...
# Idempotent: ghi giá trị tuyệt đối
def set_user_status(user_id: str, status: str) -> None:
r.set(f"user:{user_id}:status", status) # retry → không đổi
# Idempotent với INCR: cần idem key bảo vệ ngoài
def track_view_safe(article_id: str, request_id: str) -> None:
if r.set(f"view:done:{request_id}", "1", nx=True, ex=3600):
r.incr(f"views:{article_id}")
Khi thiết kế job, xem xét có thể thay INCR/APPEND bằng SET/UPDATE với giá trị tuyệt đối không. Nếu không thể (tổng hợp số liệu thực sự cần increment), bảo vệ bằng idem key bên ngoài.
Kỹ Thuật 4 — Check Side-effect State
Đôi khi trạng thái hiện tại của hệ thống đã đủ để biết side-effect đã xảy ra chưa, mà không cần lưu idem key riêng.
def send_otp(user_id: str) -> None:
# Kiểm tra OTP đã gửi trong vòng 60 giây chưa
last_sent_key = f"otp:sent:{user_id}"
if r.exists(last_sent_key):
return # đã gửi gần đây, skip
otp = generate_otp()
sms_gateway.send(user_id, otp)
# Ghi lại timestamp gửi gần nhất, TTL = cooldown
r.setex(last_sent_key, 60, "1")
def mark_notification_sent(notification_id: str) -> None:
# Check DB state — notification đã có trạng thái "sent" chưa
status = db.fetchone(
"SELECT status FROM notifications WHERE id = %s",
[notification_id]
)
if status and status["status"] == "sent":
return
# Send và cập nhật trạng thái
push_gateway.send(notification_id)
db.execute(
"UPDATE notifications SET status='sent', sent_at=NOW() WHERE id = %s",
[notification_id]
)
Pattern này phù hợp khi:
- Hệ thống đích (DB, cache) đã lưu trạng thái "đã làm" có thể query được.
- Cooldown check tự nhiên có ý nghĩa nghiệp vụ (OTP rate limiting).
Lưu ý race condition: giữa check state và execute có thể có worker khác chạy. Nếu cần strict once, kỹ thuật 1 (SET NX atomic) hoặc kỹ thuật 2 (DB constraint) an toàn hơn.
Side-effect Ordering Trong Job Nhiều Bước
Job thực tế thường có nhiều side-effect: ghi DB, gửi email, gọi API. Thứ tự thực hiện ảnh hưởng trực tiếp đến khả năng phục hồi khi retry.
Ví dụ job "hoàn thành đơn hàng" với ba bước:
- Update trạng thái đơn hàng trong DB (durable, có transaction, ON CONFLICT).
- Gửi email xác nhận cho khách (external, không rollback được).
- Gọi API cập nhật inventory partner (external).
def complete_order_job(order_id: str, job_id: str) -> None:
# Bước 1: DB update trước — durable, idempotent qua ON CONFLICT
rows = db.execute("""
UPDATE orders SET status='completed', completed_at=NOW()
WHERE id = %s AND status != 'completed'
RETURNING id
""", [order_id])
if not rows:
# Đã completed trước đó — skip toàn bộ
return
# Bước 2: Email — dùng idem key riêng
email_key = f"email:order_complete:{order_id}"
if r.set(email_key, "1", nx=True, ex=86400):
smtp.send_order_confirmation(order_id)
# Bước 3: API partner — dùng idem key riêng
api_key = f"api:inventory:{order_id}"
if r.set(api_key, "1", nx=True, ex=86400):
partner_api.update_inventory(order_id)
Nguyên tắc:
- DB operation đầu tiên: transaction rollback được, là source of truth cho "đã xử lý chưa".
- External side-effect sau: dùng idem key riêng cho từng bước. Nếu bước 2 fail, retry bỏ qua bước 1 (đã done) và chạy lại từ bước 2.
- Mỗi external step có idem key riêng: không dùng chung 1 key cho toàn job — nếu dùng chung, retry bỏ qua luôn cả bước chưa xong.
Idempotent Wrapper — Context Manager
Thay vì viết boilerplate SET NX / check / delete ở mỗi job, đóng gói thành context manager tái sử dụng. Điểm quan trọng nhất: khi job fail (exception), xoá idem key để retry có thể chạy lại.
from contextlib import contextmanager
class AlreadyProcessed(Exception):
pass
@contextmanager
def idempotent(key: str, ttl: int = 86400):
"""
Context manager đảm bảo block yield chạy đúng một lần.
- key: unique identifier cho job/operation.
- ttl: TTL (giây) — phải lớn hơn retry window.
Raise AlreadyProcessed nếu key đã tồn tại.
Xoá key nếu block yield raise exception (cho phép retry).
"""
full_key = f"idem:{key}"
claimed = r.set(full_key, "1", nx=True, ex=ttl)
if not claimed:
raise AlreadyProcessed(f"Already processed: {key}")
try:
yield
except Exception:
# Job fail → xoá key để retry có cơ hội chạy lại
r.delete(full_key)
raise
# Thành công → giữ key để chặn duplicate
def job_handler(payload: dict) -> None:
try:
with idempotent(payload["job_id"]):
do_work(payload)
except AlreadyProcessed:
pass # Đã xử lý — ack message và tiếp tục
Flow xử lý:
- Lần đầu, thành công: SET NX → claimed → yield → success → key giữ nguyên → duplicate bị block.
- Lần đầu, fail: SET NX → claimed → yield → exception → DELETE key → retry có thể chạy lại.
- Duplicate (retry sau success): SET NX → not claimed → AlreadyProcessed → handler bắt và ack message, không retry.
Lý do xoá key khi fail quan trọng: nếu không xoá, job fail nhưng key tồn tại → retry thấy AlreadyProcessed → skip → job không bao giờ hoàn thành → message tích lũy vào DLQ (bài 60).
Kết Hợp Priority + Idempotent Trong Production
Sơ đồ production queue đầy đủ kết hợp các kỹ thuật từ module 5:
STREAMS = [
("stream:critical", 10),
("stream:normal", 5),
("stream:low", 1),
]
def production_worker():
while True:
for stream, weight in STREAMS:
msgs = r.xreadgroup(GROUP, CONSUMER, {stream: ">"}, count=weight, block=100)
if not msgs:
continue
for _stream, entries in msgs:
for msg_id, fields in entries:
handle_message(stream, msg_id, fields)
def handle_message(stream: str, msg_id: bytes, fields: dict) -> None:
job_id = fields[b"job_id"].decode()
payload = json.loads(fields[b"payload"])
try:
with idempotent(job_id):
dispatch(payload) # thực hiện job
r.xack(stream, GROUP, msg_id) # ack sau khi thành công
except AlreadyProcessed:
r.xack(stream, GROUP, msg_id) # duplicate → ack, bỏ qua
except RetryableError as e:
# Retry với delay (bài 59) — không ack
schedule_retry(stream, msg_id, fields, e)
except Exception:
# Non-retryable → DLQ (bài 60)
move_to_dlq(stream, msg_id, fields)
r.xack(stream, GROUP, msg_id)
Các thành phần:
- Weighted polling → priority + starvation protection.
- Idempotent wrapper → mỗi job chỉ execute side-effect một lần.
- Retry strategy (bài 59) → retryable error có delay.
- DLQ (bài 60) → poison message không block queue chính.
- XAUTOCLAIM (bài 57) → reclaim message từ worker crash.
Anti-patterns & Best Practices
Anti-patterns
- Priority qua Sorted Set mong FIFO trong cùng score: Redis không guarantee thứ tự khi score bằng nhau. Nếu FIFO trong priority cần thiết, dùng Stream.
- Multiple streams strict không weighted → low starve: nếu critical stream liên tục không rỗng, low priority không được xử lý. Phải thêm weighted polling hoặc kiểm tra aging.
- Job không idempotent + at-least-once delivery: gửi email N lần, charge N lần. Mọi job có side-effect phải idempotent.
- Idem key TTL < retry window: key expire trước khi hết thời gian retry → retry chạy lại như lần đầu → duplicate. TTL phải > retry window, thêm buffer tối thiểu 2x.
- SET NX sau side-effect: execute side-effect rồi mới set idem key — nếu crash giữa chừng, retry sẽ execute lại. Phải SET NX trước.
- Không xoá idem key khi fail: job fail nhưng key còn → retry thấy AlreadyProcessed → skip → job không hoàn thành → message vào DLQ không đúng lý do.
- Dùng chung 1 idem key cho job nhiều bước: nếu bước 2 fail và retry, idem key tổng đã tồn tại → skip toàn bộ kể cả bước 2 chưa xong. Phải idem key riêng cho từng external step.
Best practices
- Default: multiple streams + weighted polling. Điều chỉnh weight theo SLA thực tế.
- Mọi job có external side-effect: dùng idempotent wrapper.
- DB unique constraint = source of truth; Redis idem key = performance cache.
- TTL idem key: ít nhất 2× retry window (bài 59).
- Delete idem key khi job fail — cho phép retry chạy lại.
- Đặt DB operation trước external side-effect trong job nhiều bước.
- Job ID phải stable qua retry — dùng field từ payload, không tạo mới mỗi lần.
Tổng Kết & Quiz
Bài này đi qua hai vấn đề bổ trợ nhau trong production queue:
- Priority queue: multiple streams với worker check theo thứ tự ưu tiên. Weighted polling (10:5:1) chống starvation. Sorted Set phù hợp use case đơn giản không cần FIFO nghiêm ngặt.
- Idempotent jobs: at-least-once delivery → phải idempotent. Bốn kỹ thuật: SET NX, DB constraint, upsert/SET, check state. Idempotent wrapper tự động claim và release khi fail.
- Production pattern: weighted polling + idempotent wrapper + retry strategy + DLQ tạo thành lớp bảo vệ hoàn chỉnh cho async job processing.
Quiz
- Tại sao strict multiple streams (không weighted) có thể gây starvation? Trong hoàn cảnh nào điều này chấp nhận được?
- Sorted Set score có FIFO trong cùng priority không? Giải thích tại sao.
- Idem key TTL được set là 1 giờ, retry window là 2 giờ. Điều gì xảy ra?
- Job A có 3 bước: DB update → gửi email → gọi API. Bước 2 fail. Retry nên bỏ qua bước nào và làm gì?
- Tại sao phải xoá idem key khi job fail thay vì giữ nguyên?
Đáp án gợi ý
- Strict multiple streams: worker luôn check critical trước, chỉ xuống low khi critical rỗng. Nếu critical stream liên tục có message, low không bao giờ được poll. Chấp nhận được khi low priority thực sự không quan trọng (analytics batch chạy ban đêm) và traffic critical không liên tục cao.
- Không. Redis Sorted Set chỉ sắp xếp theo score; các member có cùng score không có thứ tự xác định.
ZPOPMAXkhi nhiều member cùng score cao nhất sẽ trả về một trong số đó theo thứ tự nội bộ của Redis (thường là lexicographic của member value, không phải thứ tự insert). - Idem key expire sau 1 giờ nhưng retry window còn 2 giờ — trong giờ thứ 2, retry thấy key không tồn tại, coi là lần đầu và execute lại side-effect. Duplicate xảy ra dù đã có idem key.
- Retry kiểm tra DB state: bước 1 đã done (ON CONFLICT DO NOTHING → rows empty) → skip bước 1. Bước 2 (email): idem key
email:order:Xchưa có (bước 2 fail trước khi set key, hoặc đã delete) → chạy lại. Bước 3: tương tự bước 2. - Nếu giữ idem key khi fail, retry thấy key đã tồn tại → AlreadyProcessed → skip → job không bao giờ hoàn thành dù chưa được xử lý thực sự. Message tích lũy trong PEL, sau khi hết retry window → vào DLQ vì lý do sai (bị skip thay vì lỗi thực).
Bài tiếp theo
Bài 62 đi vào Distributed Workers — scale horizontal worker pool, phân phối tải giữa nhiều instance, và các vấn đề về consumer group management khi worker động.
