Mục lục
Mục Tiêu Bài Học
Sau bài học, bạn sẽ:
- Phân biệt
std::sync::mpsc(blocking, dùng OS thread) vàtokio::sync::mpsc(async-aware, không block worker thread của runtime). - Hiểu hai loại channel của tokio: bounded qua
mpsc::channel(capacity)có backpressure tự nhiên, và unbounded quampsc::unbounded_channel()dễ dùng nhưng rủi ro out-of-memory. - Viết được pattern send/recv async cơ bản:
tx.send(value).awaittrảResult,rx.recv().awaittrảOption<T>. - Hiểu cách đóng channel: khi tất cả
Senderbị drop,rx.recv()trảNone; idiomwhile let Some(msg) = rx.recv().awaitđể consume tới khi đóng. - Viết được idiom worker pool: 1 task consumer giữ
rx, nhiều task producer clonetxvàsendjob vào. - Biết khi nào dùng bounded (mặc định nên chọn — có backpressure) và khi nào unbounded (chỉ khi chắc chắn producer rate < consumer rate).
Khác Biệt std::sync::mpsc vs tokio::sync::mpsc
Rust stdlib đã có sẵn channel: std::sync::mpsc::channel(). Vì sao tokio phải làm thêm một channel khác?
Câu trả lời: std::sync::mpsc block OS thread khi recv không có data. Trong runtime tokio, mỗi worker thread phải phục vụ hàng nghìn task — block một worker thread đồng nghĩa với đứng yên tất cả task khác đang chia sẻ worker đó. Đây là cardinal sin của lập trình async (xem lại Bài 245).
tokio::sync::mpsc giải quyết bằng cách: khi rx.recv().await mà channel rỗng, task yield về scheduler thay vì block thread. Worker được giải phóng đi phục vụ task khác; khi có message mới, reactor đánh thức task chờ qua Waker. Tương tự với tx.send().await trên bounded channel đầy: task yield chứ không spin-wait.
Bảng so sánh nhanh:
std::sync::mpsc: sync API,tx.send(v)không trả future,rx.recv()block OS thread. Dùng ngoài runtime async, hoặc giữa hai thread sync.tokio::sync::mpsc: async API,tx.send(v).await,rx.recv().await. Dùng trong async task. Không bao giờ trộn lẫn với code sync block.crossbeam_channel: lựa chọn thứ ba — sync nhưng tối ưu hơn stdlib, dùng cho thread pool sync.
Quy tắc đơn giản: viết task async → dùng tokio::sync::mpsc. Tuyệt đối không dùng std::sync::mpsc::Receiver::recv() trong task tokio — nó sẽ block worker thread và làm cả runtime đứng.
Bounded Channel: mpsc::channel(100)
Bounded channel là loại nên chọn mặc định. Khai báo capacity (số message tối đa trong buffer):
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Tạo channel với capacity 100
let (tx, mut rx) = mpsc::channel::<i32>(100);
// Producer task
tokio::spawn(async move {
for i in 0..5 {
// .await: nếu buffer đầy (100), task yield chờ slot trống
if tx.send(i).await.is_err() {
println!("receiver đã drop, dừng");
break;
}
}
});
// Consumer ở main task
while let Some(value) = rx.recv().await {
println!("nhận: {}", value);
}
}
Điểm cốt lõi của bounded: khi consumer xử lý chậm, buffer đầy lên — producer gọi send().await sẽ chờ tự nhiên tới khi có slot trống. Đây gọi là backpressure: dòng chảy dữ liệu tự cân bằng giữa producer và consumer, không cần code thêm logic throttle.
Capacity nên chọn bao nhiêu? Quy tắc thực tế:
- 10-100 cho job nặng (mỗi message tốn nhiều giây xử lý) — buffer nhỏ để backpressure phản ứng nhanh.
- 1000-10000 cho event nhẹ (log line, metric) — buffer to để chịu burst.
- 1 nếu muốn lock-step producer/consumer (gọi là rendezvous channel) — mỗi
sendchờrecvtương ứng.
Tránh capacity quá lớn (ví dụ 1 triệu) — tương đương unbounded, mất tác dụng backpressure.
Unbounded Channel: mpsc::unbounded_channel()
Unbounded không giới hạn buffer. API hơi khác — send trả ngay không cần .await:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
tokio::spawn(async move {
for i in 0..1000 {
// KHÔNG .await — send sync, luôn thành công (trừ khi rx đã drop)
tx.send(format!("msg-{}", i)).unwrap();
}
});
while let Some(s) = rx.recv().await {
println!("{}", s);
}
}
Tiện hơn — không cần lo await, gọi từ context sync cũng được. Nhưng đi kèm rủi ro lớn: nếu producer nhanh hơn consumer, message tích luỹ trong buffer không giới hạn, dẫn đến out-of-memory và process bị OS kill.
Ví dụ thảm hoạ minh hoạ rõ:
// NGUY HIỂM — unbounded + producer fast + consumer slow = OOM
let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();
// Producer push 1MB/lần, không giới hạn tốc độ
tokio::spawn(async move {
loop {
tx.send(vec![0u8; 1_000_000]).unwrap(); // 1 MB
}
});
// Consumer xử lý chậm — sleep 1s mỗi message
tokio::spawn(async move {
while let Some(_msg) = rx.recv().await {
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
// Sau vài giây: process ăn hết RAM, bị kill
Khi nào dùng unbounded an toàn?
- Producer chắc chắn chậm hơn consumer — ví dụ event UI từ user click (giới hạn vật lý).
- Channel chỉ chứa số message hữu hạn nhỏ — ví dụ control message khởi tạo.
- Không có lựa chọn khác (gọi từ context sync không thể
.await).
Nguyên tắc: nghi ngờ thì chọn bounded. Backpressure tự nhiên đáng giá hơn sự tiện lợi của unbounded.
Send / Recv Async
Signature chính xác của hai method quan trọng nhất:
tx.send(value).await -> Result<(), SendError<T>>: trảErrchỉ khiReceiverđã drop (không còn ai nghe).SendErrorchứa lạivaluekhông gửi được để caller có thể xử lý.rx.recv().await -> Option<T>: trảSome(t)khi có message, trảNonekhi tất cảSenderđã drop và buffer đã rỗng.
Lưu ý hai chiều "đóng" khác nhau:
- Receiver drop → mọi
tx.send().awaittiếp theo trảErr. Producer biết "không còn ai nghe, dừng làm việc đi". - Tất cả Sender drop →
rx.recv().awaittrảNonesau khi consume hết buffer. Consumer biết "không còn ai gửi nữa, kết thúc loop".
Có thêm method non-blocking khi cần check không chờ:
// try_send: không await, fail ngay nếu buffer đầy
match tx.try_send(42) {
Ok(()) => println!("gửi OK"),
Err(mpsc::error::TrySendError::Full(v)) => println!("buffer đầy, drop {}", v),
Err(mpsc::error::TrySendError::Closed(v)) => println!("receiver đã đóng, drop {}", v),
}
// try_recv: không await, fail ngay nếu rỗng
match rx.try_recv() {
Ok(msg) => println!("nhận {}", msg),
Err(mpsc::error::TryRecvError::Empty) => println!("rỗng"),
Err(mpsc::error::TryRecvError::Disconnected) => println!("đóng rồi"),
}
try_send hữu ích khi muốn drop message thay vì chờ (ví dụ telemetry không quan trọng), nhưng làm mất tác dụng backpressure — cân nhắc kỹ.
Close Channel: Drop Tất Cả tx
Channel không có method close() tường minh. Cơ chế đóng là RAII: khi tất cả instance Sender bị drop (ra khỏi scope), channel coi như đóng — rx.recv() sẽ trả None sau khi consume hết buffer còn lại.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<i32>(10);
tokio::spawn(async move {
for i in 0..3 {
tx.send(i).await.unwrap();
}
// tx ra khỏi scope ở đây — drop tự động
// Vì là tx duy nhất, channel đóng ngay lập tức
});
// Loop tới khi rx.recv() trả None (channel đóng)
while let Some(v) = rx.recv().await {
println!("nhận: {}", v);
}
println!("channel đã đóng, kết thúc");
}
// In: nhận: 0 / nhận: 1 / nhận: 2 / channel đã đóng, kết thúc
Pitfall thường gặp: quên drop một tx. Nếu bạn clone tx để gửi cho nhiều task nhưng vẫn giữ một bản trong main task, channel không bao giờ đóng — rx.recv().await sẽ chờ mãi:
let (tx, mut rx) = mpsc::channel::<i32>(10);
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(i).await.unwrap();
});
}
// QUÊN drop(tx) ở main — channel không bao giờ đóng!
drop(tx); // BẮT BUỘC để rx.recv() trả None khi đủ
while let Some(v) = rx.recv().await {
println!("{}", v);
}
Idiom: clone tx cho task, drop bản gốc ngay ở main để main không giữ tham chiếu thừa.
Idiom Worker Pool
Use case kinh điển nhất của mpsc là worker pool: nhiều producer task đẩy job vào channel, một consumer task xử lý tuần tự. Pattern này phù hợp khi consumer cần truy cập resource không thể chia sẻ (database connection, file handle, GPIO pin) — kênh hóa qua channel để serialize access.
use tokio::sync::mpsc;
use std::time::Duration;
#[derive(Debug)]
struct Job {
id: u32,
payload: String,
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Job>(100);
// CONSUMER task — single instance, xử lý job tuần tự
let consumer = tokio::spawn(async move {
while let Some(job) = rx.recv().await {
println!("xử lý job {}: {}", job.id, job.payload);
// Giả lập I/O nặng — không block worker vì là async
tokio::time::sleep(Duration::from_millis(100)).await;
}
println!("consumer: channel đóng, kết thúc");
});
// N PRODUCER task — mỗi task clone tx riêng và gửi job
let mut handles = Vec::new();
for i in 0..3 {
let tx = tx.clone();
handles.push(tokio::spawn(async move {
for j in 0..5 {
let job = Job {
id: i * 10 + j,
payload: format!("from-producer-{}", i),
};
tx.send(job).await.unwrap();
}
}));
}
drop(tx); // Drop tx gốc — chờ tất cả producer clone hoàn tất rồi consumer tự kết thúc
for h in handles { h.await.unwrap(); }
consumer.await.unwrap();
}
Phân tích: 3 producer chạy đồng thời gửi tổng 15 job vào channel có capacity 100. Consumer single-thread tiêu thụ tuần tự — đúng 1 job mỗi 100ms. Khi 3 producer xong và drop clone tx, cộng với drop(tx) gốc → channel đóng → rx.recv() trả None sau khi xử lý hết buffer → consumer thoát loop.
Mở rộng dễ dàng: muốn N consumer (worker pool thực sự) thì dùng broadcast hoặc spawn nhiều consumer cùng đọc qua Mutex<Receiver> — nhưng đây là pattern phức tạp hơn, sẽ bàn ở bài broadcast.
Multi-Producer: Clone tx
Chữ MP trong "mpsc" = multi-producer. Tạo nhiều producer rất rẻ: chỉ cần .clone() instance Sender:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(50);
// Spawn 5 producer task, mỗi task có clone riêng
for worker_id in 0..5 {
let tx = tx.clone(); // CHEAP — chỉ tăng atomic refcount
tokio::spawn(async move {
for msg_id in 0..3 {
let msg = format!("worker-{} msg-{}", worker_id, msg_id);
tx.send(msg).await.unwrap();
}
// tx clone drop ở đây khi task xong
});
}
drop(tx); // Drop bản gốc — channel sẽ đóng khi tất cả clone drop
while let Some(msg) = rx.recv().await {
println!("{}", msg);
}
}
// In ra 15 message từ 5 worker, thứ tự đan xen (concurrent)
Đặc điểm quan trọng:
Sender::clone()không tạo channel mới — chỉ tăng atomic reference count đến cùng một channel.- Tất cả clone gửi vào cùng buffer, consumer thấy luồng message đan xen theo thứ tự đến.
- Channel không đóng cho tới khi toàn bộ clone drop. Đây là lý do phải
drop(tx)bản gốc ở main. - Capacity được chia sẻ: nếu buffer 50 đầy, tất cả sender gọi
send().awaitđều bị chờ (fair queue).
Ngược lại, Receiver không implement Clone — đó là chữ SC (single-consumer). Muốn nhiều consumer, dùng tokio::sync::broadcast (B257) hoặc bọc Receiver trong Arc<Mutex<_>> nếu thật sự cần work-stealing.
Tổng Kết
tokio::sync::mpsclà async-aware channel — không block worker thread khi đầy/rỗng, chỉ yield task. Tuyệt đối không dùngstd::sync::mpsctrong task tokio.- Bounded qua
mpsc::channel(capacity)— có backpressure tự nhiên,send().awaitchờ slot trống. Mặc định nên chọn loại này. - Unbounded qua
mpsc::unbounded_channel()— buffer không giới hạn,send()không cần await. Rủi ro out-of-memory nếu producer nhanh hơn consumer. - Send:
tx.send(v).await -> Result<(), SendError>. Recv:rx.recv().await -> Option<T>. Variant non-blocking:try_send,try_recv. - Channel đóng theo cơ chế RAII: khi tất cả
Senderdrop,rx.recv()trảNone. Pitfall: quên droptxgốc ở main → loop chờ mãi. - Multi-producer dễ:
tx.clone()chỉ tăng atomic refcount; tất cả clone gửi vào cùng buffer. - Single-consumer cứng:
Receiverkhông implementClone; muốn nhiều consumer dùngbroadcasthoặcArc<Mutex<Receiver>>. - Idiom worker pool = 1 consumer + N producer qua channel, phù hợp khi consumer truy cập resource serial (DB connection, file).
Bài Tập Củng Cố
Tự trả lời, đáp án ở cuối:
- Vì sao không nên dùng
std::sync::mpsc::Receiver::recv()trong task tokio? Hậu quả cụ thể là gì khi cả runtime chỉ có 4 worker thread mà 4 task đều block trênrecv()sync? - Khác biệt API giữa
mpsc::channel(100)vàmpsc::unbounded_channel()trên phương thứcsend? Lý do API khác nhau? - Một dev viết code:
for i in 0..1000 { tx.send(vec![0u8; 1_000_000]).unwrap(); }vớiunbounded_channel, consumer xử lý mỗi message tốn 100ms. Mô tả điều gì sẽ xảy ra trên process Linux 8 GB RAM? - Viết worker pool: 4 producer task gửi tổng cộng 100 số ngẫu nhiên vào channel bounded(20), 1 consumer in tổng cộng đã nhận. Channel đóng đúng cách để consumer thoát loop.
- Pattern
let (tx, mut rx) = mpsc::channel(10); for _ in 0..3 { let tx = tx.clone(); spawn(async move { ... }); }rồiwhile let Some(_) = rx.recv().await {}— vì sao loop không bao giờ kết thúc dù 3 task spawn đã xong? Cách fix? - Khi nào chọn
try_sendthay vìsend().await? Đánh đổi gì khi chọntry_sendtrên bounded channel?
Đáp án
std::sync::mpsc::Receiver::recv()block OS thread — trong tokio mỗi worker thread phục vụ hàng nghìn task qua cooperative scheduling. Block 1 worker = đứng yên hàng nghìn task chia sẻ worker đó. Nếu 4/4 worker đều block trênrecv()sync → cả runtime đứng hình, không task nào tiến triển, bao gồm cả task lẽ ra đáng gửi message vào channel — deadlock. Dùngtokio::sync::mpsc::Receiver::recv().awaitđể task yield thay vì block.- Bounded:
tx.send(v).await— async, vì khi buffer đầy phải chờ slot trống (chờ = await). Unbounded:tx.send(v)— sync, vì buffer không giới hạn, không bao giờ phải chờ (chỉ fail nếu receiver đã drop). API khác để phản ánh chính xác semantics: bounded có thể block (cần yield point), unbounded không bao giờ block (không cần yield point). - Producer push 1000 message × 1 MB = 1 GB vào buffer trong vài mili giây. Consumer xử lý 100ms/message = cần 100 giây để clear. Trong khoảng đó buffer giữ ~1 GB liên tục. Nếu producer tiếp tục push, hoặc có nhiều channel tương tự, RAM nhanh chóng cạn — OOM killer của Linux kill process. Fix: dùng bounded với capacity hợp lý (vd 10) — producer bị backpressure, tự chờ consumer kịp.
let (tx, mut rx) = mpsc::channel::<u32>(20); let mut handles = Vec::new(); for w in 0..4 { let tx = tx.clone(); handles.push(tokio::spawn(async move { for _ in 0..25 { tx.send(rand::random::<u32>()).await.unwrap(); } })); } drop(tx); let consumer = tokio::spawn(async move { let mut count = 0; while let Some(_) = rx.recv().await { count += 1; } println!("nhận {} message", count); }); for h in handles { h.await.unwrap(); } consumer.await.unwrap();— chú ýdrop(tx)sau khi clone xong.- Channel chỉ đóng khi tất cả
Senderdrop. Pattern trên còn giữ bản gốctxtrong main scope (chưa drop) — dù 3 clone đã drop khi task xong, bản gốc vẫn alive → channel không đóng →rx.recv()chờ mãi. Fix: gọidrop(tx)ngay sau vòng for spawn, hoặc bọc spawn trong block{ }đểtxra khỏi scope. try_senddùng khi muốn drop message thay vì chờ — phù hợp telemetry/metric không quan trọng, nơi mất một vài data point chấp nhận được. Đánh đổi: mất backpressure. Nếu consumer chậm, message bị drop liên tục — producer không bị chậm theo nên không biết đang có vấn đề. Phải log/metric riêng số message drop để monitor. Mặc định nênsend().awaitđể giữ backpressure.
Bài Tiếp Theo
Bài 256: tokio::sync::oneshot Channel — giới thiệu channel chuyên dụng cho pattern spawn task lấy 1 kết quả: chỉ gửi đúng 1 message, tx bị consume sau send. Use case kinh điển: spawn task tính toán heavy, dùng oneshot để chờ kết quả trả về thay vì Vec hoặc Option global state. Bạn cũng sẽ thấy oneshot là building block của tokio::spawn(...).await dưới capo.
