Mục lục
Mục Tiêu Bài Học
Sau bài học, bạn sẽ:
- Hiểu
tokio::sync::oneshotlà channel single-shot: gửi đúng 1 message,txbị consume sau khisend, không thể gửi lần thứ hai. - Viết được pattern khởi tạo:
let (tx, rx) = oneshot::channel::<T>();trả về một cặp sender/receiver gắn liền. - Hiểu
tx.send(v)không cần.await— đồng bộ, trảResulttức thì (vì không có buffer cần chờ). - Viết được idiom request-response giữa task: spawn task, gửi
txvào task,rx.awaitở caller để chờ kết quả. - Phân biệt
rx.awaittrảOk(T)(nhận được) vsErr(RecvError)(tx bị drop trước khi send) — và xử lý đúng cả hai. - So sánh oneshot với
mpsc::channel(1)— vì sao oneshot tốt hơn cho use case 1-send 1-recv (rẻ hơn, semantics rõ hơn, compile-time enforce).
oneshot Là Gì?
tokio::sync::oneshot là single-producer single-consumer single-message channel: chỉ một Sender, một Receiver, và chỉ truyền tải đúng một giá trị. Sau khi message được gửi, channel coi như kết thúc nhiệm vụ — tx không còn tồn tại để gửi tiếp, rx sau khi nhận xong cũng vô dụng.
Khác biệt so với mpsc (đã học ở Bài 255):
- oneshot: 1 send, 1 recv. Không buffer (capacity = 1 cố định), không clone
Sender. - mpsc: N send (clone
Sender), 1 recv, buffer có capacity, có thể send nhiều lần.
Vì semantics đơn giản hơn, oneshot có triển khai cực rẻ: chỉ một slot atomic và một Waker. Không cần lock, không cần queue, không cần backpressure logic. Đây là building block cơ bản nhất của tokio — bản thân tokio::spawn(...).await bên trong cũng dùng oneshot để truyền kết quả task ra JoinHandle.
Đặc tính then chốt cần nhớ: tx bị consume sau send. Cú pháp Rust phản ánh điều này — send(self, ...) nhận self by value (không phải &self), nghĩa là gọi xong thì tx ra khỏi scope vĩnh viễn. Compiler chặn ngay lúc compile-time nếu bạn cố gắng gửi lần thứ hai.
API: oneshot::channel::<T>()
Khởi tạo trả về tuple (Sender<T>, Receiver<T>):
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
// Tạo cặp tx/rx — không cần capacity vì chỉ 1 message
let (tx, rx) = oneshot::channel::<i32>();
// Spawn task gửi giá trị
tokio::spawn(async move {
// send(self, value) — consume tx, KHÔNG cần .await
// Trả Result<(), T> — Err(value) nếu rx đã drop
let _ = tx.send(42);
});
// Await rx ở main task để nhận giá trị
// rx.await trả Result<T, RecvError>
match rx.await {
Ok(value) => println!("nhận được: {}", value),
Err(_) => println!("tx đã drop trước khi send"),
}
}
// In: nhận được: 42
Một vài điểm cần để ý ngay từ snippet đầu tiên:
- Không có turbofish bắt buộc — kiểu
Tthường được suy luận từsendhoặc từ pattern match trênrx.await. Trong ví dụ trên, ghi rõ::<i32>()chỉ để minh hoạ. rxkhông cầnmutở khai báo (khác vớirxcủa mpsc), vìrx.awaitconsume luônself.- Không có method
close(), không có methodis_empty(). Vòng đời channel cực ngắn: tạo → send → await → vứt.
Method khác hữu ích: tx.is_closed() kiểm tra rx đã drop chưa (caller không còn quan tâm), tx.closed().await chờ event rx drop để hủy việc tính toán sớm (cancellation pattern).
Use Case: Request-Response Giữa Task
Use case kinh điển: caller spawn task tính toán nặng, chờ kết quả. Vì tokio::spawn đã có JoinHandle::await trả luôn kết quả, oneshot thật sự toả sáng khi pattern phức tạp hơn — ví dụ task đang sẵn có vòng đời dài và caller chỉ muốn "đặt câu hỏi" rồi nhận trả lời:
use tokio::sync::{mpsc, oneshot};
// Worker nhận command, mỗi command kèm 1 oneshot::Sender để trả kết quả
enum Command {
Compute {
input: u32,
resp: oneshot::Sender<u64>,
},
}
#[tokio::main]
async fn main() {
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command>(10);
// Worker task chạy nền, xử lý command tuần tự
tokio::spawn(async move {
while let Some(cmd) = cmd_rx.recv().await {
match cmd {
Command::Compute { input, resp } => {
let result = (input as u64).pow(2);
// Gửi kết quả qua oneshot, consume resp
let _ = resp.send(result);
}
}
}
});
// Caller: đặt câu hỏi và chờ trả lời
let (resp_tx, resp_rx) = oneshot::channel();
cmd_tx.send(Command::Compute { input: 9, resp: resp_tx }).await.unwrap();
let answer = resp_rx.await.unwrap();
println!("9^2 = {}", answer); // In: 9^2 = 81
}
Pattern này là backbone của nhiều thư viện thực tế: actor model (actix), database driver (sqlx connection pool), Redis client. Mỗi request là một oneshot độc lập — không lẫn lộn response giữa các caller, không cần map request_id → response.
Ưu điểm cụ thể của approach này:
- Worker không cần biết caller là ai — chỉ
resp.send(result)là xong. - Caller block đúng future cần thiết — không cần polling, không cần shared state.
- Tài nguyên tự dọn: sau khi send/await, cả
txvàrxdrop, không leak.
tx.send() Trả Result — Non-Blocking
Signature chính xác:
pub fn send(self, t: T) -> Result<(), T>
Ba điểm quan trọng:
- Không
.await: vì không có buffer để chờ. Channel chỉ chứa đúng 1 slot, vàsendchỉ gọi đúng 1 lần — không có khái niệm "buffer đầy". - Consume
tx: nhậnselfby value. Sau khi gọi,txra khỏi scope. Cố gọi lần thứ hai = compile error "value used here after move". - Trả
Result<(), T>:Ok(())khi gửi thành công;Err(value)khiReceiverđã drop — value không gửi đi được trả lại nguyên vẹn cho caller xử lý (vd log, retry với channel khác).
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<String>();
// Drop rx ngay - mô phỏng caller không còn quan tâm
drop(rx);
// tx.send trả Err vì rx đã drop, value bị trả lại
match tx.send("hello".to_string()) {
Ok(()) => println!("gửi OK"),
Err(value) => println!("rx đã drop, value bị trả lại: {}", value),
}
}
// In: rx đã drop, value bị trả lại: hello
Vì send không async, có thể gọi từ context sync — ví dụ trong callback non-async, trong Drop::drop, hay trong code C FFI. Đặc tính này khiến oneshot trở thành cầu nối thuận lợi giữa sync world và async world.
rx.await Là Future
Receiver<T> tự nó là một Future với Output = Result<T, RecvError>. Cú pháp gọn:
let value: Result<i32, _> = rx.await;
Không có method recv() riêng — rx là future, dùng trực tiếp với .await. Điều này cũng có nghĩa rx có thể truyền vào tokio::select!, tokio::join!, tokio::time::timeout như bất kỳ future nào khác:
use tokio::sync::oneshot;
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<u32>();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
let _ = tx.send(100);
});
// Bọc rx trong timeout 500ms — sẽ hết hạn trước khi tx send
match timeout(Duration::from_millis(500), rx).await {
Ok(Ok(value)) => println!("nhận được: {}", value),
Ok(Err(_)) => println!("tx drop"),
Err(_) => println!("timeout sau 500ms"),
}
}
// In: timeout sau 500ms
Một rx chỉ await được một lần (vì await consume future). Nếu cần chia kết quả cho nhiều consumer, dùng broadcast (sẽ học ở bài tiếp) hoặc clone giá trị sau khi nhận xong.
Lỗi RecvError — tx Drop Trước Khi Send
RecvError chỉ có một lý do duy nhất: Sender bị drop mà không gọi send. Đây là tín hiệu "task định trả lời đã chết hoặc hủy giữa chừng — không bao giờ có câu trả lời":
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
// Task panic giữa chừng, tx bị drop mà chưa kịp send
panic!("worker crashed");
// let _ = tx.send("never".to_string()); // không bao giờ chạy
});
match rx.await {
Ok(value) => println!("nhận: {}", value),
Err(e) => println!("lỗi: {} (tx drop trước khi send)", e),
}
}
// In: lỗi: channel closed (tx drop trước khi send)
Pitfall cần để ý: RecvError không phân biệt được giữa "task panic" và "task hủy bỏ có kiểm soát" — cả hai đều rơi vào nhánh Err. Nếu cần biết rõ lý do, gói thêm dữ liệu trong tuple (JoinHandle, oneshot::Receiver) rồi join! cả hai.
Ngược lại, cũng có trường hợp cố ý drop tx để báo "hủy yêu cầu": trong actor pattern, nếu actor quyết định không xử lý command (vd queue đầy, shutdown), nó chỉ cần drop resp_tx — caller tự động nhận Err(RecvError) và biết rằng yêu cầu bị từ chối.
Pattern xử lý điển hình:
let result = match resp_rx.await {
Ok(v) => v,
Err(_) => {
// Worker không trả lời — fallback hoặc bubble error
return Err("worker unavailable".into());
}
};
So Sánh mpsc(1) vs oneshot
Câu hỏi tự nhiên: mpsc::channel(1) cũng là bounded capacity 1 — sao không dùng luôn. Cùng làm được nhưng không tương đương:
// Cách thừa: dùng mpsc(1) cho single-shot
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel::<i32>(1);
tokio::spawn(async move {
tx.send(42).await.unwrap(); // CẦN .await dù chỉ 1 lần
});
let v = rx.recv().await.unwrap(); // Trả Option, phải unwrap thêm 1 lần
// Cách đúng: dùng oneshot
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel::<i32>();
tokio::spawn(async move {
tx.send(42).unwrap(); // KHÔNG .await, consume tx luôn
});
let v = rx.await.unwrap(); // Trả Result trực tiếp
So sánh trực diện:
- Cost: oneshot ~ 1 atomic slot + 1 Waker.
mpsc(1)có thêm queue logic, lock condition variable, counter. Oneshot nhanh hơn ~2-3x trong benchmark microbench. - Semantics: oneshot compile-time enforce "1 send" qua
selfconsume;mpsc(1)chỉ runtime — có thể accidentally send lần thứ hai (bịsend().awaitblock vì buffer đầy). - API: oneshot
sendkhông async, gọi từ sync context được;mpscsend().awaitbắt buộc trong async context. - Reader:
rx.awaitvới oneshot trảResulttự nhiên;mpscrecv().awaittrảOptionphải xử lý thêm None case (channel đóng vs không có message).
Quy tắc: hễ pattern là "1 send, 1 recv" → dùng oneshot. Mọi pattern khác (N send, N recv, send nhiều lần) → dùng mpsc/broadcast/watch.
Tổng Kết
tokio::sync::oneshotlà channel single-shot: 1 sender, 1 receiver, 1 message. Building block rẻ và nhanh cho async coordination.- API:
let (tx, rx) = oneshot::channel::<T>();trả tuple(Sender, Receiver)không cần capacity. tx.send(value) -> Result<(), T>: không async, consumetxsau khi gọi.Errchứa lại value nếurxđã drop.rx.await -> Result<T, RecvError>:Receiverbản thân là Future.Err(RecvError)chỉ xảy ra khitxbị drop trước khi send.- Use case kinh điển: request-response giữa caller và worker task. Worker nhận command kèm
oneshot::Senderđể trả kết quả. - So với
mpsc(1): oneshot rẻ hơn, compile-time enforce "1 send",sendkhông cần await, semantics rõ ràng hơn. - Compose với
tokio::time::timeout,tokio::select!dễ dàng vìReceiverlà Future tiêu chuẩn. tx.closed().awaitgiúp worker phát hiện caller hủy yêu cầu sớm để bỏ tính toán không cần thiết (cancellation).
Bài Tập Củng Cố
Tự trả lời, đáp án ở cuối:
- Vì sao
oneshot::Sender::sendkhông cần.awaittrong khimpsc::Sender::sendbounded thì cần? Mô tả khác biệt internal mechanism. - Đoạn code
let (tx, rx) = oneshot::channel::<i32>(); tx.send(1).unwrap(); tx.send(2).unwrap();sẽ fail như thế nào — compile-time hay runtime? Vì sao? - Trong pattern request-response qua mpsc + oneshot, vì sao oneshot phải gắn trong từng
Commandmà không thể tạo một oneshot duy nhất ngoài worker rồi dùng lại? - Viết code: spawn task tính số Fibonacci thứ 30, dùng oneshot để main task await kết quả. Xử lý cả trường hợp task panic giữa chừng (trả về
Err(RecvError)). - So sánh: dùng
tokio::spawn(async {...}).awaittrực tiếp vs dùngoneshot::channelđể truyền kết quả ra. Khi nào pattern oneshot có lợi thế? - Method
tx.closed().awaitdùng để làm gì? Viết ví dụ worker tính toán nặng, kiểm traclosed()giữa các bước để hủy sớm nếu caller không còn quan tâm.
Đáp án
- oneshot không có buffer cần chờ — channel chỉ chứa 1 slot và
sendchỉ gọi 1 lần, không bao giờ phải đợi "slot trống" như mpsc bounded. Internal: oneshot dùng 1 atomic slot + 1 Waker;sendchỉ swap atomic và đánh thức Waker → đồng bộ, tức thì. mpsc bounded có queue + condition variable; khi buffer đầy,sendphải đăng ký Waker và yield task chờ slot trống → bắt buộc async. - Compile-time fail với lỗi "value used here after move".
send(self, ...)nhậnselfby value — sau dòngtx.send(1),txđã bị move (consumed), dòngtx.send(2)không còntxđể gọi. Đây chính là điểm mạnh oneshot so với mpsc(1) — compiler enforce "1 send" thay vì runtime. - Vì
oneshot::Senderbị consume sausendđầu tiên. Nếu dùng chung 1 oneshot cho nhiều command, command thứ 2 sẽ không cótxđể trả lời. Hơn nữa, response của command 1 và command 2 sẽ lẫn lộn nếu nhiều caller cùng chờ trên cùng 1rx(mà oneshot rx chỉ await được 1 lần). Mỗi command tạo oneshot riêng đảm bảo response đúng caller, đúng request. use tokio::sync::oneshot;
fn fib(n: u32) -> u64 { if n < 2 { n as u64 } else { fib(n-1) + fib(n-2) } }
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<u64>();
tokio::spawn(async move {
let result = fib(30);
let _ = tx.send(result);
});
match rx.await {
Ok(v) => println!("fib(30) = {}", v),
Err(_) => eprintln!("worker chết trước khi trả kết quả"),
}
}spawn().awaitquaJoinHandleđủ dùng khi caller chính là chủ task. oneshot có lợi thế khi: (a) task đã chạy nền sẵn (vd worker dài hạn) và caller chỉ "gửi câu hỏi"; (b) cần truyền tx vào nhiều middleware layer trước khi tới worker; (c) cần phân biệt "worker chưa trả lời" vs "worker đã trả lời nhưng panic" — JoinHandle gộp cả hai vào JoinError, oneshot tách rõ qua RecvError vs giá trị trả về.tx.closed().awaittrả future hoàn thành khirxbị drop — cho phép worker phát hiện sớm rằng caller không còn quan tâm và hủy tính toán. Ví dụ:tokio::select! { _ = tx.closed() => { println!("caller huỷ, dừng tính toán"); return; }, result = heavy_compute() => { let _ = tx.send(result); } }— worker chạy đua giữa "tính xong" và "caller bỏ"; bên nào tới trước thắng, branch còn lại bị cancel an toàn.
Bài Tiếp Theo
Bài 257: tokio::sync::broadcast Channel — giới thiệu channel nhiều sender nhiều receiver, mỗi receiver nhận đủ mọi message (pub/sub local). Bạn sẽ học về RecvError::Lagged khi consumer chậm hơn buffer size, cách clone Receiver ngược lại với mpsc, và use case shutdown signal phát đồng loạt tới tất cả task con.
