Danh sách bài viết

Bài 237: mpsc Channel — Multi-Producer Single-Consumer

Bài 237 của series Rust Cơ Bản — mpsc Channel (multi-producer single-consumer) là cơ chế giao tiếp giữa các thread trong std::sync::mpsc: nhiều thread cùng gửi (send) message vào một channel chung, và một thread duy nhất nhận (receive) chúng theo thứ tự FIFO. Đây là hiện thực Rust hoá triết lý "Do not communicate by sharing memory; instead, share memory by communicating" — thay vì để nhiều thread cùng động vào một biến (rồi phải bảo vệ bằng Mutex), bạn để một thread sở hữu state đó và mọi thread khác chỉ gửi tin nhắn tới nó. Cú pháp gọn: let (tx, rx) = mpsc::channel(); tạo cặp sender và receiver. tx.send(value) chuyển ownership của value vào channel và trả về ngay; rx.recv() block đến khi có message mới (trả Result<T, RecvError>), rx.try_recv() non-blocking trả luôn Err nếu rỗng. Để có nhiều producer, tx.clone() cho mỗi thread một bản sao — channel chỉ đóng khi tất cả bản sao bị drop. Iterator for msg in &rx tự dừng khi channel đóng. Biến thể mpsc::sync_channel(n) là bounded channel — buffer chỉ chứa tối đa n message, sender bị block khi đầy, tạo backpressure tự nhiên cho producer-consumer pipeline. Channel toả sáng cho worker pool, pipeline nhiều stage, và fan-in event; Mutex vẫn hợp khi state nhỏ và truy cập thường xuyên từ mọi phía.

10/06/2026
10 phút đọc
1 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu triết lý "share memory by communicating" (CSP của Tony Hoare, kế thừa bởi Go và Rust): thay vì nhiều thread đụng chung một biến, một thread sở hữu state và các thread khác chỉ gửi tin nhắn.
  • Viết được cặp let (tx, rx) = mpsc::channel(); và truyền tx/rx vào các thread đúng cách (ownership move).
  • Phân biệt tx.send(v) (chuyển ownership v vào channel, trả Result) với rx.recv() (block đợi, trả Result<T, RecvError>).
  • Tạo multi-producer bằng tx.clone() cho mỗi thread — biết quy tắc "channel đóng khi tất cả bản sao tx bị drop".
  • Chọn đúng giữa recv() (blocking), try_recv() (non-blocking), và iterator for msg in &rx (tự dừng khi đóng).
  • Biết khi nào dùng mpsc::sync_channel(n) thay mpsc::channel() để có bounded buffer + backpressure.
  • Biết khi nào channel tốt hơn Mutex (worker pool, pipeline, fan-in event) và ngược lại.
2

Channel Là Gì — Share By Communicating

Đến giờ, mọi cách share state giữa thread mà ta học (Mutex, RwLock, Atomic) đều theo mô hình shared memory: nhiều thread cùng nhìn vào một ô nhớ, ta dựng hàng rào để chúng không đạp lên nhau. Mô hình này mạnh nhưng khó suy luận — mỗi lock thêm vào là một cơ hội deadlock, mỗi atomic mới là một câu hỏi về memory ordering.

Có một mô hình thứ hai: message passing. Mỗi thread chỉ giữ state riêng của mình; muốn nói với thread khác, gửi một message qua một kênh giao tiếp. Người nhận đọc message theo thứ tự, xử lý, có khi gửi trả lời. Không có biến nào nhiều thread cùng đụng — không cần lock. Tony Hoare phát biểu mô hình này (CSP, 1978) bằng một câu trở thành slogan: "Do not communicate by sharing memory; instead, share memory by communicating."

Go làm phổ biến slogan này với chan. Rust hiện thực nó trong std::sync::mpsc. Tên mpsc nói thẳng kiểu channel: multi-producer single-consumer — nhiều thread cùng send, một thread duy nhất receive. Đủ cho phần lớn pattern thực tế: worker pool gửi kết quả về dispatcher, nhiều cảm biến gửi event về aggregator, nhiều scraper gửi item về writer.

Điểm tinh tế của Rust: khi bạn send(value), ownership của value bị chuyển vào channel — sender không còn dùng được nó nữa, receiver lấy ra là chủ mới. Đây không chỉ là thiết kế thẩm mỹ; nó là compile-time guarantee rằng hai thread không cùng đụng vào một String qua channel. Race condition trên message bị bóc gọn bằng borrow checker.

3

std::sync::mpsc Basics

Tạo channel bằng mpsc::channel() — trả về tuple (Sender<T>, Receiver<T>). Type T được suy luận từ message đầu tiên bạn send:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Thread con gửi một message rồi kết thúc
    thread::spawn(move || {
        let msg = String::from("hello from worker");
        tx.send(msg).unwrap();
        // ở đây không thể dùng msg nữa — ownership đã chuyển vào channel
    });

    // Thread chính nhận
    let received = rx.recv().unwrap();
    println!("got: {received}");
}

Quan sát ba điểm. (1) tx được move vào closure của thread::spawn — đó là cách hợp lệ để thread con sở hữu sender. (2) tx.send(msg) trả Result<(), SendError<T>>; SendError chỉ xảy ra khi receiver đã bị drop — không có chỗ để message đi nữa. (3) rx.recv() block đến khi có message; trả Ok(msg) khi nhận được, trả Err(RecvError) khi channel đã đóng và không còn message nào.

Channel là unbounded theo mặc định: send không bao giờ block vì hết chỗ — buffer co giãn theo bộ nhớ. Đây tiện nhưng nguy hiểm nếu producer nhanh hơn consumer rất nhiều: bộ nhớ cứ phình ra cho đến khi OOM. Mục 7 ta sẽ học bản bounded để chống vấn đề này.

Một điểm lưu ý quan trọng: Receiver không Synckhông Clone — đây là chữ single-consumer trong "mpsc". Nếu cần nhiều consumer, dùng crate ngoài crossbeam-channel (mpmc).

4

Multi-Producer: Clone tx

Chữ multi-producer nằm ở chỗ Sender<T> implement Clone. Mỗi lần tx.clone() bạn có thêm một sender mới chia sẻ cùng channel. Mỗi thread cầm một bản, gửi độc lập, receiver nhận tất cả vào cùng một hàng đợi FIFO:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Spawn 4 producer, mỗi producer gửi 3 message
    for id in 0..4 {
        let tx = tx.clone();
        thread::spawn(move || {
            for i in 0..3 {
                let msg = format!("worker-{id} msg-{i}");
                tx.send(msg).unwrap();
                thread::sleep(Duration::from_millis(10));
            }
            // tx bị drop ở cuối closure — bớt một bản sao
        });
    }

    // Quan trọng: drop bản tx gốc ở thread chính,
    // nếu không vòng for dưới sẽ chờ mãi vì channel chưa đóng.
    drop(tx);

    // Iterator tự dừng khi tất cả tx bị drop
    for msg in rx {
        println!("got: {msg}");
    }
}

Quan sát bốn điểm. (1) tx.clone() rẻ — bên trong là Arc, clone chỉ tăng refcount. (2) Thứ tự message giữa các producer không xác định (interleave theo lịch OS); chỉ thứ tự trong cùng một sender là FIFO. (3) for msg in rx tương đương while let Ok(msg) = rx.recv() — tự kết thúc khi recv trả Err. (4) Channel chỉ đóng khi tất cả bản sao tx đã drop; đó là vì sao ta phải drop(tx) ở main — bản gốc vẫn sống nếu không.

Pitfall phổ biến: quên drop bản tx ở thread chính rồi ngạc nhiên vì chương trình treo mãi. Quy tắc: nếu thread chính chỉ làm consumer, hãy clone tx ra rồi drop bản gốc ngay sau khi spawn xong producer.

5

recv() Blocking vs try_recv() Non-Blocking

Receiver có ba cách lấy message tuỳ nhu cầu:

  • rx.recv()block đến khi có message hoặc channel đóng. Trả Result<T, RecvError>. Phù hợp khi consumer chỉ làm mỗi việc đọc channel.
  • rx.try_recv()non-blocking, trả ngay. Ok(msg) nếu có, Err(TryRecvError::Empty) nếu rỗng nhưng channel còn sống, Err(TryRecvError::Disconnected) nếu đã đóng. Phù hợp khi consumer còn việc khác để làm, chỉ poll channel xen kẽ.
  • rx.recv_timeout(Duration) — block tối đa Duration; trả Err(RecvTimeoutError::Timeout) nếu hết giờ. Phù hợp khi muốn vừa đợi vừa có deadline.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();

    thread::spawn(move || {
        for i in 1..=3 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
    });

    // Poll non-blocking trong khi vẫn làm việc khác
    loop {
        match rx.try_recv() {
            Ok(n) => println!("got {n}"),
            Err(mpsc::TryRecvError::Empty) => {
                // không có gì, làm việc khác rồi quay lại
                thread::sleep(Duration::from_millis(10));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("channel closed");
                break;
            }
        }
    }
}

Quy tắc chọn: nếu consumer là worker chuyên ăn messagerecv() trong vòng while let Ok. Nếu consumer là event loop đang tick mỗi 16 ms (game, UI, real-time) → try_recv. Nếu muốn shutdown êm sau N giây không có message → recv_timeout.

6

Channel Đóng — Drop tx → recv Trả Err

Channel không có method close() tường minh. Vòng đời của nó tự suy ra từ ownership:

  • Khi tất cả bản sao Sender bị drop, không ai có thể send nữa. rx.recv() sau khi đã drain hết buffer sẽ trả Err(RecvError).
  • Khi Receiver bị drop, mọi tx.send(v) tiếp theo trả Err(SendError(v)) — message được trả lại để bạn xử lý.

Đây là idiom kết thúc tự nhiên của pipeline. Producer xong việc thì closure kết thúc, tx drop. Khi cái cuối cùng drop, receiver tự biết "không còn ai gửi nữa", vòng for kết thúc êm:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for i in 1..=3 {
            tx.send(i).unwrap();
        }
        // tx drop ở đây → channel đóng
    });

    // Iterator dừng khi channel đóng — không cần "sentinel value" như None
    for msg in rx {
        println!("got: {msg}");
    }

    producer.join().unwrap();
}

Pattern này tránh được "magic value" kiểu send(None) để báo hết. Ownership của Rust tự làm signal — đẹp và an toàn.

Pitfall: nếu bạn không chuyển tx vào closure (vô tình giữ lại ngoài main), bản đó sống đến hết main, vòng for không bao giờ dừng. Khi viết code phức tạp, kiểm tra "ai đang giữ tx?" là việc số một khi debug treo channel.

7

sync_channel — Bounded Và Backpressure

Hàm mpsc::channel() trả về channel unbounded: send luôn thành công ngay, message xếp hàng vô hạn. Khi consumer chậm hơn producer, hàng đợi phình lên không giới hạn — kịch bản OOM kinh điển.

Giải pháp là mpsc::sync_channel(capacity)bounded. Buffer chứa tối đa capacity message; producer thứ capacity+1 sẽ bị block trong send đến khi consumer lấy ra một chỗ. Đây gọi là backpressure: hệ thống tự ép producer chạy chậm bằng tốc độ consumer.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Buffer 2 message; producer thứ 3 bị block trong send
    let (tx, rx) = mpsc::sync_channel::<u32>(2);

    thread::spawn(move || {
        for i in 1..=5 {
            println!("producer: trying send {i}");
            tx.send(i).unwrap(); // block khi buffer đầy
            println!("producer: sent {i}");
        }
    });

    // Consumer chậm
    thread::sleep(Duration::from_millis(200));
    for msg in rx {
        println!("consumer: got {msg}");
        thread::sleep(Duration::from_millis(100));
    }
}

Output cho thấy producer in "sent 1", "sent 2" ngay rồi treo ở "trying send 3" cho đến khi consumer ăn bớt một message. Đó là backpressure đang làm việc — không phải code chậm, mà là thiết kế chậm để giữ bộ nhớ ổn định.

Trường hợp đặc biệt: sync_channel(0)rendezvous channel — không có buffer, send chỉ trả về khi đồng thờirecv đang đợi. Producer và consumer "bắt tay" trực tiếp, hữu ích khi muốn handoff đồng bộ tuyệt đối.

Quy tắc tay vịn: mặc định dùng sync_channel(N) với N là số message tối đa bạn chịu được khi consumer ngừng vài giây. Chỉ chọn channel() unbounded khi bạn chắc producer giới hạn (ví dụ chỉ gửi một lần khi shutdown).

8

Khi Nào Channel Tốt Hơn Mutex

Channel và Mutex giải hai bài toán khác nhau; hiểu sai chỗ này dẫn đến code "đúng nhưng xấu". Vài hướng dẫn thực dụng:

Dùng channel khi:

  • Pipeline có nhiều stage — stage A đọc input, gửi sang B parse, B gửi sang C ghi DB. Mỗi stage là một thread, channel nối hai stage. Code đọc thẳng từ trên xuống, không lock chồng chéo.
  • Worker pool fan-in — N worker xử lý task song song, kết quả dồn về một dispatcher qua channel. Mỗi worker giữ bản tx.clone(), dispatcher giữ rx.
  • Event aggregation — nhiều nguồn sự kiện (sensor, network, timer) dồn vào một thread tổng hợp.
  • Khi state cần truyền là một "đơn vị công việc" hoàn chỉnh (request, task struct, parsed record) chứ không phải trường nhỏ cần cập nhật tại chỗ.

Vẫn dùng Mutex khi:

  • State nhỏ, truy cập từ mọi phía — ví dụ cache HashMap, config dynamic. Đi qua channel kiểu "gửi xin / nhận trả lời" tốn round-trip latency, không đáng.
  • Cần truy cập đồng bộ ngay tại chỗ — kiểu balance += amount không thể chờ một thread quản lý balance trả lời.
  • Nhiều consumer cùng đọc một datasetRwLock phù hợp hơn; channel chỉ single-consumer.

Ví dụ pipeline hai stage (read → upper-case → print) qua channel — bạn sẽ thấy code "tuyến tính" như nói chuyện:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx_in, rx_in)   = mpsc::sync_channel::<String>(8); // stage A → B
    let (tx_out, rx_out) = mpsc::sync_channel::<String>(8); // stage B → C

    // Stage A: producer
    thread::spawn(move || {
        for line in ["hello", "rust", "mpsc"].iter() {
            tx_in.send(line.to_string()).unwrap();
        }
    });

    // Stage B: transform
    thread::spawn(move || {
        for s in rx_in {
            tx_out.send(s.to_uppercase()).unwrap();
        }
        // rx_in đóng → for kết thúc → tx_out drop → rx_out cũng đóng theo
    });

    // Stage C: consumer (main)
    for s in rx_out {
        println!("{s}");
    }
}

Không có lock. Không có shared state. Mỗi stage là một thread sở hữu state nội bộ riêng. Khi nguồn cạn, signal "đóng" tự lan dọc pipeline. Đây là sức mạnh của message passing.

9

Tổng Kết

  • mpsc Channel = multi-producer single-consumer: nhiều thread send, một thread recv. Hiện thực Rust của triết lý CSP "share memory by communicating" thay vì "communicate by sharing memory".
  • Tạo: let (tx, rx) = mpsc::channel();. Gửi: tx.send(v) — chuyển ownership v vào channel, trả Result. Nhận: rx.recv() block đợi, trả Result<T, RecvError>.
  • Multi-producer: tx.clone() cho mỗi thread. Channel đóng khi tất cả bản sao tx bị drop — nhớ drop(tx) bản gốc ở main, kẻo for-loop chờ mãi.
  • Ba cách nhận: recv() block, try_recv() non-blocking, recv_timeout(d) block có deadline. Iterator for msg in rx tự dừng khi channel đóng.
  • Channel đóng = tất cả sender drop. recv trả Err(RecvError). Sender còn send sau khi receiver drop sẽ nhận Err(SendError(v)) — message trả lại.
  • sync_channel(n): bounded buffer dung lượng n; send block khi đầy → backpressure. sync_channel(0) = rendezvous (handoff đồng bộ).
  • Channel hợp khi: pipeline nhiều stage, worker pool fan-in, event aggregation, đơn vị công việc hoàn chỉnh.
  • Mutex hợp khi: state nhỏ truy cập từ mọi phía, cần truy cập đồng bộ ngay tại chỗ, nhiều consumer đọc cùng dataset.
  • Cần multi-consumer (mpmc)? std::sync::mpsc không hỗ trợ — dùng crate crossbeam-channel.
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. Slogan "share memory by communicating" có ý gì? Tại sao mô hình message passing dễ suy luận hơn shared-memory + lock?
  2. Vì sao khi tx.send(v) rồi, bạn không thể dùng v nữa? Borrow checker đảm bảo điều gì khi message qua channel?
  3. Bạn spawn 4 producer, mỗi producer clone tx riêng, nhưng vòng for msg in rx ở main không bao giờ kết thúc. Lý do thường gặp nhất là gì? Cách fix?
  4. Khác biệt giữa rx.recv()rx.try_recv()? Khi nào chọn cái nào — cho một use case cụ thể cho mỗi cái.
  5. So sánh mpsc::channel()mpsc::sync_channel(100) ở mặt buffering và backpressure. Vì sao sync_channel thường an toàn hơn cho production?
  6. Cho ví dụ một bài toán nên dùng channel (không nên dùng Mutex), và ngược lại một bài toán nên dùng Mutex (không nên dùng channel).
Đáp án
  1. "Share memory by communicating" = thay vì cho nhiều thread cùng đụng vào một biến (phải bảo vệ bằng lock), một thread sở hữu state đó, thread khác chỉ gửi message tới nó. Dễ suy luận hơn vì: (a) không có race condition trên message (ownership chuyển nguyên), (b) flow data tuyến tính theo channel — đọc code thấy ngay ai gửi cho ai, (c) không lo deadlock từ lock-ordering vì không có lock.
  2. send nhận v theo value (move) — ownership của v đã chuyển vào channel rồi qua tay receiver. Borrow checker đảm bảo compile-time rằng sender và receiver không cùng đụng vào dữ liệu — race condition trên content message bị bóc gọn ngay khi compile, không cần runtime check.
  3. Lý do thường gặp: bản tx gốc ở main chưa bị drop. Channel chỉ đóng khi tất cả sender drop; bản gốc còn sống thì rx.recv() trong vòng for vẫn block đợi. Fix: gọi drop(tx); ngay sau khi spawn xong producer cuối cùng — hoặc đặt tx gốc trong scope hẹp { } để nó drop sớm.
  4. recv() block đến khi có message hoặc channel đóng — phù hợp consumer chuyên ăn message (worker pool dispatcher). try_recv() trả ngay, có hoặc Empty/Disconnected — phù hợp event loop game/UI tick 16 ms, vừa poll channel vừa làm việc khác.
  5. channel() unbounded — send không bao giờ block, buffer phình theo memory; nếu producer nhanh hơn consumer lâu dài → OOM. sync_channel(100) bounded — send block khi buffer chứa đủ 100 message; tự ép producer chạy bằng tốc consumer (backpressure). Production thường ưu tiên sync_channel vì giới hạn được footprint memory, biến lỗi "consumer chậm" thành "producer chạy chậm hơn" thay vì "OOM crash".
  6. Channel: pipeline ETL — stage A đọc file, B parse, C ghi DB; mỗi stage một thread, channel nối hai stage. Dùng Mutex sẽ phải share một buffer chung phức tạp, dễ deadlock. Mutex: cache HashMap<Key, Value> đọc/ghi từ mọi thread của HTTP server — kiểu access "gửi xin / nhận trả lời" qua channel quá đắt (round-trip), Mutex<HashMap> hoặc RwLock<HashMap> hợp hơn nhiều.
11

Bài Tiếp Theo

Bài 238: Deadlock — Lock Order Problem — bài tiếp quay lại với shared state và Mutex để mổ xẻ kẻ thù số một của concurrent code: deadlock. Bạn sẽ thấy chỉ cần hai thread lock hai Mutex theo thứ tự ngược nhau là cả hệ thống đứng hình, học quy tắc lock ordering để phòng, và biết vì sao channel — mô hình ở bài này — gần như miễn dịch với loại lỗi đó.