Mục lục
- Mục Tiêu Bài Học
- Bài Toán Fanout
- Fanout-on-Write (Push)
- Vấn Đề Với Hot Account
- Fanout-on-Read (Pull)
- Hybrid Pattern
- Async Fanout Qua Redis Streams
- Push Notification Mobile (APNs/FCM)
- Online/Offline — Pub/Sub Kết Hợp Inbox
- Persistent Inbox
- Dedup & Rate Limit Notification
- Counter Fanout & Fan-in
- Pub/Sub vs Streams Cho Fanout
- Anti-patterns & Best Practices
- Tổng Kết & Quiz
Mục Tiêu Bài Học
- Hiểu hai chiến lược fanout cơ bản — fanout-on-write và fanout-on-read — cùng trade-off latency/storage/query cost của từng loại.
- Biết khi nào áp dụng hybrid pattern và tại sao follower count là ngưỡng phân loại.
- Xây dựng được fanout-on-write với Redis pipeline, timeline capped, và notification queue.
- Thiết kế async fanout dùng Redis Streams với consumer group để tách write path.
- Kết hợp Pub/Sub (user online) với persistent inbox (user offline) đúng cách.
- Áp dụng dedup, rate limit notification per user, và idempotency key tránh push trùng.
- Nhận diện các anti-pattern phổ biến: sync fanout, celebrity write storm, thiếu cap timeline.
Bài Toán Fanout
Fanout là quá trình nhân bản 1 sự kiện ra nhiều điểm nhận. Ví dụ điển hình: user X đăng bài mới → tất cả người follow X đều nhận thông báo và bài đó xuất hiện trên timeline của họ.
Bài toán nghe đơn giản nhưng ẩn nhiều trade-off:
| Kịch bản | Số người nhận (M) | Vấn đề chủ yếu |
|---|---|---|
| User bình thường | 10–1 000 | Không đáng kể, pipeline Redis giải quyết được |
| Influencer nhỏ | 10 000–500 000 | Cần async, tốn thêm memory |
| Celebrity | 1M–100M | Fanout-on-write không thực tế |
Ba chiều trade-off chính:
- Latency đọc: fanout-on-write thấp (timeline sẵn sàng), fanout-on-read cao hơn (phải aggregate lúc đọc).
- Storage: fanout-on-write tốn M lần (mỗi follower 1 bản), fanout-on-read chỉ 1 bản.
- Write cost: fanout-on-write tốn M write, fanout-on-read chỉ 1 write nhưng read phức tạp hơn.
Fanout-on-Write (Push)
Khi X tạo post, hệ thống ngay lập tức đẩy post_id vào timeline của từng follower. Khi follower F mở feed, hệ thống chỉ cần LRANGE timeline:F 0 49 — read O(1) không cần join.
import redis
import json
r = redis.Redis()
def on_user_post(user_id: str, post_id: str):
followers = r.smembers(f"followers:{user_id}") # Set chứa follower IDs
pipe = r.pipeline()
for follower in followers:
fid = follower.decode()
# Đẩy post_id vào đầu timeline
pipe.lpush(f"timeline:{fid}", post_id)
# Giữ tối đa 1000 entry gần nhất (bài 22)
pipe.ltrim(f"timeline:{fid}", 0, 999)
# Đẩy notification vào inbox queue
notif = json.dumps({
"type": "new_post",
"from": user_id,
"post": post_id
})
pipe.lpush(f"notif:queue:{fid}", notif)
pipe.ltrim(f"notif:queue:{fid}", 0, 499) # cap 500 notification
pipe.execute()
Một pipeline gom tất cả lệnh vào 1 round-trip duy nhất đến Redis. Với 1 000 follower, lệnh execute() gửi ~3 000 lệnh Redis trong 1 request TCP — chi phí network cực thấp.
Đặc điểm:
- Latency đọc feed = O(1), không phụ thuộc số người follow.
- Write path tốn nhiều time và memory hơn khi M lớn.
- Phù hợp với user có M < 10 000.
Vấn Đề Với Hot Account
Giả sử celebrity C có 50M follower. Áp dụng fanout-on-write:
- 1 post → 50M lệnh
LPUSH+LTRIM→ khoảng 100M Redis commands. - Dù pipeline, Redis vẫn phải xử lý từng lệnh tuần tự. Ước tính Redis 7.x xử lý ~1M commands/s (single-threaded command loop) → mất ~100 giây để hoàn thành.
- Memory: nếu mỗi timeline lưu 1 000 entry (8 byte mỗi entry), 50M timeline = ~400GB. Riêng 1 bài post của C đã tạo 50M entry phân tán khắp nơi.
- Khi C post liên tục, backpressure queue fanout tích tụ, latency delivery tăng vọt.
Kết luận: fanout-on-write không khả thi cho celebrity account. Cần chiến lược khác.
Fanout-on-Read (Pull)
Với fanout-on-read, chỉ lưu 1 bản post gốc. Khi user F muốn xem feed, hệ thống tổng hợp (aggregate) posts từ tất cả account F đang follow.
-- Không cache: query naive
SELECT p.*
FROM posts p
WHERE p.author_id IN (
SELECT followed_id FROM follows WHERE follower_id = :me
)
ORDER BY p.created_at DESC
LIMIT 50;
Câu query này chậm khi F follow hàng trăm account và mỗi account có nhiều bài. Redis giúp ở hai điểm:
def get_feed_pull(user_id: str, limit: int = 50) -> list:
# 1. Lấy danh sách followed IDs từ cache (Set)
followed = r.smembers(f"follows:{user_id}")
all_posts = []
pipe = r.pipeline()
for uid in followed:
# 2. Lấy 10 post gần nhất của mỗi author từ ZSet
# score = timestamp, member = post_id
pipe.zrevrange(f"posts:by:{uid.decode()}", 0, 9, withscores=True)
results = pipe.execute()
for posts_with_score in results:
all_posts.extend(posts_with_score)
# 3. Sort theo score (timestamp) giảm dần, lấy top N
all_posts.sort(key=lambda x: x[1], reverse=True)
return [pid.decode() for pid, _ in all_posts[:limit]]
Mỗi author có 1 ZSet posts:by:{author_id} lưu post_id theo timestamp. Khi read feed, aggregate N ZSet lại. Với F follow 200 account, đó là 200 ZREVRANGE pipeline — nhanh, nhưng vẫn tốn hơn fanout-on-write O(1).
Đặc điểm:
- Write = O(1) — celebrity chỉ cần thêm vào ZSet của mình.
- Read = O(K) với K = số account đang follow — tăng theo số follow.
- Storage hiệu quả: không nhân bản post.
Hybrid Pattern
Twitter (X) đã công bố kiến trúc này: phân loại account theo follower count và áp dụng chiến lược khác nhau.
- Normal user (< 10 000 follower): fanout-on-write — timeline pre-built, read O(1).
- Celebrity (>= 10 000 follower): fanout-on-read — post lưu trong ZSet riêng.
Khi user F đọc feed:
- Lấy pre-built timeline (từ fanout-on-write của normal authors F đang follow).
- Lấy recent posts từ celebrity authors F đang follow (fanout-on-read).
- Merge + sort theo timestamp.
CELEBRITY_THRESHOLD = 10_000 # follower count
def get_timeline_hybrid(user_id: str) -> list:
# Pre-pushed posts từ normal authors
pushed = r.lrange(f"timeline:{user_id}", 0, 49)
# Celebrity authors user đang follow
celeb_ids = r.smembers(f"follows:celebrities:{user_id}")
celeb_posts = []
if celeb_ids:
pipe = r.pipeline()
for cid in celeb_ids:
# ZSet: member = post_id, score = timestamp
pipe.zrevrange(f"posts:by:{cid.decode()}", 0, 9, withscores=True)
results = pipe.execute()
for posts_with_score in results:
celeb_posts.extend(posts_with_score)
# Merge: pushed (post_id bytes) cần fetch timestamp từ metadata
# Simplified: giả sử pushed posts đã có timestamp trong payload
merged = merge_and_sort(pushed, celeb_posts)
return merged[:50]
def on_user_follow(follower_id: str, followed_id: str):
"""Cập nhật phân loại khi follow mới."""
follower_count = r.scard(f"followers:{followed_id}")
pipe = r.pipeline()
pipe.sadd(f"follows:{follower_id}", followed_id)
if follower_count >= CELEBRITY_THRESHOLD:
pipe.sadd(f"follows:celebrities:{follower_id}", followed_id)
pipe.execute()
Điểm quan trọng: follows:celebrities:{user_id} là Set chứa celebrity authors mà user đang follow. Set này cần được cập nhật khi follow/unfollow và khi một account vượt ngưỡng celebrity. Đây là phần phức tạp nhất của hybrid approach — quản lý trạng thái phân loại của account khi follower count thay đổi theo thời gian.
Async Fanout Qua Redis Streams
Dù chỉ có 1 000 follower, thực hiện fanout ngay trong request handler của user vẫn không nên: request đó phải chờ 1 000+ Redis commands hoàn thành trước khi trả response. Giải pháp: tách fanout ra worker bất đồng bộ qua Redis Streams (đã học ở bài 53–54).
# --- Producer (trong request handler) ---
def create_post(user_id: str, content: str) -> str:
post_id = save_post_to_db(user_id, content) # ghi vào DB chính
# Chỉ enqueue job, không fanout trực tiếp
r.xadd("fanout:queue", {
"type": "new_post",
"user_id": user_id,
"post_id": post_id,
})
return post_id # trả response ngay lập tức
# --- Worker (process riêng hoặc thread pool) ---
GROUP = "fanout-workers"
CONSUMER = "worker-1"
def setup_stream():
try:
r.xgroup_create("fanout:queue", GROUP, id="0", mkstream=True)
except redis.exceptions.ResponseError:
pass # group đã tồn tại
def fanout_worker():
setup_stream()
while True:
# Đọc tối đa 10 message mới từ group
messages = r.xreadgroup(
GROUP, CONSUMER,
{"fanout:queue": ">"},
count=10, block=2000
)
if not messages:
continue
for stream_name, entries in messages:
for entry_id, data in entries:
try:
do_fanout(data[b"user_id"].decode(), data[b"post_id"].decode())
r.xack("fanout:queue", GROUP, entry_id)
except Exception as e:
# Không ACK → message ở lại pending, retry sau
log_error(e)
def do_fanout(user_id: str, post_id: str):
followers = r.smembers(f"followers:{user_id}")
follower_count = len(followers)
if follower_count >= CELEBRITY_THRESHOLD:
# Celebrity: chỉ append vào ZSet, không push tới timeline
r.zadd(f"posts:by:{user_id}", {post_id: time.time()})
# Giữ 500 post gần nhất
r.zremrangebyrank(f"posts:by:{user_id}", 0, -501)
else:
# Normal: fanout-on-write
pipe = r.pipeline()
for follower in followers:
fid = follower.decode()
pipe.lpush(f"timeline:{fid}", post_id)
pipe.ltrim(f"timeline:{fid}", 0, 999)
pipe.execute()
Lợi ích của Streams so với List queue: message có ID định danh, consumer group cho phép nhiều worker song song, unacked message tự động retry khi worker crash (không mất message), và XPENDING để monitor lag.
Push Notification Mobile (APNs/FCM)
Fanout không chỉ là timeline update — còn bao gồm push notification tới thiết bị di động qua APNs (Apple) và FCM (Google). Workflow:
- Event xảy ra (new message, like, follow).
- Enqueue notification job với receiver list và payload.
- Notification worker batch-fetch device tokens từ DB.
- Gọi APNs/FCM API với batch payload.
- Xử lý response: mark invalid tokens, retry transient failure.
def enqueue_push_notification(
event_type: str,
source_id: str,
receiver_ids: list[str],
payload: dict
):
for user_id in receiver_ids:
# Idempotency key: tránh push trùng khi worker retry
idem_key = f"notif:sent:{user_id}:{event_type}:{source_id}"
if r.set(idem_key, 1, nx=True, ex=3600):
# nx=True: chỉ SET nếu key chưa tồn tại
r.xadd("push:queue", {
"user_id": user_id,
"event_type": event_type,
"source_id": source_id,
"payload": json.dumps(payload),
})
Idempotency key notif:sent:{user_id}:{event_type}:{source_id} với TTL 1 giờ đảm bảo: dù worker retry vì crash hay network lỗi, user vẫn chỉ nhận 1 push cho cùng 1 sự kiện. Nếu cần window dài hơn (ví dụ 24 giờ để dedup qua restart), tăng TTL tương ứng.
Online/Offline — Pub/Sub Kết Hợp Inbox
Pub/Sub (bài 65) là kênh fire-and-forget: message không persist, user offline sẽ mất. Để phục vụ cả hai trường hợp, cần kết hợp Pub/Sub với persistent inbox:
def deliver_notification(user_id: str, notification: dict):
payload = json.dumps(notification)
# Kiểm tra presence (bài 69): key tồn tại = user online
is_online = r.exists(f"presence:online:{user_id}")
if is_online:
# User đang kết nối WebSocket: deliver ngay qua Pub/Sub
# Channel per-user → Sharded Pub/Sub (bài 68) scale tốt
r.publish(f"notif:{user_id}", payload)
# Vẫn lưu inbox để user xem lại sau
_save_to_inbox(user_id, payload)
else:
# User offline: lưu inbox + enqueue push notification
_save_to_inbox(user_id, payload)
enqueue_push_notification(
notification["type"],
notification.get("source_id", ""),
[user_id],
notification
)
def _save_to_inbox(user_id: str, payload: str):
pipe = r.pipeline()
# ZSet: score = timestamp để sort, member = payload
score = time.time()
pipe.zadd(f"inbox:{user_id}", {payload: score})
# Cap 500 notification gần nhất
pipe.zremrangebyrank(f"inbox:{user_id}", 0, -501)
# TTL 30 ngày tính từ lần gần nhất có entry mới
pipe.expire(f"inbox:{user_id}", 30 * 24 * 3600)
pipe.execute()
Pattern này cũng phù hợp cho channel-based fanout (group chat, broadcast channel): Pub/Sub cho member online, inbox persist cho member offline. Khi member online trở lại, load inbox và đánh dấu đã đọc.
Persistent Inbox
Inbox là danh sách notification của từng user, cần đọc lại và đánh dấu đã xem. ZSet phù hợp hơn List vì cho phép query theo khoảng thời gian và sort tự nhiên theo score (timestamp).
def get_inbox(user_id: str, page: int = 0, page_size: int = 50) -> list[dict]:
"""Lấy notification gần nhất, phân trang bằng ZREVRANGE."""
start = page * page_size
end = start + page_size - 1
entries = r.zrevrange(f"inbox:{user_id}", start, end, withscores=True)
return [
{"data": json.loads(payload), "timestamp": score}
for payload, score in entries
]
def mark_read(user_id: str, before_timestamp: float):
"""Đánh dấu đã đọc tất cả notification trước mốc thời gian."""
# Lưu watermark "đã đọc đến đây"
r.set(f"inbox:read_at:{user_id}", before_timestamp, ex=30 * 24 * 3600)
def get_unread_count(user_id: str) -> int:
"""Đếm notification chưa đọc."""
read_at = r.get(f"inbox:read_at:{user_id}")
if not read_at:
return r.zcard(f"inbox:{user_id}")
# Đếm entry có score > read_at
return r.zcount(f"inbox:{user_id}", float(read_at), "+inf")
Dùng watermark inbox:read_at:{user_id} thay vì flag per-entry để tránh N lần write mỗi khi user mở inbox. Tradeoff: nếu user đọc entry cũ mà không đọc entry mới hơn, watermark không hoạt động đúng. Với UX thực tế (đọc từ mới đến cũ, "mark all as read"), watermark đủ chính xác.
Dedup & Rate Limit Notification
Dedup: cùng 1 sự kiện không được gửi 2 lần cho cùng 1 user. Nguyên nhân duplicate: worker retry sau crash, message delivered 2 lần do at-least-once semantics của Stream/queue. Dùng idempotency key với SET NX EX như đã thấy ở phần push notification.
Rate limit per user: người dùng bị spam notification nếu 1 bài post viral nhận 1000 like trong 1 phút. Cần giới hạn số notification gửi ra theo loại trong khoảng thời gian nhất định. Bài 36 đã trình bày sliding window rate limiter — áp dụng trực tiếp:
def should_send_notification(
user_id: str,
event_type: str,
limit: int = 20,
window_seconds: int = 3600
) -> bool:
"""Trả về False nếu user đã nhận >= limit notification loại này trong window."""
key = f"notif:rate:{user_id}:{event_type}"
now = time.time()
window_start = now - window_seconds
pipe = r.pipeline()
# Sliding window: remove old entries
pipe.zremrangebyscore(key, 0, window_start)
# Đếm trong window hiện tại
pipe.zcard(key)
# Thêm event mới
pipe.zadd(key, {str(now): now})
# TTL
pipe.expire(key, window_seconds)
_, count, _, _ = pipe.execute()
return count < limit # True = được gửi
Lưu ý: rate limit notification và rate limit API request (bài 36) cùng pattern nhưng khác key namespace và ngưỡng. Notification rate limit thường looser (20 notification/giờ) so với API rate limit (100 req/phút).
Counter Fanout & Fan-in
Không phải fanout nào cũng là "push notification". Like count, view count, follower count là trường hợp fanout counter: 1 action từ user → update 1 counter được nhiều người đọc.
# Like action: INCR atomic, không cần pipeline
def like_post(post_id: str, user_id: str):
# Dedup: tránh cùng user like nhiều lần
if r.sadd(f"likes:{post_id}", user_id): # sadd trả 1 nếu member mới
r.incr(f"like_count:{post_id}")
def unlike_post(post_id: str, user_id: str):
if r.srem(f"likes:{post_id}", user_id): # srem trả 1 nếu xóa được
r.decr(f"like_count:{post_id}")
Fan-in là chiều ngược: nhiều source → 1 counter. Ví dụ: 1M user concurrent view 1 video → cùng increment 1 counter. Vấn đề là contention: nhiều client cùng INCR trên 1 key không phải vấn đề với Redis (single-threaded command execution, mỗi INCR atomic), nhưng nếu counter cần sync về DB thường xuyên thì tạo nhiều write DB.
Sharded counter (bài 26) giải quyết khi cần giảm tần suất sync DB: dùng N key view_count:{post_id}:shard:{0..N-1}, mỗi request random chọn 1 shard. Aggregate bằng GET N shard. Counter tổng = sum của N shard. Sync DB chỉ cần 1 lần mỗi vài giây, gom tất cả shard.
Pub/Sub vs Streams Cho Fanout
| Tiêu chí | Pub/Sub | Streams |
|---|---|---|
| Persistence | Không — fire-and-forget | Có — message lưu đến khi trimmed |
| Delivery | At-most-once | At-least-once (với ACK) |
| Consumer group | Không | Có — nhiều worker song song |
| Retry | Không | Có — pending entries tự retry |
| Latency | Sub-millisecond | 1–5ms (thêm overhead XADD/XREAD) |
| Use case cho fanout | Realtime delivery tới user online | Durable fanout job queue |
Pattern kết hợp tối ưu:
- Streams làm fanout job queue: producer enqueue event, worker pool xử lý fanout bất đồng bộ, retry khi fail.
- Pub/Sub làm delivery channel tới WebSocket của user online: worker sau khi fanout xong sẽ publish lên channel per-user.
- Inbox ZSet làm storage cho user offline: worker song song lưu vào inbox.
Ba thành phần này phục vụ 3 nhu cầu khác nhau và không thể thay thế lẫn nhau.
Anti-patterns & Best Practices
Anti-patterns
- Sync fanout trong request handler: user post → handler chạy 1 000 LPUSH → response delay hàng trăm ms. Tách ra async worker ngay từ đầu.
- Fanout-on-write cho celebrity: như phân tích ở phần 4, 50M follower = 100M Redis commands từ 1 post. Dùng hybrid.
- Không dedup: worker retry sau crash → user nhận 2 push cho cùng 1 like. Luôn dùng idempotency key.
- Không cap timeline/inbox: power user có 10 000 post từ accounts họ follow → timeline List tăng vô hạn, consume RAM không giới hạn. LTRIM bắt buộc.
- Pub/Sub only cho notification: user offline mất tất cả notification. Phải kết hợp persistent inbox.
- KEYS để scan inbox: không dùng
KEYS notif:queue:*trên production, dùng SCAN iterator hoặc explicit key management.
Best Practices
- Phân loại account theo follower count, áp dụng hybrid push/pull tương ứng.
- Tất cả heavy fanout qua async queue (Streams) — tách write path của post khỏi fanout cost.
- Cap mọi List/ZSet per-user: timeline 1 000, inbox 500 notification.
- Idempotency key
(user, event_type, source_id)với TTL phù hợp. - Rate limit notification per user per event type (sliding window).
- Online → Pub/Sub realtime + lưu inbox. Offline → inbox + mobile push.
- Theo dõi Stream consumer lag (
XPENDING) để phát hiện worker bị tắc. - Webhook external: HTTP POST với exponential backoff + Dead Letter Queue cho failed delivery.
Tổng Kết & Quiz
Tóm tắt các điểm chính:
- Fanout-on-write: write cost O(M), read O(1). Tốt cho M nhỏ.
- Fanout-on-read: write O(1), read O(K) theo số account follow. Tốt cho celebrity.
- Hybrid: phân loại theo follower count, kết hợp cả hai chiến lược tại read time.
- Async qua Streams: tách write path, retry tự động, nhiều worker song song.
- Online → Pub/Sub. Offline → inbox + push. Không dùng Pub/Sub alone.
- Dedup bằng idempotency key
SET NX EX. Rate limit bằng sliding window ZSet.
Quiz
- User A có 500 follower, user B có 20M follower. Trong hybrid pattern, timeline đọc của 1 người follow cả A và B được tổng hợp từ nguồn dữ liệu nào?
- Tại sao
LTRIM timeline:{user_id} 0 999phải chạy ngay sau mỗiLPUSH, không phải theo batch định kỳ? - Worker fanout crash sau khi ghi 300/1000 timeline và chưa ACK message. Điều gì xảy ra khi worker khởi động lại với Streams consumer group?
- User nhận 2 push notification cho cùng 1 like. Nguyên nhân có thể là gì và fix bằng cơ chế nào?
- Tại sao không dùng Redis List mà dùng ZSet cho persistent inbox, dù List đơn giản hơn?
Đáp án gợi ý
- Hai nguồn: (1) timeline pre-built từ fanout-on-write của A (và các normal author khác), (2) ZSet
posts:by:Bđược pull trực tiếp lúc đọc. Merge + sort tại read time. - Nếu LTRIM chạy batch, trong khoảng giữa hai lần LTRIM, List có thể chứa nhiều hơn 1 000 entry — khi nhiều post liên tiếp được fanout vào. LTRIM ngay sau LPUSH đảm bảo List không bao giờ vượt ngưỡng, ngay cả trong burst.
- Message không được ACK → nằm trong
XPENDINGvới consumer worker-1. Khi worker khởi động lại, gọiXAUTOCLAIMhoặcXCLAIMđể nhận lại pending entries và xử lý tiếp từ đầu. Fanout có thể bị lặp cho 300 user đầu — cần idempotency tại timeline write (LPUSHduplicate là chấp nhận được vì LTRIM sẽ giữ post gần nhất; với notification queue cần check trùng). - Nguyên nhân: worker retry message (at-least-once) hoặc message được delivered 2 lần do queue glitch. Fix: idempotency key
SET notif:sent:{user}:{event_type}:{source_id} 1 NX EX 3600— chỉ enqueue push nếu SET thành công (key chưa tồn tại). - ZSet cho phép: query theo khoảng thời gian (
ZRANGEBYSCORE), đếm unread theo watermark timestamp (ZCOUNT inbox:{id} read_at +inf), và phân trang ngược từ mới đến cũ (ZREVRANGE). List chỉ có LRANGE theo index — không query theo timestamp, không đếm unread hiệu quả.
Bài tiếp theo
Bài 72 xây dựng Chat Room — kiến trúc kết hợp Streams, Pub/Sub, và presence tracking cho hệ thống chat có lịch sử message, trạng thái online, và scale multi-instance.
