Danh sách bài viết

Bài 255: tokio::sync::mpsc Channel

Bài 255 của series Rust Cơ Bản — channel là cách nhiều task async giao tiếp với nhau mà không cần shared memory + lock. Tokio cung cấp tokio::sync::mpsc — multi-producer single-consumer channel async-aware: tx.send(v).await không block worker thread (nếu channel đầy thì task yield), rx.recv().await trả Option<T> (Some khi có message, None khi tất cả sender đã drop — channel đã đóng). Hai loại: bounded (mpsc::channel(capacity)) cho phép backpressure tự nhiên — khi consumer xử lý chậm, producer bị chậm theo; và unbounded (mpsc::unbounded_channel()) không giới hạn buffer, send sync không cần await, nhưng rủi ro out-of-memory nếu producer nhanh hơn consumer. Idiom phổ biến nhất là worker pool: 1 task consumer nhận job qua rx, nhiều task producer clone tx và send job. Bài này phân biệt với std::sync::mpsc, demo bounded basic, multi-producer, close detection, worker pool pattern, và cảnh báo unbounded OOM bằng ví dụ cụ thể.

10/06/2026
10 phút đọc
1 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Phân biệt std::sync::mpsc (blocking, dùng OS thread) và tokio::sync::mpsc (async-aware, không block worker thread của runtime).
  • Hiểu hai loại channel của tokio: bounded qua mpsc::channel(capacity) có backpressure tự nhiên, và unbounded qua mpsc::unbounded_channel() dễ dùng nhưng rủi ro out-of-memory.
  • Viết được pattern send/recv async cơ bản: tx.send(value).await trả Result, rx.recv().await trả Option<T>.
  • Hiểu cách đóng channel: khi tất cả Sender bị drop, rx.recv() trả None; idiom while let Some(msg) = rx.recv().await để consume tới khi đóng.
  • Viết được idiom worker pool: 1 task consumer giữ rx, nhiều task producer clone txsend job vào.
  • Biết khi nào dùng bounded (mặc định nên chọn — có backpressure) và khi nào unbounded (chỉ khi chắc chắn producer rate < consumer rate).
2

Khác Biệt std::sync::mpsc vs tokio::sync::mpsc

Rust stdlib đã có sẵn channel: std::sync::mpsc::channel(). Vì sao tokio phải làm thêm một channel khác?

Câu trả lời: std::sync::mpsc block OS thread khi recv không có data. Trong runtime tokio, mỗi worker thread phải phục vụ hàng nghìn task — block một worker thread đồng nghĩa với đứng yên tất cả task khác đang chia sẻ worker đó. Đây là cardinal sin của lập trình async (xem lại Bài 245).

tokio::sync::mpsc giải quyết bằng cách: khi rx.recv().await mà channel rỗng, task yield về scheduler thay vì block thread. Worker được giải phóng đi phục vụ task khác; khi có message mới, reactor đánh thức task chờ qua Waker. Tương tự với tx.send().await trên bounded channel đầy: task yield chứ không spin-wait.

Bảng so sánh nhanh:

  • std::sync::mpsc: sync API, tx.send(v) không trả future, rx.recv() block OS thread. Dùng ngoài runtime async, hoặc giữa hai thread sync.
  • tokio::sync::mpsc: async API, tx.send(v).await, rx.recv().await. Dùng trong async task. Không bao giờ trộn lẫn với code sync block.
  • crossbeam_channel: lựa chọn thứ ba — sync nhưng tối ưu hơn stdlib, dùng cho thread pool sync.

Quy tắc đơn giản: viết task async → dùng tokio::sync::mpsc. Tuyệt đối không dùng std::sync::mpsc::Receiver::recv() trong task tokio — nó sẽ block worker thread và làm cả runtime đứng.

3

Bounded Channel: mpsc::channel(100)

Bounded channel là loại nên chọn mặc định. Khai báo capacity (số message tối đa trong buffer):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Tạo channel với capacity 100
    let (tx, mut rx) = mpsc::channel::<i32>(100);

    // Producer task
    tokio::spawn(async move {
        for i in 0..5 {
            // .await: nếu buffer đầy (100), task yield chờ slot trống
            if tx.send(i).await.is_err() {
                println!("receiver đã drop, dừng");
                break;
            }
        }
    });

    // Consumer ở main task
    while let Some(value) = rx.recv().await {
        println!("nhận: {}", value);
    }
}

Điểm cốt lõi của bounded: khi consumer xử lý chậm, buffer đầy lên — producer gọi send().await sẽ chờ tự nhiên tới khi có slot trống. Đây gọi là backpressure: dòng chảy dữ liệu tự cân bằng giữa producer và consumer, không cần code thêm logic throttle.

Capacity nên chọn bao nhiêu? Quy tắc thực tế:

  • 10-100 cho job nặng (mỗi message tốn nhiều giây xử lý) — buffer nhỏ để backpressure phản ứng nhanh.
  • 1000-10000 cho event nhẹ (log line, metric) — buffer to để chịu burst.
  • 1 nếu muốn lock-step producer/consumer (gọi là rendezvous channel) — mỗi send chờ recv tương ứng.

Tránh capacity quá lớn (ví dụ 1 triệu) — tương đương unbounded, mất tác dụng backpressure.

4

Unbounded Channel: mpsc::unbounded_channel()

Unbounded không giới hạn buffer. API hơi khác — send trả ngay không cần .await:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    tokio::spawn(async move {
        for i in 0..1000 {
            // KHÔNG .await — send sync, luôn thành công (trừ khi rx đã drop)
            tx.send(format!("msg-{}", i)).unwrap();
        }
    });

    while let Some(s) = rx.recv().await {
        println!("{}", s);
    }
}

Tiện hơn — không cần lo await, gọi từ context sync cũng được. Nhưng đi kèm rủi ro lớn: nếu producer nhanh hơn consumer, message tích luỹ trong buffer không giới hạn, dẫn đến out-of-memory và process bị OS kill.

Ví dụ thảm hoạ minh hoạ rõ:

// NGUY HIỂM — unbounded + producer fast + consumer slow = OOM
let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();

// Producer push 1MB/lần, không giới hạn tốc độ
tokio::spawn(async move {
    loop {
        tx.send(vec![0u8; 1_000_000]).unwrap(); // 1 MB
    }
});

// Consumer xử lý chậm — sleep 1s mỗi message
tokio::spawn(async move {
    while let Some(_msg) = rx.recv().await {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
});
// Sau vài giây: process ăn hết RAM, bị kill

Khi nào dùng unbounded an toàn?

  • Producer chắc chắn chậm hơn consumer — ví dụ event UI từ user click (giới hạn vật lý).
  • Channel chỉ chứa số message hữu hạn nhỏ — ví dụ control message khởi tạo.
  • Không có lựa chọn khác (gọi từ context sync không thể .await).

Nguyên tắc: nghi ngờ thì chọn bounded. Backpressure tự nhiên đáng giá hơn sự tiện lợi của unbounded.

5

Send / Recv Async

Signature chính xác của hai method quan trọng nhất:

  • tx.send(value).await -> Result<(), SendError<T>>: trả Err chỉ khi Receiver đã drop (không còn ai nghe). SendError chứa lại value không gửi được để caller có thể xử lý.
  • rx.recv().await -> Option<T>: trả Some(t) khi có message, trả None khi tất cả Sender đã drop và buffer đã rỗng.

Lưu ý hai chiều "đóng" khác nhau:

  • Receiver drop → mọi tx.send().await tiếp theo trả Err. Producer biết "không còn ai nghe, dừng làm việc đi".
  • Tất cả Sender droprx.recv().await trả None sau khi consume hết buffer. Consumer biết "không còn ai gửi nữa, kết thúc loop".

Có thêm method non-blocking khi cần check không chờ:

// try_send: không await, fail ngay nếu buffer đầy
match tx.try_send(42) {
    Ok(()) => println!("gửi OK"),
    Err(mpsc::error::TrySendError::Full(v)) => println!("buffer đầy, drop {}", v),
    Err(mpsc::error::TrySendError::Closed(v)) => println!("receiver đã đóng, drop {}", v),
}

// try_recv: không await, fail ngay nếu rỗng
match rx.try_recv() {
    Ok(msg) => println!("nhận {}", msg),
    Err(mpsc::error::TryRecvError::Empty) => println!("rỗng"),
    Err(mpsc::error::TryRecvError::Disconnected) => println!("đóng rồi"),
}

try_send hữu ích khi muốn drop message thay vì chờ (ví dụ telemetry không quan trọng), nhưng làm mất tác dụng backpressure — cân nhắc kỹ.

6

Close Channel: Drop Tất Cả tx

Channel không có method close() tường minh. Cơ chế đóng là RAII: khi tất cả instance Sender bị drop (ra khỏi scope), channel coi như đóng — rx.recv() sẽ trả None sau khi consume hết buffer còn lại.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<i32>(10);

    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(i).await.unwrap();
        }
        // tx ra khỏi scope ở đây — drop tự động
        // Vì là tx duy nhất, channel đóng ngay lập tức
    });

    // Loop tới khi rx.recv() trả None (channel đóng)
    while let Some(v) = rx.recv().await {
        println!("nhận: {}", v);
    }
    println!("channel đã đóng, kết thúc");
}
// In: nhận: 0 / nhận: 1 / nhận: 2 / channel đã đóng, kết thúc

Pitfall thường gặp: quên drop một tx. Nếu bạn clone tx để gửi cho nhiều task nhưng vẫn giữ một bản trong main task, channel không bao giờ đóngrx.recv().await sẽ chờ mãi:

let (tx, mut rx) = mpsc::channel::<i32>(10);

for i in 0..3 {
    let tx = tx.clone();
    tokio::spawn(async move {
        tx.send(i).await.unwrap();
    });
}
// QUÊN drop(tx) ở main — channel không bao giờ đóng!
drop(tx); // BẮT BUỘC để rx.recv() trả None khi đủ

while let Some(v) = rx.recv().await {
    println!("{}", v);
}

Idiom: clone tx cho task, drop bản gốc ngay ở main để main không giữ tham chiếu thừa.

7

Idiom Worker Pool

Use case kinh điển nhất của mpsc là worker pool: nhiều producer task đẩy job vào channel, một consumer task xử lý tuần tự. Pattern này phù hợp khi consumer cần truy cập resource không thể chia sẻ (database connection, file handle, GPIO pin) — kênh hóa qua channel để serialize access.

use tokio::sync::mpsc;
use std::time::Duration;

#[derive(Debug)]
struct Job {
    id: u32,
    payload: String,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Job>(100);

    // CONSUMER task — single instance, xử lý job tuần tự
    let consumer = tokio::spawn(async move {
        while let Some(job) = rx.recv().await {
            println!("xử lý job {}: {}", job.id, job.payload);
            // Giả lập I/O nặng — không block worker vì là async
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        println!("consumer: channel đóng, kết thúc");
    });

    // N PRODUCER task — mỗi task clone tx riêng và gửi job
    let mut handles = Vec::new();
    for i in 0..3 {
        let tx = tx.clone();
        handles.push(tokio::spawn(async move {
            for j in 0..5 {
                let job = Job {
                    id: i * 10 + j,
                    payload: format!("from-producer-{}", i),
                };
                tx.send(job).await.unwrap();
            }
        }));
    }

    drop(tx); // Drop tx gốc — chờ tất cả producer clone hoàn tất rồi consumer tự kết thúc

    for h in handles { h.await.unwrap(); }
    consumer.await.unwrap();
}

Phân tích: 3 producer chạy đồng thời gửi tổng 15 job vào channel có capacity 100. Consumer single-thread tiêu thụ tuần tự — đúng 1 job mỗi 100ms. Khi 3 producer xong và drop clone tx, cộng với drop(tx) gốc → channel đóng → rx.recv() trả None sau khi xử lý hết buffer → consumer thoát loop.

Mở rộng dễ dàng: muốn N consumer (worker pool thực sự) thì dùng broadcast hoặc spawn nhiều consumer cùng đọc qua Mutex<Receiver> — nhưng đây là pattern phức tạp hơn, sẽ bàn ở bài broadcast.

8

Multi-Producer: Clone tx

Chữ MP trong "mpsc" = multi-producer. Tạo nhiều producer rất rẻ: chỉ cần .clone() instance Sender:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(50);

    // Spawn 5 producer task, mỗi task có clone riêng
    for worker_id in 0..5 {
        let tx = tx.clone(); // CHEAP — chỉ tăng atomic refcount
        tokio::spawn(async move {
            for msg_id in 0..3 {
                let msg = format!("worker-{} msg-{}", worker_id, msg_id);
                tx.send(msg).await.unwrap();
            }
            // tx clone drop ở đây khi task xong
        });
    }

    drop(tx); // Drop bản gốc — channel sẽ đóng khi tất cả clone drop

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}
// In ra 15 message từ 5 worker, thứ tự đan xen (concurrent)

Đặc điểm quan trọng:

  • Sender::clone() không tạo channel mới — chỉ tăng atomic reference count đến cùng một channel.
  • Tất cả clone gửi vào cùng buffer, consumer thấy luồng message đan xen theo thứ tự đến.
  • Channel không đóng cho tới khi toàn bộ clone drop. Đây là lý do phải drop(tx) bản gốc ở main.
  • Capacity được chia sẻ: nếu buffer 50 đầy, tất cả sender gọi send().await đều bị chờ (fair queue).

Ngược lại, Receiver không implement Clone — đó là chữ SC (single-consumer). Muốn nhiều consumer, dùng tokio::sync::broadcast (B257) hoặc bọc Receiver trong Arc<Mutex<_>> nếu thật sự cần work-stealing.

9

Tổng Kết

  • tokio::sync::mpscasync-aware channel — không block worker thread khi đầy/rỗng, chỉ yield task. Tuyệt đối không dùng std::sync::mpsc trong task tokio.
  • Bounded qua mpsc::channel(capacity) — có backpressure tự nhiên, send().await chờ slot trống. Mặc định nên chọn loại này.
  • Unbounded qua mpsc::unbounded_channel() — buffer không giới hạn, send() không cần await. Rủi ro out-of-memory nếu producer nhanh hơn consumer.
  • Send: tx.send(v).await -> Result<(), SendError>. Recv: rx.recv().await -> Option<T>. Variant non-blocking: try_send, try_recv.
  • Channel đóng theo cơ chế RAII: khi tất cả Sender drop, rx.recv() trả None. Pitfall: quên drop tx gốc ở main → loop chờ mãi.
  • Multi-producer dễ: tx.clone() chỉ tăng atomic refcount; tất cả clone gửi vào cùng buffer.
  • Single-consumer cứng: Receiver không implement Clone; muốn nhiều consumer dùng broadcast hoặc Arc<Mutex<Receiver>>.
  • Idiom worker pool = 1 consumer + N producer qua channel, phù hợp khi consumer truy cập resource serial (DB connection, file).
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. Vì sao không nên dùng std::sync::mpsc::Receiver::recv() trong task tokio? Hậu quả cụ thể là gì khi cả runtime chỉ có 4 worker thread mà 4 task đều block trên recv() sync?
  2. Khác biệt API giữa mpsc::channel(100)mpsc::unbounded_channel() trên phương thức send? Lý do API khác nhau?
  3. Một dev viết code: for i in 0..1000 { tx.send(vec![0u8; 1_000_000]).unwrap(); } với unbounded_channel, consumer xử lý mỗi message tốn 100ms. Mô tả điều gì sẽ xảy ra trên process Linux 8 GB RAM?
  4. Viết worker pool: 4 producer task gửi tổng cộng 100 số ngẫu nhiên vào channel bounded(20), 1 consumer in tổng cộng đã nhận. Channel đóng đúng cách để consumer thoát loop.
  5. Pattern let (tx, mut rx) = mpsc::channel(10); for _ in 0..3 { let tx = tx.clone(); spawn(async move { ... }); } rồi while let Some(_) = rx.recv().await {} — vì sao loop không bao giờ kết thúc dù 3 task spawn đã xong? Cách fix?
  6. Khi nào chọn try_send thay vì send().await? Đánh đổi gì khi chọn try_send trên bounded channel?
Đáp án
  1. std::sync::mpsc::Receiver::recv() block OS thread — trong tokio mỗi worker thread phục vụ hàng nghìn task qua cooperative scheduling. Block 1 worker = đứng yên hàng nghìn task chia sẻ worker đó. Nếu 4/4 worker đều block trên recv() sync → cả runtime đứng hình, không task nào tiến triển, bao gồm cả task lẽ ra đáng gửi message vào channel — deadlock. Dùng tokio::sync::mpsc::Receiver::recv().await để task yield thay vì block.
  2. Bounded: tx.send(v).await — async, vì khi buffer đầy phải chờ slot trống (chờ = await). Unbounded: tx.send(v) — sync, vì buffer không giới hạn, không bao giờ phải chờ (chỉ fail nếu receiver đã drop). API khác để phản ánh chính xác semantics: bounded có thể block (cần yield point), unbounded không bao giờ block (không cần yield point).
  3. Producer push 1000 message × 1 MB = 1 GB vào buffer trong vài mili giây. Consumer xử lý 100ms/message = cần 100 giây để clear. Trong khoảng đó buffer giữ ~1 GB liên tục. Nếu producer tiếp tục push, hoặc có nhiều channel tương tự, RAM nhanh chóng cạn — OOM killer của Linux kill process. Fix: dùng bounded với capacity hợp lý (vd 10) — producer bị backpressure, tự chờ consumer kịp.
  4. let (tx, mut rx) = mpsc::channel::<u32>(20); let mut handles = Vec::new(); for w in 0..4 { let tx = tx.clone(); handles.push(tokio::spawn(async move { for _ in 0..25 { tx.send(rand::random::<u32>()).await.unwrap(); } })); } drop(tx); let consumer = tokio::spawn(async move { let mut count = 0; while let Some(_) = rx.recv().await { count += 1; } println!("nhận {} message", count); }); for h in handles { h.await.unwrap(); } consumer.await.unwrap(); — chú ý drop(tx) sau khi clone xong.
  5. Channel chỉ đóng khi tất cả Sender drop. Pattern trên còn giữ bản gốc tx trong main scope (chưa drop) — dù 3 clone đã drop khi task xong, bản gốc vẫn alive → channel không đóng → rx.recv() chờ mãi. Fix: gọi drop(tx) ngay sau vòng for spawn, hoặc bọc spawn trong block { } để tx ra khỏi scope.
  6. try_send dùng khi muốn drop message thay vì chờ — phù hợp telemetry/metric không quan trọng, nơi mất một vài data point chấp nhận được. Đánh đổi: mất backpressure. Nếu consumer chậm, message bị drop liên tục — producer không bị chậm theo nên không biết đang có vấn đề. Phải log/metric riêng số message drop để monitor. Mặc định nên send().await để giữ backpressure.
11

Bài Tiếp Theo

Bài 256: tokio::sync::oneshot Channel — giới thiệu channel chuyên dụng cho pattern spawn task lấy 1 kết quả: chỉ gửi đúng 1 message, tx bị consume sau send. Use case kinh điển: spawn task tính toán heavy, dùng oneshot để chờ kết quả trả về thay vì Vec hoặc Option global state. Bạn cũng sẽ thấy oneshot là building block của tokio::spawn(...).await dưới capo.