Danh sách bài viết

Bài 62: Distributed Workers — Scale Horizontal

Khi 1 worker không đủ để xử lý lượng message trong queue, giải pháp cơ bản là chạy nhiều worker đồng thời trên nhiều process hoặc pod. Redis Streams consumer group là cơ chế native cho việc này: Redis tự phân phối message giữa các consumer active trong group mà không cần thêm service trung gian. Bài này đi qua tại sao cần scale ngang, consumer naming convention, toàn bộ worker lifecycle (startup recovery, main loop, graceful shutdown), health check cho Kubernetes, autoscaling dựa trên backlog metric với KEDA, capacity planning, và các anti-pattern dễ gặp khi vận hành nhiều worker.

01/06/2026
0 lượt xem

Mục tiêu bài học

  • Hiểu tại sao single worker tạo bottleneck và consumer group giải quyết bằng cách nào.
  • Biết chọn consumer naming strategy phù hợp với K8s deployment.
  • Viết được worker lifecycle hoàn chỉnh: startup recovery, main loop, graceful shutdown.
  • Biết cấu hình health check và autoscaling (KEDA) dựa backlog metric.
  • Hiểu các anti-pattern và hậu quả khi vận hành nhiều worker song song.

Vì sao scale horizontal

Single worker là điểm bottleneck rõ ràng nhất trong hệ thống async. Khi producer đẩy message nhanh hơn consumer xử lý, queue backlog tích lũy:

  • Latency tăng: message chờ lâu hơn trước khi được xử lý.
  • Memory Redis tăng: stream chứa nhiều entry hơn cần thiết.
  • Single point of failure: nếu worker chết, không có worker nào tiếp tục.

Thêm worker vào consumer group giải quyết cả ba vấn đề này mà không cần thay đổi producer hay cấu trúc stream. Redis tự phân phối message giữa các consumer active — không cần load balancer trung gian, không cần service discovery phức tạp.

Scale horizontal hiệu quả đến khi đụng giới hạn: số lượng worker vượt throughput Redis, hoặc message processing tốn CPU nhiều hơn I/O (lúc này scale CPU thay vì network). Bài này tập trung vào trường hợp phổ biến: I/O-bound worker và Redis là bottleneck phụ.

Consumer group là nền tảng scale

Consumer group (bài 56) là cơ chế Redis Streams cho phép nhiều consumer đọc từ cùng 1 stream mà mỗi message chỉ được giao cho 1 consumer duy nhất trong group. Đây là toàn bộ cơ sở hạ tầng cần thiết để scale.

Khi có N consumer active trong group:

  • Redis track last-delivered-id cho từng consumer.
  • XREADGROUP với > tự động giao message chưa delivered cho consumer đang gọi.
  • Redis không có "round-robin" cứng — consumer nào gọi XREADGROUP trước thì nhận message trước. Kết quả thực tế gần với load balance tự nhiên nếu tất cả worker có tốc độ xử lý tương đương.
  • Pending Entries List (PEL) track message đã giao nhưng chưa XACK cho mỗi consumer riêng biệt.

Thêm worker mới vào group không ảnh hưởng worker cũ — chỉ cần gọi XREADGROUP với consumer name mới. Xóa worker khỏi group không ảnh hưởng stream — PEL của consumer đó tồn tại cho đến khi được XAUTOCLAIM (bài 57) hoặc XGROUP DELCONSUMER.

# Kiểm tra consumer active trong group
XINFO CONSUMERS stream:orders order-processor

# Kết quả mẫu
1) name: worker-pod-abc:order-processor
   pending: 3
   idle: 1500   # ms từ lần cuối active
   inactive: 0

2) name: worker-pod-def:order-processor
   pending: 1
   idle: 800
   inactive: 0

Consumer naming convention

Consumer name trong Redis Streams là string tùy ý, nhưng quyết định này ảnh hưởng trực tiếp đến behavior của PEL khi worker restart.

Vấn đề cốt lõi: PEL gắn với consumer name. Nếu worker restart với consumer name khác (do random UUID), PEL cũ bị "mồ côi" — không có consumer nào claim chúng trừ khi XAUTOCLAIM dọn. Nếu worker restart với cùng consumer name, PEL persist và worker recover được message chưa xử lý.

Hai chiến lược phổ biến:

Chiến lược Consumer name PEL sau restart Phù hợp
Ephemeral (mỗi restart = mới) worker-{hostname}-{pid}-{uuid4()} PEL cũ mồ côi → cần XAUTOCLAIM Môi trường K8s (hostname đổi qua restart)
Stable (persist qua restart) {hostname}:{role} PEL persist → worker tự recover VM hoặc bare metal (hostname cố định)

Trong Kubernetes, pod hostname thay đổi mỗi lần restart (trừ StatefulSet với headless service). Phương án thực tế nhất là dùng ephemeral name kết hợp với XAUTOCLAIM để dọn PEL của consumer cũ sau một idle threshold:

import socket
import os
import uuid

# Ephemeral: mỗi restart = consumer name mới
CONSUMER = f"{socket.gethostname()}-{os.getpid()}-{uuid.uuid4().hex[:8]}"
# vd: worker-pod-abc-12345-f3a9b2c1

# Stable (nếu hostname cố định — StatefulSet, VM):
# CONSUMER = f"{socket.gethostname()}:{os.environ.get('WORKER_ROLE', 'default')}"
# vd: worker-pod-abc:order-processor

Quy tắc bắt buộc: consumer name phải unique trong group tại bất kỳ thời điểm nào. Hai worker cùng consumer name gây PEL conflict — cả hai nhận và ack cùng message, hoặc chỉ 1 worker nhận nhưng XACK thất bại vì consumer name sai.

Stable consumer name — trade-off

Stable name có lợi điểm rõ: worker restart cùng host có thể tự recover PEL mà không cần chờ XAUTOCLAIM. Nhưng có nhược điểm cần cân nhắc:

  • K8s Deployment: pod hostname thay đổi qua mỗi restart → "stable" về lý thuyết nhưng không stable về thực tế.
  • K8s StatefulSet: pod name cố định (worker-0, worker-1...) → stable name hoạt động tốt. Nhưng StatefulSet phức tạp hơn Deployment.
  • Scale down: nếu giảm replica từ 3 xuống 2, PEL của worker-2 bị mồ côi cho đến khi scale up lại hoặc XAUTOCLAIM dọn.

Với Deployment thông thường, dùng ephemeral name + XAUTOCLAIM là lựa chọn an toàn hơn. Với StatefulSet hoặc VM, stable name giúp recovery nhanh hơn sau restart có kế hoạch (deploy mới).

import socket, os

def make_consumer_name() -> str:
    """
    Với StatefulSet: POD_NAME env được inject tự động (worker-0, worker-1...).
    Với Deployment: fallback về ephemeral.
    """
    pod_name = os.environ.get("POD_NAME")  # K8s downward API
    role = os.environ.get("WORKER_ROLE", "default")
    if pod_name:
        return f"{pod_name}:{role}"
    # Fallback: ephemeral
    import uuid
    return f"{socket.gethostname()}-{os.getpid()}-{uuid.uuid4().hex[:8]}"

Worker lifecycle đầy đủ

Một worker production-ready trải qua 4 giai đoạn rõ ràng:

  1. Setup: tạo consumer group nếu chưa tồn tại (idempotent).
  2. Startup recovery: đọc PEL với ID "0" trước khi đọc message mới.
  3. Main loop: đọc message mới, xử lý, XACK. Định kỳ XAUTOCLAIM message stale.
  4. Graceful shutdown: dừng nhận mới, drain PEL, XGROUP DELCONSUMER.
import signal
import time
import redis

STREAM = "stream:orders"
GROUP = "order-processor"
CONSUMER = make_consumer_name()
BLOCK_MS = 5000       # block tối đa 5s nếu không có message
CLAIM_IDLE_MS = 60000 # claim message idle > 60s
MAX_PENDING = 10      # xử lý tối đa 10 pending mỗi lần recover

r = redis.Redis(decode_responses=True)
shutdown = False

def handle_sigterm(sig, frame):
    global shutdown
    shutdown = True

signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGINT, handle_sigterm)


def setup():
    """Tạo group nếu chưa có. mkstream=True tự tạo stream nếu chưa tồn tại."""
    try:
        r.xgroup_create(STREAM, GROUP, id="$", mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise  # lỗi khác, raise lại


def process_message(msg_id: str, data: dict):
    """Business logic. Raise exception nếu fail."""
    print(f"Processing {msg_id}: {data}")
    # ... thực tế: gọi API, ghi DB, publish event ...


def recover_pending():
    """
    Đọc PEL của chính consumer này với ID "0" để xử lý lại
    message đã nhận nhưng chưa XACK từ lần chạy trước
    (chỉ có ý nghĩa với stable consumer name).
    """
    start = "0"
    while True:
        results = r.xreadgroup(GROUP, CONSUMER, {STREAM: start},
                               count=MAX_PENDING)
        if not results:
            break
        _, messages = results[0]
        if not messages:
            break
        for msg_id, data in messages:
            try:
                process_message(msg_id, data)
                r.xack(STREAM, GROUP, msg_id)
            except Exception as e:
                print(f"Recover failed {msg_id}: {e}")
                # không XACK → message ở lại PEL để retry sau
        start = messages[-1][0]  # tiếp tục từ ID cuối cùng


def handle_new_messages():
    """Đọc message mới từ stream với ">" (chưa delivered cho ai trong group)."""
    results = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=10,
                           block=BLOCK_MS)
    if not results:
        return
    _, messages = results[0]
    for msg_id, data in messages:
        try:
            process_message(msg_id, data)
            r.xack(STREAM, GROUP, msg_id)
        except Exception as e:
            print(f"Failed {msg_id}: {e}")
            # không XACK → vào PEL, sẽ được retry/XAUTOCLAIM sau


def autoclaim_stale():
    """
    Nhận lại message đã idle > CLAIM_IDLE_MS từ consumer bất kỳ.
    Xử lý các message bị bỏ lại do worker khác chết hoặc crash.
    """
    result = r.xautoclaim(STREAM, GROUP, CONSUMER,
                          min_idle_time=CLAIM_IDLE_MS,
                          start_id="0-0",
                          count=5)
    # result = (next_start_id, [(msg_id, data), ...], [deleted_ids])
    _, claimed_messages, _ = result
    for msg_id, data in claimed_messages:
        try:
            process_message(msg_id, data)
            r.xack(STREAM, GROUP, msg_id)
        except Exception as e:
            print(f"Autoclaim processing failed {msg_id}: {e}")


def drain_pending():
    """
    Xử lý hết PEL của consumer hiện tại trước khi shutdown.
    Chỉ cần thiết với stable consumer name (nếu ephemeral, PEL sẽ
    được XAUTOCLAIM dọn bởi worker khác sau idle threshold).
    """
    start = "0"
    while True:
        results = r.xreadgroup(GROUP, CONSUMER, {STREAM: start}, count=10)
        if not results:
            break
        _, messages = results[0]
        if not messages:
            break
        for msg_id, data in messages:
            try:
                process_message(msg_id, data)
                r.xack(STREAM, GROUP, msg_id)
            except Exception as e:
                print(f"Drain failed {msg_id}: {e}")
        start = messages[-1][0]


def worker():
    setup()
    recover_pending()

    autoclaim_counter = 0
    while not shutdown:
        handle_new_messages()
        autoclaim_counter += 1
        if autoclaim_counter % 10 == 0:  # autoclaim mỗi 10 vòng lặp
            autoclaim_stale()

    # Graceful shutdown
    print(f"Shutting down {CONSUMER}...")
    drain_pending()
    # Xóa consumer khỏi group → PEL transfer về "" (không còn owner)
    # Worker khác sẽ XAUTOCLAIM sau
    r.xgroup_delconsumer(STREAM, GROUP, CONSUMER)
    print(f"{CONSUMER} shutdown complete.")


if __name__ == "__main__":
    worker()

Một vài điểm cần chú ý trong code trên:

  • recover_pending() dùng "0" thay vì ">" — đọc PEL của consumer này, không đọc message mới. Quan trọng với stable name.
  • handle_new_messages() dùng ">" — đọc message chưa được giao cho ai trong group.
  • autoclaim_stale() dọn PEL của các consumer đã chết — đây là XAUTOCLAIM từ bài 57, nhưng được gọi định kỳ trong worker loop.
  • Không XACK khi fail — message ở lại PEL để retry sau hoặc được XAUTOCLAIM bởi worker khác.

Graceful shutdown

Khi Kubernetes scale down hoặc deploy version mới, K8s gửi SIGTERM trước rồi chờ terminationGracePeriodSeconds trước khi SIGKILL. Worker cần xử lý SIGTERM đúng cách.

Luồng graceful shutdown:

  1. Nhận SIGTERM → set flag shutdown = True.
  2. Kết thúc lần xử lý hiện tại (không abort giữa chừng).
  3. Thoát khỏi main loop.
  4. Drain PEL — xử lý hết message đang pending trong PEL của consumer này.
  5. XGROUP DELCONSUMER — xóa consumer khỏi group. PEL còn lại (nếu có) sẽ được XAUTOCLAIM bởi worker khác sau idle threshold.

Với K8s Deployment và ephemeral consumer name: drain không có ý nghĩa nhiều vì consumer name mới sau restart sẽ không nhìn thấy PEL cũ. Quan trọng hơn là XGROUP DELCONSUMER — không để lại "ghost consumer" với PEL bị khóa lâu dài.

Tránh SIGKILL: SIGKILL không cho worker cơ hội cleanup. Message đang được xử lý sẽ ở lại PEL cho đến khi XAUTOCLAIM dọn sau idle threshold. Không phải lỗi nghiêm trọng nếu worker idempotent, nhưng tăng latency và có thể gây duplicate nếu message đã được xử lý một phần.

Cấu hình Kubernetes:

spec:
  template:
    spec:
      terminationGracePeriodSeconds: 60  # đủ thời gian drain
      containers:
        - name: worker
          lifecycle:
            preStop:
              exec:
                # K8s gọi preStop trước SIGTERM — thêm delay nhỏ nếu cần
                command: ["/bin/sh", "-c", "sleep 2"]

Health check

Kubernetes cần hai loại probe khác nhau:

  • Liveness probe: worker process còn sống không? Nếu fail → K8s restart pod.
  • Readiness probe: worker sẵn sàng nhận traffic/work chưa? Nếu fail → không route request đến pod này (quan trọng hơn với HTTP server; với queue worker, liveness đủ cho hầu hết trường hợp).

Cách đơn giản nhất: chạy HTTP health endpoint trên thread phụ (hoặc dùng exec probe).

import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

redis_client = redis.Redis(decode_responses=True)
is_ready = False  # Set True sau khi setup() và recover_pending() xong


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/healthz":
            # Liveness: kiểm tra Redis còn respond không
            try:
                redis_client.ping()
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"OK")
            except Exception:
                self.send_response(503)
                self.end_headers()
                self.wfile.write(b"Redis unreachable")
        elif self.path == "/readyz":
            # Readiness: worker đã setup và recovered PEL chưa
            if is_ready:
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"READY")
            else:
                self.send_response(503)
                self.end_headers()
                self.wfile.write(b"NOT READY")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # tắt access log mặc định


def start_health_server(port: int = 8080):
    server = HTTPServer(("", port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()


# Trong worker():
# start_health_server()
# setup()
# recover_pending()
# is_ready = True  ← set sau khi sẵn sàng
# while not shutdown: ...

Kubernetes probe config:

livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3

readinessProbe:
  httpGet:
    path: /readyz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10
  failureThreshold: 2

Nếu không muốn HTTP server, dùng exec probe với command kiểm tra Redis ping:

livenessProbe:
  exec:
    command: ["redis-cli", "-h", "$(REDIS_HOST)", "ping"]
  periodSeconds: 15

Autoscaling — metric và KEDA

Thêm worker thủ công không hiệu quả khi load thay đổi theo giờ. Autoscaling dựa trên queue depth (backlog) là pattern chuẩn.

Metric cần theo dõi để quyết định scale:

  • XLEN stream:orders — tổng số entry trong stream (bao gồm cả đã processed nếu chưa XDEL/XTRIM).
  • Pending count của group: số message đã deliver nhưng chưa XACK.
  • Lag (backlog) = số message chưa được deliver cho group nào cả.

Cách tính lag thực từ Redis:

XINFO GROUPS stream:orders

# Kết quả quan trọng:
# "lag" → số message chưa deliver cho group này
# "pending" → message đã deliver nhưng chưa XACK
# "entries-read" → tổng message đã đọc

KEDA (Kubernetes Event-Driven Autoscaler) hỗ trợ Redis Streams scaler built-in từ KEDA v2.x. Không cần tự implement metric exporter:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: order-worker-scaler
spec:
  scaleTargetRef:
    name: order-worker  # tên Deployment
  minReplicaCount: 1
  maxReplicaCount: 20
  triggers:
    - type: redis-streams
      metadata:
        address: redis:6379
        stream: stream:orders
        consumerGroup: order-processor
        # Scale up khi lag > 10 message per worker
        lagCount: "10"

KEDA tính: nếu lag = 50 message và lagCount=10, target replica = 50/10 = 5. HPA sẽ scale Deployment lên 5 replica. Khi lag về 0, scale down về minReplicaCount.

Không dùng KEDA thì cần tự export metric (Prometheus custom metric hoặc external metric) và cấu hình K8s HPA với external metric source — phức tạp hơn nhiều.

Capacity planning

Trước khi set maxReplicaCount trong KEDA, cần biết 1 worker xử lý được bao nhiêu message/s. Benchmark đơn giản:

import time

def benchmark_worker(duration_s: int = 30):
    """
    Chạy worker trong duration_s giây và đếm message processed.
    """
    count = 0
    start = time.time()
    while time.time() - start < duration_s:
        results = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=10)
        if results:
            _, messages = results[0]
            for msg_id, data in messages:
                process_message(msg_id, data)
                r.xack(STREAM, GROUP, msg_id)
                count += 1
    elapsed = time.time() - start
    print(f"Throughput: {count/elapsed:.1f} messages/s")

Sau khi có per-worker throughput:

per_worker_throughput = 50 msg/s  (kết quả benchmark)
peak_producer_rate    = 800 msg/s (peak load)
safety_factor         = 1.3       (buffer 30%)

workers_needed = ceil(peak_producer_rate / per_worker_throughput * safety_factor)
               = ceil(800 / 50 * 1.3)
               = ceil(20.8)
               = 21

Đặt maxReplicaCount: 25 (thêm buffer cho failover). Nếu 1 worker chết trong lúc peak, 24 worker còn lại vẫn đủ throughput.

Giới hạn cần biết:

  • Redis maxclients default 10000. 100 worker × 1 connection = 100 — không đáng lo.
  • Nếu worker dùng connection pool (vd thêm thread), tính theo pool size.
  • Nhiều worker quá không tăng throughput vô hạn: Redis single-threaded command processing, I/O multiplexing, network bandwidth đều là giới hạn thực tế.

Deploy nhiều worker với Kubernetes

Cấu hình Deployment đầy đủ cho distributed worker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-worker
  labels:
    app: order-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: order-worker
  template:
    metadata:
      labels:
        app: order-worker
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: worker
          image: your-registry/order-worker:latest
          ports:
            - containerPort: 8080  # health check
          env:
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: url
            - name: WORKER_ROLE
              value: "order-processor"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 15
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
          resources:
            requests:
              cpu: "100m"
              memory: "128Mi"
            limits:
              cpu: "500m"
              memory: "256Mi"

Với POD_NAME được inject qua Kubernetes Downward API, worker biết pod name của mình mà không cần hardcode. Dùng cho stable consumer name nếu cần.

Worker consume nhiều stream

XREADGROUP hỗ trợ đọc nhiều stream trong 1 lần gọi:

results = r.xreadgroup(
    GROUP, CONSUMER,
    {
        "stream:orders": ">",
        "stream:emails": ">",
        "stream:notifications": ">",
    },
    count=10,
    block=5000
)

for stream_name, messages in results:
    for msg_id, data in messages:
        if stream_name == "stream:orders":
            process_order(msg_id, data)
        elif stream_name == "stream:emails":
            process_email(msg_id, data)
        else:
            process_notification(msg_id, data)
        r.xack(stream_name, GROUP, msg_id)

1 worker consume nhiều stream phù hợp khi:

  • Các stream có throughput thấp và không cần worker chuyên biệt.
  • Muốn giảm số pod/process trong K8s.

Dedicated worker per stream phù hợp hơn khi:

  • Mỗi stream cần scale riêng (thay đổi replica count độc lập).
  • Processing logic khác nhau hoàn toàn — dễ maintain hơn khi tách riêng.
  • 1 stream có throughput cao, không muốn block stream kia.

Worker discovery — không cần service mesh

Một điểm quan trọng: không cần service discovery hay service mesh (Consul, Istio, Linkerd) để quản lý worker pool với Redis Streams.

Khi worker start lên, nó chỉ cần:

  1. Kết nối Redis.
  2. Gọi XGROUP CREATE (idempotent).
  3. Gọi XREADGROUP với consumer name của mình.

Redis tự động ghi nhận consumer này vào group và bắt đầu giao message. XINFO CONSUMERS trả về danh sách tất cả consumer đang active — đây là "service registry" tự động, không cần dịch vụ bên ngoài.

# "Registry" consumer hiện tại — chỉ cần 1 lệnh Redis
XINFO CONSUMERS stream:orders order-processor

# Xem group summary
XINFO GROUPS stream:orders

Consumer "inactive" (đã disconnect hoặc crash) vẫn xuất hiện trong XINFO CONSUMERS cho đến khi được XGROUP DELCONSUMER hoặc XAUTOCLAIM claim hết PEL của nó.

Connection management

Mỗi worker process cần ít nhất 1 persistent connection đến Redis (cho blocking XREADGROUP). Thêm 1 connection nếu có health check server dùng redis.ping() trên thread khác.

Với 100 worker pod:

  • 100 connections blocking XREADGROUP.
  • +100 connections health check (tùy implement).
  • Tổng ~200 connections. Redis maxclients default 10000 — không có vấn đề.

Connection pool với blocking XREADGROUP cần cẩn thận: XREADGROUP với block=5000 giữ connection trong 5s. Nếu pool size nhỏ, các query khác (XACK, XAUTOCLAIM) phải chờ. Tốt nhất là dùng connection riêng cho blocking read và connection pool nhỏ (2-3 connections) cho các lệnh còn lại:

import redis

# Connection cho blocking XREADGROUP (dedicated)
read_conn = redis.Redis.from_url(REDIS_URL, decode_responses=True)

# Connection pool nhỏ cho XACK, XAUTOCLAIM, ping
pool = redis.ConnectionPool.from_url(REDIS_URL, max_connections=3,
                                     decode_responses=True)
cmd_conn = redis.Redis(connection_pool=pool)

Worker per process vs per thread

Lựa chọn đơn vị concurrency ảnh hưởng đến số pod cần thiết và phức tạp của code:

Runtime Đơn vị scale Ghi chú
Python Process GIL ngăn multi-thread chạy CPU song song. 1 process = 1 CPU effective. Scale bằng nhiều process/pod.
Python asyncio Coroutine trong 1 process I/O concurrent cao trong 1 process. Phù hợp nếu processing là I/O-bound (HTTP call, DB write).
Go Goroutine / GOMAXPROCS 1 process dùng nhiều CPU core. Cần ít pod hơn Python để đạt cùng throughput CPU-bound.
Java/JVM Thread / virtual thread (Java 21+) 1 JVM process nhiều thread. Virtual thread (Project Loom) scale I/O tốt với ít overhead.

Với Python I/O-bound worker (gọi API ngoài, ghi DB): asyncio trong 1 process có thể xử lý hàng trăm message concurrent với 1 pod. Ví dụ asyncio với aioredis:

import asyncio
import redis.asyncio as aioredis

async def worker_async():
    r = aioredis.Redis.from_url(REDIS_URL, decode_responses=True)
    while not shutdown:
        results = await r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=10,
                                     block=5000)
        if not results:
            continue
        _, messages = results[0]
        # Xử lý concurrent với asyncio.gather
        tasks = [process_message_async(r, msg_id, data)
                 for msg_id, data in messages]
        await asyncio.gather(*tasks, return_exceptions=True)

async def process_message_async(r, msg_id, data):
    try:
        await do_async_work(data)  # HTTP call, DB write, etc.
        await r.xack(STREAM, GROUP, msg_id)
    except Exception as e:
        print(f"Failed {msg_id}: {e}")

Monitoring distributed workers

Các metric quan trọng cần theo dõi khi chạy nhiều worker:

Metric Lệnh Redis Ý nghĩa
Active consumer count XINFO CONSUMERS stream GROUP Số worker đang active. Giảm đột ngột → worker đang crash.
Group lag XINFO GROUPS streamlag Message chưa deliver. Tăng liên tục → cần scale up.
Pending per consumer XINFO CONSUMERSpending Consumer nào pending nhiều → có thể bị stuck.
Consumer idle time XINFO CONSUMERSidle Idle cao bất thường → worker hung (loop block, deadlock).
Stream length XLEN stream Tổng entry. Kết hợp với lag để tính backlog thực tế.

Script monitoring đơn giản:

def report_worker_status():
    groups = r.xinfo_groups(STREAM)
    for group in groups:
        print(f"Group: {group['name']}")
        print(f"  Consumers: {group['consumers']}")
        print(f"  Pending: {group['pending']}")
        print(f"  Lag: {group.get('lag', 'N/A')}")

    consumers = r.xinfo_consumers(STREAM, GROUP)
    for c in consumers:
        print(f"  Consumer {c['name']}: "
              f"pending={c['pending']}, "
              f"idle={c['idle']}ms")

Anti-pattern

  • Consumer name không unique: hai worker dùng cùng consumer name → PEL conflict, một trong hai nhận message nhưng không thể XACK đúng, dẫn đến message bị xử lý 2 lần hoặc không được ack.
  • SIGKILL (không graceful): message đang xử lý kẹt trong PEL. Nếu worker không idempotent, có thể gây duplicate sau khi XAUTOCLAIM claim lại.
  • Không có health check: K8s không biết worker đang hung (process còn sống nhưng không consume được). Pod không bị restart, backlog tích lũy.
  • Quá nhiều worker vượt Redis capacity: throughput thực tế giảm do Redis phải phục vụ quá nhiều concurrent XREADGROUP. Benchmark trước khi set maxReplicaCount cao.
  • Worker không idempotent nhưng không xử lý duplicate: XAUTOCLAIM sau crash có thể giao lại message đã được xử lý một phần. Thiếu idempotency → data corruption.
  • Không xóa consumer khi scale down: ghost consumer với PEL bị lock làm XINFO CONSUMERS nhiễu, và XAUTOCLAIM phải chờ idle threshold mới dọn được.

Best practice

  • Unique consumer name: hostname + pid + uuid8 cho Deployment, pod_name + role cho StatefulSet.
  • Startup recovery: luôn đọc PEL với "0" khi startup, đặc biệt với stable consumer name.
  • Graceful shutdown: handle SIGTERM, drain PEL, XGROUP DELCONSUMER trước khi exit. Đặt terminationGracePeriodSeconds đủ lớn (60s là điểm khởi đầu hợp lý).
  • Health check: liveness probe với redis.ping(), readiness probe sau setup xong.
  • XAUTOCLAIM định kỳ: mỗi worker nên autoclaim message stale của consumer khác để dọn PEL orphan sau crash.
  • Autoscaling dựa backlog: dùng KEDA Redis Streams scaler thay vì scale theo CPU.
  • Monitor consumer activity: alert khi lag tăng liên tục, consumer idle bất thường, pending tích lũy.
  • Worker idempotent: thiết kế processing logic chịu được duplicate, đặc biệt khi có XAUTOCLAIM.

Bài tập

  1. Viết script Python chạy 3 worker goroutine (hoặc 3 process) đồng thời consume từ cùng 1 group. Đo tổng throughput so với 1 worker đơn lẻ trên cùng machine.
  2. Mô phỏng worker crash: chạy 2 worker, kill -9 worker 1 (không graceful), quan sát PEL của worker 1 qua XINFO CONSUMERS. Sau đó chạy XAUTOCLAIM từ worker 2 và verify message được xử lý.
  3. Giải thích: nếu 2 worker dùng cùng consumer name "worker-A", Redis sẽ hành xử như thế nào khi cả hai gọi XREADGROUP đồng thời? Điều gì xảy ra với PEL?
  4. Implement drain pattern cho graceful shutdown: worker nhận SIGTERM → dừng đọc message mới → xử lý hết PEL của mình → XGROUP DELCONSUMER → exit. Verify qua XINFO CONSUMERS rằng consumer đã bị xóa.

Đáp án gợi ý

  1. Với I/O-bound processing (sleep giả lập), throughput tăng gần tuyến tính với số worker (2 worker ≈ 2× throughput). Với CPU-bound processing trên Python (GIL), 3 process riêng biệt tốt hơn 3 thread trong 1 process.
  2. Sau kill -9, XINFO CONSUMERS cho thấy worker-1 vẫn tồn tại với pending > 0idle tăng dần. XAUTOCLAIM với min-idle-time=30000 claim lại message sau 30s idle. Sau claim thành công, pending của worker-1 về 0 (hoặc giảm).
  3. Redis không có khái niệm "unique consumer name enforcement". Nếu 2 process cùng gọi XREADGROUP với consumer name "worker-A", cả hai đều thành công về mặt lệnh. Redis treat chúng như 1 consumer duy nhất — PEL sẽ chứa message từ cả 2 process nhưng dưới cùng key, dẫn đến race condition: cả 2 có thể XACK cùng msg_id (OK, idempotent), hoặc 1 process xử lý xong và XACK trong khi process kia vẫn đang xử lý cùng message (duplicate processing).
  4. Kiểm tra qua: XPENDING stream:orders order-processor - + 10 worker-name → số pending về 0 sau drain. XINFO CONSUMERS → consumer đã không còn trong list sau DELCONSUMER.

Bài tiếp theo

Bài 63 so sánh Redis queue với Kafka, RabbitMQ và BullMQ — khi nào dùng cái nào, điểm mạnh và giới hạn của từng hệ thống.

Tham khảo