Danh sách bài viết

Bài 257: tokio::sync::broadcast Channel

Bài 257 của series Rust Cơ Bản — sau mpsc (nhiều producer, một consumer) và oneshot (đúng một message), bài này hoàn tất bộ ba channel tokio bằng tokio::sync::broadcast — pub/sub local nhiều-tới-nhiều. Tính chất quan trọng nhất: mỗi Receiver đang sống đều nhận được mọi message Sender phát ra (clone giá trị tới từng subscriber). Khác hẳn mpsc ở chỗ đó — mpsc mỗi message chỉ tới đúng một consumer. API ngắn: broadcast::channel(capacity) trả (Sender, Receiver), tx.subscribe() tạo receiver mới bất cứ lúc nào (kể cả runtime, sau khi đã phát vài message), rx.recv().await trả Result<T, RecvError> với hai variant: Closed (tất cả sender drop) và Lagged(n) (receiver xử lý chậm, n message cũ bị drop khỏi ring buffer). Use case kinh điển: shutdown signal fan-out từ main task tới N worker, event bus trong process, broadcast tin nhắn chat trong room. Bài này demo basic 1 tx + 3 receiver, subscribe runtime, xử lý Lagged đúng cách, pattern shutdown, và bảng so sánh mpsc vs broadcast vs watch.

10/06/2026
10 phút đọc
1 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu tokio::sync::broadcast là channel pub/sub — nhiều sender, nhiều receiver, mỗi receiver nhận mọi message (khác hẳn mpsc nơi mỗi message chỉ tới một consumer).
  • Dùng được API: broadcast::channel(capacity) tạo cặp (Sender, Receiver), tx.subscribe() tạo receiver mới sống cùng channel.
  • Đọc đúng kiểu trả về của rx.recv().await: Result<T, RecvError> với hai variant ClosedLagged(u64) — và biết xử lý mỗi variant.
  • Hiểu cơ chế ring buffer capacity: receiver chậm bị lag, message cũ bị drop, không block tx — đây là backpressure khác hẳn mpsc bounded.
  • Viết được idiom shutdown signal: 1 broadcast tx fan-out tới N worker, mỗi worker nghe stop bằng rx.recv().
  • Phân biệt khi nào dùng mpsc (work distribution), broadcast (fan-out event), watch (latest-value snapshot).
2

broadcast Là Gì — Many-To-Many, Mỗi Receiver Nhận Mọi Message

Nhớ lại Bài 255 — mpsc: nhiều producer, một consumer. Mỗi message tx gửi đi sẽ tới đúng một rx (FIFO theo thứ tự gửi). Đây là pattern work distribution — chia việc.

broadcast đảo ngược: nhiều producer, nhiều consumer. Một message tx gửi được nhân bản tới tất cả Receiver đang sống. Đây là pattern event fan-out — phát quảng bá.

Hệ quả quan trọng:

  • Type T truyền qua broadcast phải implement Clone — vì cần copy ra cho mỗi receiver. String, Arc<T>, primitive đều OK. File, TcpStream thì không.
  • Receiver join channel muộn (sau khi đã có message phát ra) chỉ thấy message từ thời điểm subscribe trở đi, không đọc lại lịch sử.
  • Không có khái niệm "message đã được nhận hết" như mpsc — message ở lại trong ring buffer đến khi bị đẩy ra bởi message mới hơn.
  • Channel đóng khi tất cả Sender drop — giống mpsc. Lúc đó receiver còn lại nhận hết buffer rồi recv() trả Err(Closed).

Hình dung trực quan: tx như đài phát thanh, rx như các máy thu — bật máy nào lên cũng nghe được sóng, bật muộn thì lỡ phần đầu, máy nào nghe chậm thì lỡ nhiều phần ở giữa (lag).

3

API: broadcast::channel(capacity)

Khai báo channel với capacity bắt buộc (broadcast không có biến thể unbounded — vì ring buffer cố định là một phần thiết kế):

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 16 — ring buffer giữ tối đa 16 message
    let (tx, mut rx1) = broadcast::channel::<String>(16);
    let mut rx2 = tx.subscribe();
    let mut rx3 = tx.subscribe();

    // Spawn 3 receiver, mỗi receiver in mọi message nhận được
    let r1 = tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("rx1 nhận: {}", msg);
        }
    });
    let r2 = tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("rx2 nhận: {}", msg);
        }
    });
    let r3 = tokio::spawn(async move {
        while let Ok(msg) = rx3.recv().await {
            println!("rx3 nhận: {}", msg);
        }
    });

    // Producer ở main task — gửi 3 message
    for i in 0..3 {
        tx.send(format!("event-{}", i)).unwrap();
    }
    drop(tx); // Đóng channel để 3 receiver thoát loop

    let _ = tokio::join!(r1, r2, r3);
}
// Mỗi rx in đủ 3 message — tổng 9 dòng (event-0/1/2 × 3 receiver)

Vài chi tiết cú pháp đáng nhớ:

  • broadcast::channel::<T>(cap) trả tuple (Sender<T>, Receiver<T>) — luôn có sẵn 1 receiver đầu tiên.
  • tx.send(value) trả Result<usize, SendError>usizesố receiver đang sống nhận được message. Lỗi SendError chỉ khi không còn receiver nào.
  • tx.send không phải async (không .await) — vì ring buffer overwrite chứ không chờ.
  • Sender::clone() cho phép có nhiều producer giống mpsc; receiver được tạo qua tx.subscribe(), không phải rx.clone().

Lưu ý cú pháp: dùng async move trong tokio::spawn để closure move các rx vào task; nếu quên move sẽ gặp lỗi mượn rx vượt scope, rust-analyzer báo đỏ ngay.

4

tx.subscribe() — Tạo Receiver Mới Bất Cứ Lúc Nào

Đặc điểm rất hay của broadcast: có thể subscribe runtime — tạo Receiver mới ngay cả khi tx đã phát vài message. Receiver mới chỉ thấy message kể từ thời điểm subscribe trở đi, không đọc lại lịch sử:

use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<u32>(16);

    // Phát 2 message đầu khi mới chỉ có rx1
    tx.send(100).unwrap();
    tx.send(101).unwrap();

    // rx2 subscribe SAU 2 message trên — rx2 không thấy 100, 101
    let mut rx2 = tx.subscribe();

    tx.send(200).unwrap(); // cả rx1 và rx2 đều nhận

    println!("rx1: {:?}", rx1.recv().await); // Ok(100)
    println!("rx1: {:?}", rx1.recv().await); // Ok(101)
    println!("rx1: {:?}", rx1.recv().await); // Ok(200)

    println!("rx2: {:?}", rx2.recv().await); // Ok(200) — không có 100/101
}

Use case thực tế của subscribe runtime:

  • WebSocket chat room: mỗi user vào room thì subscribe — chỉ nhận tin nhắn mới sau khi join. Tin nhắn cũ load qua HTTP API riêng.
  • SSE/Server-Sent Events: mỗi client connect thì subscribe vào event stream chung; client connect muộn không cần biết event cũ.
  • Spawn worker theo demand: worker pool elastic, mỗi worker mới spawn thì subscribe để nhận command từ controller chung.

Pitfall: nếu code subscribe rồi mới spawn task chứa receiver, có thể race — message được tx phát ra giữa hai bước này sẽ bị rx mới subscribe nhận luôn (vì đã có trong buffer kể từ thời điểm subscribe). Thường không phải vấn đề, nhưng đáng để ý nếu cần "bắt đầu sạch".

5

recv() Trả Result<T, RecvError>: Lagged, Closed

Khác mpsc (recv -> Option<T>), broadcast trả Result<T, RecvError>. RecvError có đúng hai variant:

  • RecvError::Closed — tất cả Sender đã drop, buffer đã rỗng. Receiver biết "không còn ai gửi nữa", giống None của mpsc. Idiom: thoát loop.
  • RecvError::Lagged(u64) — receiver xử lý chậm, n message cũ đã bị overwrite khỏi ring buffer trước khi receiver kịp đọc. Không đóng channel — recv tiếp theo sẽ trả message mới nhất còn trong buffer.

Pattern xử lý đúng (đừng vội while let Ok):

use tokio::sync::broadcast::{self, error::RecvError};

async fn consume(mut rx: broadcast::Receiver<String>) {
    loop {
        match rx.recv().await {
            Ok(msg) => {
                println!("nhận: {}", msg);
            }
            Err(RecvError::Lagged(n)) => {
                // Receiver bị chậm, n message đã bị drop.
                // KHÔNG break — channel vẫn sống, tiếp tục đọc.
                eprintln!("lag: bỏ qua {} message", n);
                continue;
            }
            Err(RecvError::Closed) => {
                println!("channel đóng, kết thúc");
                break;
            }
        }
    }
}

Tại sao tách hai variant: Closedterminal (channel chết, thoát), còn Laggedcảnh báo (vẫn còn data, chỉ vừa mất một đoạn). Dùng while let Ok(msg) = rx.recv().await như mpsc sẽ thoát luôn khi gặp Lagged — đây là bug phổ biến nhất khi mới dùng broadcast.

Khi đo throughput, log số Lagged(n) là tín hiệu cần tăng capacity hoặc tối ưu consumer — không phải lỗi cần panic.

6

Backpressure: Receiver Chậm Bị Lag (Drop Message)

Triết lý backpressure của broadcast khác hẳn mpsc:

  • mpsc bounded: receiver chậm → buffer đầy → send().await chờ, sender bị chậm theo (chuyển áp lực ngược lại producer).
  • broadcast: receiver chậm → ring buffer overwrite → message cũ bị drop, sender không bao giờ chờ. Áp lực chuyển sang receiver chậm dưới dạng Lagged error.

Quyết định thiết kế này hợp lý: nếu broadcast cũng chờ thì một receiver chậm sẽ làm tất cả receiver còn lại bị chậm theo — không chấp nhận được cho event fan-out. Thay vào đó, broadcast hi sinh receiver chậm để bảo vệ throughput chung.

Ví dụ minh hoạ rõ:

use tokio::sync::broadcast::{self, error::RecvError};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Capacity nhỏ để dễ thấy lag
    let (tx, mut rx_slow) = broadcast::channel::<u32>(4);
    let mut rx_fast = tx.subscribe();

    // Fast consumer — đọc ngay, không bao giờ lag
    tokio::spawn(async move {
        loop {
            match rx_fast.recv().await {
                Ok(v) => println!("FAST {}", v),
                Err(RecvError::Closed) => break,
                Err(RecvError::Lagged(_)) => continue,
            }
        }
    });

    // Slow consumer — sleep 50ms mỗi message, nhanh chóng bị lag
    tokio::spawn(async move {
        loop {
            match rx_slow.recv().await {
                Ok(v) => {
                    println!("SLOW {}", v);
                    sleep(Duration::from_millis(50)).await;
                }
                Err(RecvError::Closed) => break,
                Err(RecvError::Lagged(n)) => {
                    eprintln!("SLOW bị lag mất {} message", n);
                    continue;
                }
            }
        }
    });

    // Producer phát 20 message liên tục, không sleep
    for i in 0..20 {
        let _ = tx.send(i);
    }

    sleep(Duration::from_secs(2)).await;
}
// FAST in đủ 0..19. SLOW in vài message rồi liên tục "lag mất N message"

Chiến lược chỉnh:

  • Tăng capacity nếu burst ngắn — đủ chỗ chứa peak. Nhưng cost RAM = capacity * size_of::<T>().
  • Move work nặng ra task riêng bằng tokio::spawn bên trong receiver — giữ recv loop "nhẹ" để theo kịp sender.
  • Nếu lag là không chấp nhận (loss = bug), dùng mpsc bounded thay vì broadcast.
7

Use Case: Pub/Sub Local Và Shutdown Signal

Hai use case quan trọng nhất:

1. Event bus / pub/sub trong process: thay vì chạy Redis pub/sub riêng cho hệ thống nhỏ, broadcast cho phép phát event cùng process giữa các module — order created, cache invalidated, config reloaded. Mỗi subscriber tự quyết định xử lý gì.

2. Shutdown signal: pattern phổ biến nhất. Một tx broadcast fan-out tín hiệu "dừng" tới N worker; mỗi worker đang chạy loop chính kết hợp tokio::select! (sẽ học ở Bài 258) hoặc check shutdown_rx.recv() để tự thoát sạch.

use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

async fn worker(id: u32, mut shutdown: broadcast::Receiver<()>) {
    loop {
        tokio::select! {
            _ = sleep(Duration::from_millis(500)) => {
                println!("worker {} đang làm việc...", id);
            }
            _ = shutdown.recv() => {
                println!("worker {} nhận shutdown, dọn dẹp rồi thoát", id);
                // Đóng file, flush queue, ack pending job...
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Capacity 1 đủ — message shutdown chỉ gửi 1 lần
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut handles = Vec::new();
    for i in 0..4 {
        let rx = shutdown_tx.subscribe(); // mỗi worker 1 receiver riêng
        handles.push(tokio::spawn(worker(i, rx)));
    }

    // Cho worker chạy 2 giây
    sleep(Duration::from_secs(2)).await;

    // Phát shutdown — TẤT CẢ 4 worker đều nhận
    println!("main: gửi shutdown");
    let _ = shutdown_tx.send(());

    // Chờ worker dọn xong
    for h in handles {
        let _ = h.await;
    }
    println!("main: tất cả worker đã thoát");
}

Vì sao broadcast hợp với shutdown hơn mpsc: cần fan-out 1 tín hiệu tới N consumer cùng lúc — đúng định nghĩa broadcast. Dùng mpsc phải gửi N message (sender không biết có bao nhiêu worker), hoặc dùng Notify nhưng không gắn được payload (vd lý do shutdown). Pattern broadcast + select! là idiom chuẩn cho graceful shutdown trong tokio.

8

So Sánh mpsc vs broadcast vs watch

Tokio cung cấp bộ ba channel sync giữa task, mỗi loại tối ưu cho một pattern khác nhau. Bảng so sánh nhanh:

┌────────────┬─────────────┬─────────────┬──────────────────────┬───────────────────┬────────────────────────┐
│  Channel   │   Sender    │  Receiver   │   Mỗi msg tới ai     │  Backpressure     │   Use case chính       │
├────────────┼─────────────┼─────────────┼──────────────────────┼───────────────────┼────────────────────────┤
│   mpsc     │  N (clone)  │     1       │  đúng 1 receiver     │ send().await chờ  │ work distribution      │
│            │             │             │  (FIFO)              │ (bounded)         │ worker pool            │
├────────────┼─────────────┼─────────────┼──────────────────────┼───────────────────┼────────────────────────┤
│ broadcast  │  N (clone)  │ N(subscribe)│  TẤT CẢ receiver     │ drop msg cũ       │ event fan-out          │
│            │             │             │  đang sống           │ (Lagged error)    │ pub/sub, shutdown      │
├────────────┼─────────────┼─────────────┼──────────────────────┼───────────────────┼────────────────────────┤
│   watch    │  N (clone)  │ N(subscribe)│  giá trị MỚI NHẤT    │ overwrite ngay    │ config/state snapshot  │
│            │             │             │  (không lịch sử)     │ (không có lag)    │ shared cache flag      │
├────────────┼─────────────┼─────────────┼──────────────────────┼───────────────────┼────────────────────────┤
│  oneshot   │     1       │     1       │  đúng 1 message      │ N/A               │ task spawn + return    │
│            │ (consumed)  │             │                      │                   │ single result          │
└────────────┴─────────────┴─────────────┴──────────────────────┴───────────────────┴────────────────────────┘

Quy tắc chọn:

  • mpsc: cần chia việc — mỗi job xử lý đúng 1 lần. Worker pool, job queue.
  • broadcast: cần thông báo — mọi subscriber đều phải biết. Shutdown, event bus, chat room.
  • watch: cần chia sẻ state — chỉ care giá trị hiện tại, không care lịch sử. Config reload, current theme, latest sensor reading.
  • oneshot: cần chờ kết quả — spawn task tính toán xong trả về 1 lần. Request/response cục bộ.

Nguyên tắc thiết kế: chọn channel khớp với semantics của bài toán, không "tạm dùng" loại sai rồi chữa cháy bằng logic ngoài (ví dụ broadcast cho work distribution sẽ khiến mỗi job chạy N lần — bug nghiêm trọng).

9

Tổng Kết

  • tokio::sync::broadcast là channel many-to-many: nhiều sender, nhiều receiver, mỗi receiver nhận mọi message. Type T phải Clone.
  • API: broadcast::channel(capacity) trả (Sender, Receiver) đầu tiên. tx.subscribe() tạo receiver mới — kể cả runtime, chỉ thấy message từ thời điểm subscribe.
  • tx.send(v) không async — trả ngay Result<usize, SendError> (usize = số receiver đang sống). Lỗi chỉ khi không còn receiver nào.
  • rx.recv().await trả Result<T, RecvError>. Closed = channel đóng (thoát loop). Lagged(n) = receiver chậm, n message bị drop — KHÔNG thoát, continue đọc tiếp.
  • Backpressure khác mpsc: broadcast không bao giờ chờ sender, thay vào đó drop message cũ trong ring buffer khi receiver chậm. Trade-off: bảo vệ throughput chung, hi sinh receiver chậm.
  • Use case: pub/sub local (event bus trong process), shutdown signal (fan-out tín hiệu stop tới N worker, kết hợp tokio::select!).
  • Phân biệt: mpsc = work distribution (1 msg → 1 receiver), broadcast = event fan-out (1 msg → N receiver), watch = state snapshot (latest value), oneshot = single result.
  • Pitfall lớn nhất: dùng while let Ok(msg) = rx.recv().await như mpsc — sẽ thoát loop khi gặp Lagged. Đúng phải match 3 nhánh: Ok, Lagged (continue), Closed (break).
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. Khác biệt cốt lõi giữa mpscbroadcast về cách phân phối message tới receiver? Tại sao type T trong broadcast phải Clone còn mpsc không cần?
  2. Bạn viết while let Ok(msg) = rx.recv().await { process(msg); } với broadcast. Sau khi chạy một lúc, loop dừng — log thấy "lag mất 3 message" trước đó. Nguyên nhân và cách fix?
  3. Một broadcast::Sender phát message khi không có receiver nào subscribe — kết quả? Khi nào pattern này hợp lý?
  4. Viết worker dùng tokio::select! nghe shutdown từ broadcast::Receiver<()> đồng thời với loop làm việc chính (sleep 1s rồi print). Đảm bảo worker thoát ngay khi nhận shutdown, không chờ hết 1s.
  5. Tại sao broadcast không có biến thể unbounded_channel() như mpsc? Mở rộng: vì sao broadcast chọn drop message cũ thay vì chờ sender khi buffer "đầy"?
  6. Bạn cần fan-out config mới tới 50 task khi user gọi API PUT /config. Mỗi task chỉ cần biết giá trị mới nhất, không cần biết lịch sử thay đổi. Nên dùng broadcast hay watch? Vì sao?
Đáp án
  1. mpsc: mỗi message tới đúng 1 receiver (FIFO consume). broadcast: mỗi message tới tất cả receiver đang sống. Vì broadcast phải nhân bản giá trị ra cho từng receiver nên T: Clone bắt buộc; mpsc chỉ move value cho 1 consumer nên không cần Clone — chỉ cần ownership chuyển sạch.
  2. Receiver bị lag (xử lý chậm hơn sender), rx.recv().await trả Err(RecvError::Lagged(3)). Pattern while let Ok(...) coi mọi Err là dừng — bao gồm cả Lagged vốn nên continue. Fix: dùng loop { match rx.recv().await { Ok(m) => ..., Err(Lagged(n)) => continue, Err(Closed) => break } }.
  3. tx.send(v) trả Err(SendError(v)) — không ai để gửi tới. Hợp lý khi sender là phần "vẫn chạy" của hệ thống còn receiver tạm thời không có ai (vd: log bus, nếu không subscriber nào quan tâm thì discard log). Caller xử lý bằng let _ = tx.send(v); hoặc log warn nhẹ — không nên panic.
  4. async fn worker(mut shutdown: broadcast::Receiver<()>) { loop { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(1)) => { println!("tick"); } _ = shutdown.recv() => { println!("shutdown received"); break; } } } }select! chờ nhánh nào ready trước, hủy nhánh kia. Khi shutdown ready trước, sleep bị cancel ngay → worker không chờ hết 1s.
  5. Broadcast dùng ring buffer cố định để cho phép subscribe runtime và đảm bảo memory bounded. Unbounded sẽ phá vỡ ring buffer model (không biết khi nào drop slot). Chọn drop msg cũ thay vì chờ sender: nếu chờ, một receiver chậm sẽ làm tất cả receiver còn lại bị block theo (vì sender bị block) — không chấp nhận được cho fan-out. Hi sinh receiver chậm để bảo vệ throughput của các receiver khoẻ.
  6. watch. Đặc trưng watch: chỉ giữ giá trị mới nhất, receiver join muộn vẫn thấy current value (không phải subscribe-from-now như broadcast); nếu receiver chậm mà config đổi 5 lần, receiver chỉ thấy lần cuối, không spam 5 lần. Broadcast trong tình huống này gây 5 lần xử lý dư thừa, lại có rủi ro Lagged nếu config đổi nhanh hơn task đọc.
11

Bài Tiếp Theo

Bài 258: tokio::select! Macro — Wait First Of Many Futures — giới thiệu macro tokio::select! dùng để chờ future nào ready trước trong nhiều nhánh (như đã thấy thoáng qua ở pattern shutdown bài này). Bạn sẽ học cú pháp đầy đủ, cơ chế cancellation các nhánh không thắng, và khái niệm cực kỳ quan trọng cancellation-safety — quy tắc để biết future nào có thể đặt trong select! mà không bị mất data.