Danh sách bài viết

Bài 256: tokio::sync::oneshot Channel

Bài 256 của series Rust Cơ Bản — tokio::sync::oneshot là channel chuyên dụng cho pattern chỉ gửi đúng 1 message: tx.send(value) không .await (đồng bộ, non-blocking) và consume luôn tx sau khi gọi — không thể gửi lần thứ hai. rx.await trả Result<T, RecvError>: Ok khi nhận được giá trị, Err(RecvError) khi tx bị drop trước khi send. Use case kinh điển là request-response giữa task: caller tạo oneshot::channel(), đẩy tx vào task worker đang spawn, rồi await rx để nhận kết quả mà không cần shared state hay Vec<Option<T>> global. Oneshot rẻ hơn nhiều so với mpsc::channel(1) vì semantics đơn giản hơn (không cần capacity tracking, không cần lock condition variable) và compile-time đảm bảo "chỉ 1 lần". Bài này demo gửi/nhận cơ bản, pattern spawn-compute-await, lỗi RecvError khi tx drop, và so sánh trực diện với mpsc(1).

10/06/2026
9 phút đọc
1 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu tokio::sync::oneshot là channel single-shot: gửi đúng 1 message, tx bị consume sau khi send, không thể gửi lần thứ hai.
  • Viết được pattern khởi tạo: let (tx, rx) = oneshot::channel::<T>(); trả về một cặp sender/receiver gắn liền.
  • Hiểu tx.send(v) không cần .await — đồng bộ, trả Result tức thì (vì không có buffer cần chờ).
  • Viết được idiom request-response giữa task: spawn task, gửi tx vào task, rx.await ở caller để chờ kết quả.
  • Phân biệt rx.await trả Ok(T) (nhận được) vs Err(RecvError) (tx bị drop trước khi send) — và xử lý đúng cả hai.
  • So sánh oneshot với mpsc::channel(1) — vì sao oneshot tốt hơn cho use case 1-send 1-recv (rẻ hơn, semantics rõ hơn, compile-time enforce).
2

oneshot Là Gì?

tokio::sync::oneshotsingle-producer single-consumer single-message channel: chỉ một Sender, một Receiver, và chỉ truyền tải đúng một giá trị. Sau khi message được gửi, channel coi như kết thúc nhiệm vụ — tx không còn tồn tại để gửi tiếp, rx sau khi nhận xong cũng vô dụng.

Khác biệt so với mpsc (đã học ở Bài 255):

  • oneshot: 1 send, 1 recv. Không buffer (capacity = 1 cố định), không clone Sender.
  • mpsc: N send (clone Sender), 1 recv, buffer có capacity, có thể send nhiều lần.

Vì semantics đơn giản hơn, oneshot có triển khai cực rẻ: chỉ một slot atomic và một Waker. Không cần lock, không cần queue, không cần backpressure logic. Đây là building block cơ bản nhất của tokio — bản thân tokio::spawn(...).await bên trong cũng dùng oneshot để truyền kết quả task ra JoinHandle.

Đặc tính then chốt cần nhớ: tx bị consume sau send. Cú pháp Rust phản ánh điều này — send(self, ...) nhận self by value (không phải &self), nghĩa là gọi xong thì tx ra khỏi scope vĩnh viễn. Compiler chặn ngay lúc compile-time nếu bạn cố gắng gửi lần thứ hai.

3

API: oneshot::channel::<T>()

Khởi tạo trả về tuple (Sender<T>, Receiver<T>):

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Tạo cặp tx/rx — không cần capacity vì chỉ 1 message
    let (tx, rx) = oneshot::channel::<i32>();

    // Spawn task gửi giá trị
    tokio::spawn(async move {
        // send(self, value) — consume tx, KHÔNG cần .await
        // Trả Result<(), T> — Err(value) nếu rx đã drop
        let _ = tx.send(42);
    });

    // Await rx ở main task để nhận giá trị
    // rx.await trả Result<T, RecvError>
    match rx.await {
        Ok(value) => println!("nhận được: {}", value),
        Err(_) => println!("tx đã drop trước khi send"),
    }
}
// In: nhận được: 42

Một vài điểm cần để ý ngay từ snippet đầu tiên:

  • Không có turbofish bắt buộc — kiểu T thường được suy luận từ send hoặc từ pattern match trên rx.await. Trong ví dụ trên, ghi rõ ::<i32>() chỉ để minh hoạ.
  • rx không cần mut ở khai báo (khác với rx của mpsc), vì rx.await consume luôn self.
  • Không có method close(), không có method is_empty(). Vòng đời channel cực ngắn: tạo → send → await → vứt.

Method khác hữu ích: tx.is_closed() kiểm tra rx đã drop chưa (caller không còn quan tâm), tx.closed().await chờ event rx drop để hủy việc tính toán sớm (cancellation pattern).

4

Use Case: Request-Response Giữa Task

Use case kinh điển: caller spawn task tính toán nặng, chờ kết quả. Vì tokio::spawn đã có JoinHandle::await trả luôn kết quả, oneshot thật sự toả sáng khi pattern phức tạp hơn — ví dụ task đang sẵn có vòng đời dài và caller chỉ muốn "đặt câu hỏi" rồi nhận trả lời:

use tokio::sync::{mpsc, oneshot};

// Worker nhận command, mỗi command kèm 1 oneshot::Sender để trả kết quả
enum Command {
    Compute {
        input: u32,
        resp: oneshot::Sender<u64>,
    },
}

#[tokio::main]
async fn main() {
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command>(10);

    // Worker task chạy nền, xử lý command tuần tự
    tokio::spawn(async move {
        while let Some(cmd) = cmd_rx.recv().await {
            match cmd {
                Command::Compute { input, resp } => {
                    let result = (input as u64).pow(2);
                    // Gửi kết quả qua oneshot, consume resp
                    let _ = resp.send(result);
                }
            }
        }
    });

    // Caller: đặt câu hỏi và chờ trả lời
    let (resp_tx, resp_rx) = oneshot::channel();
    cmd_tx.send(Command::Compute { input: 9, resp: resp_tx }).await.unwrap();
    let answer = resp_rx.await.unwrap();
    println!("9^2 = {}", answer); // In: 9^2 = 81
}

Pattern này là backbone của nhiều thư viện thực tế: actor model (actix), database driver (sqlx connection pool), Redis client. Mỗi request là một oneshot độc lập — không lẫn lộn response giữa các caller, không cần map request_id → response.

Ưu điểm cụ thể của approach này:

  • Worker không cần biết caller là ai — chỉ resp.send(result) là xong.
  • Caller block đúng future cần thiết — không cần polling, không cần shared state.
  • Tài nguyên tự dọn: sau khi send/await, cả txrx drop, không leak.
5

tx.send() Trả Result — Non-Blocking

Signature chính xác:

pub fn send(self, t: T) -> Result<(), T>

Ba điểm quan trọng:

  • Không .await: vì không có buffer để chờ. Channel chỉ chứa đúng 1 slot, và send chỉ gọi đúng 1 lần — không có khái niệm "buffer đầy".
  • Consume tx: nhận self by value. Sau khi gọi, tx ra khỏi scope. Cố gọi lần thứ hai = compile error "value used here after move".
  • Trả Result<(), T>: Ok(()) khi gửi thành công; Err(value) khi Receiver đã drop — value không gửi đi được trả lại nguyên vẹn cho caller xử lý (vd log, retry với channel khác).
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<String>();

    // Drop rx ngay - mô phỏng caller không còn quan tâm
    drop(rx);

    // tx.send trả Err vì rx đã drop, value bị trả lại
    match tx.send("hello".to_string()) {
        Ok(()) => println!("gửi OK"),
        Err(value) => println!("rx đã drop, value bị trả lại: {}", value),
    }
}
// In: rx đã drop, value bị trả lại: hello

send không async, có thể gọi từ context sync — ví dụ trong callback non-async, trong Drop::drop, hay trong code C FFI. Đặc tính này khiến oneshot trở thành cầu nối thuận lợi giữa sync world và async world.

6

rx.await Là Future

Receiver<T> tự nó một Future với Output = Result<T, RecvError>. Cú pháp gọn:

let value: Result<i32, _> = rx.await;

Không có method recv() riêng — rx future, dùng trực tiếp với .await. Điều này cũng có nghĩa rx có thể truyền vào tokio::select!, tokio::join!, tokio::time::timeout như bất kỳ future nào khác:

use tokio::sync::oneshot;
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = tx.send(100);
    });

    // Bọc rx trong timeout 500ms — sẽ hết hạn trước khi tx send
    match timeout(Duration::from_millis(500), rx).await {
        Ok(Ok(value)) => println!("nhận được: {}", value),
        Ok(Err(_)) => println!("tx drop"),
        Err(_) => println!("timeout sau 500ms"),
    }
}
// In: timeout sau 500ms

Một rx chỉ await được một lần (vì await consume future). Nếu cần chia kết quả cho nhiều consumer, dùng broadcast (sẽ học ở bài tiếp) hoặc clone giá trị sau khi nhận xong.

7

Lỗi RecvError — tx Drop Trước Khi Send

RecvError chỉ có một lý do duy nhất: Sender bị drop mà không gọi send. Đây là tín hiệu "task định trả lời đã chết hoặc hủy giữa chừng — không bao giờ có câu trả lời":

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        // Task panic giữa chừng, tx bị drop mà chưa kịp send
        panic!("worker crashed");
        // let _ = tx.send("never".to_string()); // không bao giờ chạy
    });

    match rx.await {
        Ok(value) => println!("nhận: {}", value),
        Err(e) => println!("lỗi: {} (tx drop trước khi send)", e),
    }
}
// In: lỗi: channel closed (tx drop trước khi send)

Pitfall cần để ý: RecvError không phân biệt được giữa "task panic" và "task hủy bỏ có kiểm soát" — cả hai đều rơi vào nhánh Err. Nếu cần biết rõ lý do, gói thêm dữ liệu trong tuple (JoinHandle, oneshot::Receiver) rồi join! cả hai.

Ngược lại, cũng có trường hợp cố ý drop tx để báo "hủy yêu cầu": trong actor pattern, nếu actor quyết định không xử lý command (vd queue đầy, shutdown), nó chỉ cần drop resp_tx — caller tự động nhận Err(RecvError) và biết rằng yêu cầu bị từ chối.

Pattern xử lý điển hình:

let result = match resp_rx.await {
    Ok(v) => v,
    Err(_) => {
        // Worker không trả lời — fallback hoặc bubble error
        return Err("worker unavailable".into());
    }
};
8

So Sánh mpsc(1) vs oneshot

Câu hỏi tự nhiên: mpsc::channel(1) cũng là bounded capacity 1 — sao không dùng luôn. Cùng làm được nhưng không tương đương:

// Cách thừa: dùng mpsc(1) cho single-shot
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel::<i32>(1);
tokio::spawn(async move {
    tx.send(42).await.unwrap(); // CẦN .await dù chỉ 1 lần
});
let v = rx.recv().await.unwrap(); // Trả Option, phải unwrap thêm 1 lần

// Cách đúng: dùng oneshot
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel::<i32>();
tokio::spawn(async move {
    tx.send(42).unwrap(); // KHÔNG .await, consume tx luôn
});
let v = rx.await.unwrap(); // Trả Result trực tiếp

So sánh trực diện:

  • Cost: oneshot ~ 1 atomic slot + 1 Waker. mpsc(1) có thêm queue logic, lock condition variable, counter. Oneshot nhanh hơn ~2-3x trong benchmark microbench.
  • Semantics: oneshot compile-time enforce "1 send" qua self consume; mpsc(1) chỉ runtime — có thể accidentally send lần thứ hai (bị send().await block vì buffer đầy).
  • API: oneshot send không async, gọi từ sync context được; mpsc send().await bắt buộc trong async context.
  • Reader: rx.await với oneshot trả Result tự nhiên; mpsc recv().await trả Option phải xử lý thêm None case (channel đóng vs không có message).

Quy tắc: hễ pattern là "1 send, 1 recv" → dùng oneshot. Mọi pattern khác (N send, N recv, send nhiều lần) → dùng mpsc/broadcast/watch.

9

Tổng Kết

  • tokio::sync::oneshot là channel single-shot: 1 sender, 1 receiver, 1 message. Building block rẻ và nhanh cho async coordination.
  • API: let (tx, rx) = oneshot::channel::<T>(); trả tuple (Sender, Receiver) không cần capacity.
  • tx.send(value) -> Result<(), T>: không async, consume tx sau khi gọi. Err chứa lại value nếu rx đã drop.
  • rx.await -> Result<T, RecvError>: Receiver bản thân là Future. Err(RecvError) chỉ xảy ra khi tx bị drop trước khi send.
  • Use case kinh điển: request-response giữa caller và worker task. Worker nhận command kèm oneshot::Sender để trả kết quả.
  • So với mpsc(1): oneshot rẻ hơn, compile-time enforce "1 send", send không cần await, semantics rõ ràng hơn.
  • Compose với tokio::time::timeout, tokio::select! dễ dàng vì Receiver là Future tiêu chuẩn.
  • tx.closed().await giúp worker phát hiện caller hủy yêu cầu sớm để bỏ tính toán không cần thiết (cancellation).
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. Vì sao oneshot::Sender::send không cần .await trong khi mpsc::Sender::send bounded thì cần? Mô tả khác biệt internal mechanism.
  2. Đoạn code let (tx, rx) = oneshot::channel::<i32>(); tx.send(1).unwrap(); tx.send(2).unwrap(); sẽ fail như thế nào — compile-time hay runtime? Vì sao?
  3. Trong pattern request-response qua mpsc + oneshot, vì sao oneshot phải gắn trong từng Command mà không thể tạo một oneshot duy nhất ngoài worker rồi dùng lại?
  4. Viết code: spawn task tính số Fibonacci thứ 30, dùng oneshot để main task await kết quả. Xử lý cả trường hợp task panic giữa chừng (trả về Err(RecvError)).
  5. So sánh: dùng tokio::spawn(async {...}).await trực tiếp vs dùng oneshot::channel để truyền kết quả ra. Khi nào pattern oneshot có lợi thế?
  6. Method tx.closed().await dùng để làm gì? Viết ví dụ worker tính toán nặng, kiểm tra closed() giữa các bước để hủy sớm nếu caller không còn quan tâm.
Đáp án
  1. oneshot không có buffer cần chờ — channel chỉ chứa 1 slot và send chỉ gọi 1 lần, không bao giờ phải đợi "slot trống" như mpsc bounded. Internal: oneshot dùng 1 atomic slot + 1 Waker; send chỉ swap atomic và đánh thức Waker → đồng bộ, tức thì. mpsc bounded có queue + condition variable; khi buffer đầy, send phải đăng ký Waker và yield task chờ slot trống → bắt buộc async.
  2. Compile-time fail với lỗi "value used here after move". send(self, ...) nhận self by value — sau dòng tx.send(1), tx đã bị move (consumed), dòng tx.send(2) không còn tx để gọi. Đây chính là điểm mạnh oneshot so với mpsc(1) — compiler enforce "1 send" thay vì runtime.
  3. oneshot::Sender bị consume sau send đầu tiên. Nếu dùng chung 1 oneshot cho nhiều command, command thứ 2 sẽ không có tx để trả lời. Hơn nữa, response của command 1 và command 2 sẽ lẫn lộn nếu nhiều caller cùng chờ trên cùng 1 rx (mà oneshot rx chỉ await được 1 lần). Mỗi command tạo oneshot riêng đảm bảo response đúng caller, đúng request.
  4. use tokio::sync::oneshot;
    fn fib(n: u32) -> u64 { if n < 2 { n as u64 } else { fib(n-1) + fib(n-2) } }
    #[tokio::main]
    async fn main() {
      let (tx, rx) = oneshot::channel::<u64>();
      tokio::spawn(async move {
        let result = fib(30);
        let _ = tx.send(result);
      });
      match rx.await {
        Ok(v) => println!("fib(30) = {}", v),
        Err(_) => eprintln!("worker chết trước khi trả kết quả"),
      }
    }
  5. spawn().await qua JoinHandle đủ dùng khi caller chính là chủ task. oneshot có lợi thế khi: (a) task đã chạy nền sẵn (vd worker dài hạn) và caller chỉ "gửi câu hỏi"; (b) cần truyền tx vào nhiều middleware layer trước khi tới worker; (c) cần phân biệt "worker chưa trả lời" vs "worker đã trả lời nhưng panic" — JoinHandle gộp cả hai vào JoinError, oneshot tách rõ qua RecvError vs giá trị trả về.
  6. tx.closed().await trả future hoàn thành khi rx bị drop — cho phép worker phát hiện sớm rằng caller không còn quan tâm và hủy tính toán. Ví dụ: tokio::select! { _ = tx.closed() => { println!("caller huỷ, dừng tính toán"); return; }, result = heavy_compute() => { let _ = tx.send(result); } } — worker chạy đua giữa "tính xong" và "caller bỏ"; bên nào tới trước thắng, branch còn lại bị cancel an toàn.
11

Bài Tiếp Theo

Bài 257: tokio::sync::broadcast Channel — giới thiệu channel nhiều sender nhiều receiver, mỗi receiver nhận đủ mọi message (pub/sub local). Bạn sẽ học về RecvError::Lagged khi consumer chậm hơn buffer size, cách clone Receiver ngược lại với mpsc, và use case shutdown signal phát đồng loạt tới tất cả task con.