Mục lục
Mục Tiêu Bài Học
Sau bài học, bạn sẽ:
- Hiểu triết lý "share memory by communicating" (CSP của Tony Hoare, kế thừa bởi Go và Rust): thay vì nhiều thread đụng chung một biến, một thread sở hữu state và các thread khác chỉ gửi tin nhắn.
- Viết được cặp
let (tx, rx) = mpsc::channel();và truyềntx/rxvào các thread đúng cách (ownership move). - Phân biệt
tx.send(v)(chuyển ownershipvvào channel, trảResult) vớirx.recv()(block đợi, trảResult<T, RecvError>). - Tạo multi-producer bằng
tx.clone()cho mỗi thread — biết quy tắc "channel đóng khi tất cả bản saotxbị drop". - Chọn đúng giữa
recv()(blocking),try_recv()(non-blocking), và iteratorfor msg in &rx(tự dừng khi đóng). - Biết khi nào dùng
mpsc::sync_channel(n)thaympsc::channel()để có bounded buffer + backpressure. - Biết khi nào channel tốt hơn
Mutex(worker pool, pipeline, fan-in event) và ngược lại.
Channel Là Gì — Share By Communicating
Đến giờ, mọi cách share state giữa thread mà ta học (Mutex, RwLock, Atomic) đều theo mô hình shared memory: nhiều thread cùng nhìn vào một ô nhớ, ta dựng hàng rào để chúng không đạp lên nhau. Mô hình này mạnh nhưng khó suy luận — mỗi lock thêm vào là một cơ hội deadlock, mỗi atomic mới là một câu hỏi về memory ordering.
Có một mô hình thứ hai: message passing. Mỗi thread chỉ giữ state riêng của mình; muốn nói với thread khác, gửi một message qua một kênh giao tiếp. Người nhận đọc message theo thứ tự, xử lý, có khi gửi trả lời. Không có biến nào nhiều thread cùng đụng — không cần lock. Tony Hoare phát biểu mô hình này (CSP, 1978) bằng một câu trở thành slogan: "Do not communicate by sharing memory; instead, share memory by communicating."
Go làm phổ biến slogan này với chan. Rust hiện thực nó trong std::sync::mpsc. Tên mpsc nói thẳng kiểu channel: multi-producer single-consumer — nhiều thread cùng send, một thread duy nhất receive. Đủ cho phần lớn pattern thực tế: worker pool gửi kết quả về dispatcher, nhiều cảm biến gửi event về aggregator, nhiều scraper gửi item về writer.
Điểm tinh tế của Rust: khi bạn send(value), ownership của value bị chuyển vào channel — sender không còn dùng được nó nữa, receiver lấy ra là chủ mới. Đây không chỉ là thiết kế thẩm mỹ; nó là compile-time guarantee rằng hai thread không cùng đụng vào một String qua channel. Race condition trên message bị bóc gọn bằng borrow checker.
std::sync::mpsc Basics
Tạo channel bằng mpsc::channel() — trả về tuple (Sender<T>, Receiver<T>). Type T được suy luận từ message đầu tiên bạn send:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// Thread con gửi một message rồi kết thúc
thread::spawn(move || {
let msg = String::from("hello from worker");
tx.send(msg).unwrap();
// ở đây không thể dùng msg nữa — ownership đã chuyển vào channel
});
// Thread chính nhận
let received = rx.recv().unwrap();
println!("got: {received}");
}
Quan sát ba điểm. (1) tx được move vào closure của thread::spawn — đó là cách hợp lệ để thread con sở hữu sender. (2) tx.send(msg) trả Result<(), SendError<T>>; SendError chỉ xảy ra khi receiver đã bị drop — không có chỗ để message đi nữa. (3) rx.recv() block đến khi có message; trả Ok(msg) khi nhận được, trả Err(RecvError) khi channel đã đóng và không còn message nào.
Channel là unbounded theo mặc định: send không bao giờ block vì hết chỗ — buffer co giãn theo bộ nhớ. Đây tiện nhưng nguy hiểm nếu producer nhanh hơn consumer rất nhiều: bộ nhớ cứ phình ra cho đến khi OOM. Mục 7 ta sẽ học bản bounded để chống vấn đề này.
Một điểm lưu ý quan trọng: Receiver không Sync và không Clone — đây là chữ single-consumer trong "mpsc". Nếu cần nhiều consumer, dùng crate ngoài crossbeam-channel (mpmc).
Multi-Producer: Clone tx
Chữ multi-producer nằm ở chỗ Sender<T> implement Clone. Mỗi lần tx.clone() bạn có thêm một sender mới chia sẻ cùng channel. Mỗi thread cầm một bản, gửi độc lập, receiver nhận tất cả vào cùng một hàng đợi FIFO:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// Spawn 4 producer, mỗi producer gửi 3 message
for id in 0..4 {
let tx = tx.clone();
thread::spawn(move || {
for i in 0..3 {
let msg = format!("worker-{id} msg-{i}");
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(10));
}
// tx bị drop ở cuối closure — bớt một bản sao
});
}
// Quan trọng: drop bản tx gốc ở thread chính,
// nếu không vòng for dưới sẽ chờ mãi vì channel chưa đóng.
drop(tx);
// Iterator tự dừng khi tất cả tx bị drop
for msg in rx {
println!("got: {msg}");
}
}
Quan sát bốn điểm. (1) tx.clone() rẻ — bên trong là Arc, clone chỉ tăng refcount. (2) Thứ tự message giữa các producer không xác định (interleave theo lịch OS); chỉ thứ tự trong cùng một sender là FIFO. (3) for msg in rx tương đương while let Ok(msg) = rx.recv() — tự kết thúc khi recv trả Err. (4) Channel chỉ đóng khi tất cả bản sao tx đã drop; đó là vì sao ta phải drop(tx) ở main — bản gốc vẫn sống nếu không.
Pitfall phổ biến: quên drop bản tx ở thread chính rồi ngạc nhiên vì chương trình treo mãi. Quy tắc: nếu thread chính chỉ làm consumer, hãy clone tx ra rồi drop bản gốc ngay sau khi spawn xong producer.
recv() Blocking vs try_recv() Non-Blocking
Receiver có ba cách lấy message tuỳ nhu cầu:
rx.recv()— block đến khi có message hoặc channel đóng. TrảResult<T, RecvError>. Phù hợp khi consumer chỉ làm mỗi việc đọc channel.rx.try_recv()— non-blocking, trả ngay.Ok(msg)nếu có,Err(TryRecvError::Empty)nếu rỗng nhưng channel còn sống,Err(TryRecvError::Disconnected)nếu đã đóng. Phù hợp khi consumer còn việc khác để làm, chỉ poll channel xen kẽ.rx.recv_timeout(Duration)— block tối đaDuration; trảErr(RecvTimeoutError::Timeout)nếu hết giờ. Phù hợp khi muốn vừa đợi vừa có deadline.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel::<u32>();
thread::spawn(move || {
for i in 1..=3 {
tx.send(i).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
// Poll non-blocking trong khi vẫn làm việc khác
loop {
match rx.try_recv() {
Ok(n) => println!("got {n}"),
Err(mpsc::TryRecvError::Empty) => {
// không có gì, làm việc khác rồi quay lại
thread::sleep(Duration::from_millis(10));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("channel closed");
break;
}
}
}
}
Quy tắc chọn: nếu consumer là worker chuyên ăn message → recv() trong vòng while let Ok. Nếu consumer là event loop đang tick mỗi 16 ms (game, UI, real-time) → try_recv. Nếu muốn shutdown êm sau N giây không có message → recv_timeout.
Channel Đóng — Drop tx → recv Trả Err
Channel không có method close() tường minh. Vòng đời của nó tự suy ra từ ownership:
- Khi tất cả bản sao
Senderbị drop, không ai có thểsendnữa.rx.recv()sau khi đã drain hết buffer sẽ trảErr(RecvError). - Khi
Receiverbị drop, mọitx.send(v)tiếp theo trảErr(SendError(v))— message được trả lại để bạn xử lý.
Đây là idiom kết thúc tự nhiên của pipeline. Producer xong việc thì closure kết thúc, tx drop. Khi cái cuối cùng drop, receiver tự biết "không còn ai gửi nữa", vòng for kết thúc êm:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let producer = thread::spawn(move || {
for i in 1..=3 {
tx.send(i).unwrap();
}
// tx drop ở đây → channel đóng
});
// Iterator dừng khi channel đóng — không cần "sentinel value" như None
for msg in rx {
println!("got: {msg}");
}
producer.join().unwrap();
}
Pattern này tránh được "magic value" kiểu send(None) để báo hết. Ownership của Rust tự làm signal — đẹp và an toàn.
Pitfall: nếu bạn không chuyển tx vào closure (vô tình giữ lại ngoài main), bản đó sống đến hết main, vòng for không bao giờ dừng. Khi viết code phức tạp, kiểm tra "ai đang giữ tx?" là việc số một khi debug treo channel.
sync_channel — Bounded Và Backpressure
Hàm mpsc::channel() trả về channel unbounded: send luôn thành công ngay, message xếp hàng vô hạn. Khi consumer chậm hơn producer, hàng đợi phình lên không giới hạn — kịch bản OOM kinh điển.
Giải pháp là mpsc::sync_channel(capacity) — bounded. Buffer chứa tối đa capacity message; producer thứ capacity+1 sẽ bị block trong send đến khi consumer lấy ra một chỗ. Đây gọi là backpressure: hệ thống tự ép producer chạy chậm bằng tốc độ consumer.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Buffer 2 message; producer thứ 3 bị block trong send
let (tx, rx) = mpsc::sync_channel::<u32>(2);
thread::spawn(move || {
for i in 1..=5 {
println!("producer: trying send {i}");
tx.send(i).unwrap(); // block khi buffer đầy
println!("producer: sent {i}");
}
});
// Consumer chậm
thread::sleep(Duration::from_millis(200));
for msg in rx {
println!("consumer: got {msg}");
thread::sleep(Duration::from_millis(100));
}
}
Output cho thấy producer in "sent 1", "sent 2" ngay rồi treo ở "trying send 3" cho đến khi consumer ăn bớt một message. Đó là backpressure đang làm việc — không phải code chậm, mà là thiết kế chậm để giữ bộ nhớ ổn định.
Trường hợp đặc biệt: sync_channel(0) là rendezvous channel — không có buffer, send chỉ trả về khi đồng thời có recv đang đợi. Producer và consumer "bắt tay" trực tiếp, hữu ích khi muốn handoff đồng bộ tuyệt đối.
Quy tắc tay vịn: mặc định dùng sync_channel(N) với N là số message tối đa bạn chịu được khi consumer ngừng vài giây. Chỉ chọn channel() unbounded khi bạn chắc producer giới hạn (ví dụ chỉ gửi một lần khi shutdown).
Khi Nào Channel Tốt Hơn Mutex
Channel và Mutex giải hai bài toán khác nhau; hiểu sai chỗ này dẫn đến code "đúng nhưng xấu". Vài hướng dẫn thực dụng:
Dùng channel khi:
- Pipeline có nhiều stage — stage A đọc input, gửi sang B parse, B gửi sang C ghi DB. Mỗi stage là một thread, channel nối hai stage. Code đọc thẳng từ trên xuống, không lock chồng chéo.
- Worker pool fan-in — N worker xử lý task song song, kết quả dồn về một dispatcher qua channel. Mỗi worker giữ bản
tx.clone(), dispatcher giữrx. - Event aggregation — nhiều nguồn sự kiện (sensor, network, timer) dồn vào một thread tổng hợp.
- Khi state cần truyền là một "đơn vị công việc" hoàn chỉnh (request, task struct, parsed record) chứ không phải trường nhỏ cần cập nhật tại chỗ.
Vẫn dùng Mutex khi:
- State nhỏ, truy cập từ mọi phía — ví dụ cache
HashMap, config dynamic. Đi qua channel kiểu "gửi xin / nhận trả lời" tốn round-trip latency, không đáng. - Cần truy cập đồng bộ ngay tại chỗ — kiểu
balance += amountkhông thể chờ một thread quản lý balance trả lời. - Nhiều consumer cùng đọc một dataset —
RwLockphù hợp hơn; channel chỉ single-consumer.
Ví dụ pipeline hai stage (read → upper-case → print) qua channel — bạn sẽ thấy code "tuyến tính" như nói chuyện:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx_in, rx_in) = mpsc::sync_channel::<String>(8); // stage A → B
let (tx_out, rx_out) = mpsc::sync_channel::<String>(8); // stage B → C
// Stage A: producer
thread::spawn(move || {
for line in ["hello", "rust", "mpsc"].iter() {
tx_in.send(line.to_string()).unwrap();
}
});
// Stage B: transform
thread::spawn(move || {
for s in rx_in {
tx_out.send(s.to_uppercase()).unwrap();
}
// rx_in đóng → for kết thúc → tx_out drop → rx_out cũng đóng theo
});
// Stage C: consumer (main)
for s in rx_out {
println!("{s}");
}
}
Không có lock. Không có shared state. Mỗi stage là một thread sở hữu state nội bộ riêng. Khi nguồn cạn, signal "đóng" tự lan dọc pipeline. Đây là sức mạnh của message passing.
Tổng Kết
- mpsc Channel = multi-producer single-consumer: nhiều thread
send, một threadrecv. Hiện thực Rust của triết lý CSP "share memory by communicating" thay vì "communicate by sharing memory". - Tạo:
let (tx, rx) = mpsc::channel();. Gửi:tx.send(v)— chuyển ownershipvvào channel, trảResult. Nhận:rx.recv()block đợi, trảResult<T, RecvError>. - Multi-producer:
tx.clone()cho mỗi thread. Channel đóng khi tất cả bản saotxbị drop — nhớdrop(tx)bản gốc ở main, kẻo for-loop chờ mãi. - Ba cách nhận:
recv()block,try_recv()non-blocking,recv_timeout(d)block có deadline. Iteratorfor msg in rxtự dừng khi channel đóng. - Channel đóng = tất cả sender drop.
recvtrảErr(RecvError). Sender cònsendsau khi receiver drop sẽ nhậnErr(SendError(v))— message trả lại. - sync_channel(n): bounded buffer dung lượng
n;sendblock khi đầy → backpressure.sync_channel(0)= rendezvous (handoff đồng bộ). - Channel hợp khi: pipeline nhiều stage, worker pool fan-in, event aggregation, đơn vị công việc hoàn chỉnh.
- Mutex hợp khi: state nhỏ truy cập từ mọi phía, cần truy cập đồng bộ ngay tại chỗ, nhiều consumer đọc cùng dataset.
- Cần multi-consumer (mpmc)?
std::sync::mpsckhông hỗ trợ — dùng cratecrossbeam-channel.
Bài Tập Củng Cố
Tự trả lời, đáp án ở cuối:
- Slogan "share memory by communicating" có ý gì? Tại sao mô hình message passing dễ suy luận hơn shared-memory + lock?
- Vì sao khi
tx.send(v)rồi, bạn không thể dùngvnữa? Borrow checker đảm bảo điều gì khi message qua channel? - Bạn spawn 4 producer, mỗi producer clone
txriêng, nhưng vòngfor msg in rxở main không bao giờ kết thúc. Lý do thường gặp nhất là gì? Cách fix? - Khác biệt giữa
rx.recv()vàrx.try_recv()? Khi nào chọn cái nào — cho một use case cụ thể cho mỗi cái. - So sánh
mpsc::channel()vàmpsc::sync_channel(100)ở mặt buffering và backpressure. Vì saosync_channelthường an toàn hơn cho production? - Cho ví dụ một bài toán nên dùng channel (không nên dùng
Mutex), và ngược lại một bài toán nên dùngMutex(không nên dùng channel).
Đáp án
- "Share memory by communicating" = thay vì cho nhiều thread cùng đụng vào một biến (phải bảo vệ bằng lock), một thread sở hữu state đó, thread khác chỉ gửi message tới nó. Dễ suy luận hơn vì: (a) không có race condition trên message (ownership chuyển nguyên), (b) flow data tuyến tính theo channel — đọc code thấy ngay ai gửi cho ai, (c) không lo deadlock từ lock-ordering vì không có lock.
- Vì
sendnhậnvtheo value (move) — ownership củavđã chuyển vào channel rồi qua tay receiver. Borrow checker đảm bảo compile-time rằng sender và receiver không cùng đụng vào dữ liệu — race condition trên content message bị bóc gọn ngay khi compile, không cần runtime check. - Lý do thường gặp: bản
txgốc ở main chưa bị drop. Channel chỉ đóng khi tất cả sender drop; bản gốc còn sống thìrx.recv()trong vòng for vẫn block đợi. Fix: gọidrop(tx);ngay sau khi spawn xong producer cuối cùng — hoặc đặt tx gốc trong scope hẹp{ }để nó drop sớm. recv()block đến khi có message hoặc channel đóng — phù hợp consumer chuyên ăn message (worker pool dispatcher).try_recv()trả ngay, có hoặcEmpty/Disconnected— phù hợp event loop game/UI tick 16 ms, vừa poll channel vừa làm việc khác.channel()unbounded —sendkhông bao giờ block, buffer phình theo memory; nếu producer nhanh hơn consumer lâu dài → OOM.sync_channel(100)bounded —sendblock khi buffer chứa đủ 100 message; tự ép producer chạy bằng tốc consumer (backpressure). Production thường ưu tiênsync_channelvì giới hạn được footprint memory, biến lỗi "consumer chậm" thành "producer chạy chậm hơn" thay vì "OOM crash".- Channel: pipeline ETL — stage A đọc file, B parse, C ghi DB; mỗi stage một thread, channel nối hai stage. Dùng Mutex sẽ phải share một buffer chung phức tạp, dễ deadlock. Mutex: cache
HashMap<Key, Value>đọc/ghi từ mọi thread của HTTP server — kiểu access "gửi xin / nhận trả lời" qua channel quá đắt (round-trip),Mutex<HashMap>hoặcRwLock<HashMap>hợp hơn nhiều.
Bài Tiếp Theo
Bài 238: Deadlock — Lock Order Problem — bài tiếp quay lại với shared state và Mutex để mổ xẻ kẻ thù số một của concurrent code: deadlock. Bạn sẽ thấy chỉ cần hai thread lock hai Mutex theo thứ tự ngược nhau là cả hệ thống đứng hình, học quy tắc lock ordering để phòng, và biết vì sao channel — mô hình ở bài này — gần như miễn dịch với loại lỗi đó.
