Mục lục
Mục Tiêu Bài Học
Sau bài học, bạn sẽ:
- Hiểu trong async Rust, cancel một future chính là drop nó — không có exception, không có signal, chỉ là
Drop::dropchạy ở bất kỳ.awaitpoint nào. - Nắm cách RAII (Drop trait) tự dọn dẹp tài nguyên khi future bị cancel — file đóng, lock release, socket close, không leak.
- Phân biệt API cancel-safe (drop ở giữa không mất dữ liệu) và cancel-unsafe (drop giữa chừng mất luôn phần đã làm).
- Nhận diện tình huống thực tế gây bug: nhánh
select!thua cuộc đua bị drop khi đangread()nửa chừng → mất nửa byte không bao giờ thấy lại. - Đọc và nhớ danh sách API cancel-safe vs unsafe phổ biến trong
tokio(mpsc::Receiver::recvOK,AsyncReadExt::readNOT). - Áp dụng pattern fix: pin future ra ngoài bằng
tokio::pin!để giữ state qua nhiều vòngselect!, hoặc tránh dùng API unsafe trong loop có cancel point.
Cancel Future = Drop Future
Trong nhiều ngôn ngữ, cancel một tác vụ async có nghĩa là gửi tín hiệu, ném exception, hoặc gọi CancellationToken. Trong Rust thì rất đơn giản: không bao giờ poll future nữa, hoặc thả nó ra khỏi scope. Vì future là một value bình thường, hủy nó = drop nó.
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let task = async {
println!("start");
sleep(Duration::from_secs(5)).await; // điểm await
println!("done"); // KHÔNG bao giờ in
};
// Tạo future nhưng chỉ poll 100ms rồi thả ra
tokio::select! {
_ = task => {}
_ = sleep(Duration::from_millis(100)) => {
println!("timeout, drop task");
}
}
// task bị drop ở đây — future biến mất, "done" không bao giờ in
}
Future task bị drop ở cuối select!. Không có signal, không có flag — chỉ là biến chứa state machine của future đi ra khỏi scope, destructor chạy, hết. Đây là cơ chế của Rust: cancel không cần API riêng vì đã có sẵn từ ownership.
Điểm cần khắc cốt: future có thể bị drop ở bất kỳ .await point nào. Code bạn viết tuần tự nhưng compiler chia thành state machine — mỗi .await là một state, và giữa các state có thể bị "xoá sổ". Không có cách nào trong async code để "không cho phép cancel" — caller luôn có quyền drop future.
Resource Cleanup Qua Drop (RAII)
Tin tốt là nhờ RAII (Resource Acquisition Is Initialization) của Rust, dọn dẹp tài nguyên khi cancel xảy ra tự động. Mọi tài nguyên giữ qua .await đều nằm trong field của state machine — khi future bị drop, các field đó chạy Drop::drop theo thứ tự.
use tokio::sync::mpsc;
// Guard tự đóng channel khi out-of-scope
struct ChannelGuard {
name: String,
tx: mpsc::Sender<String>,
}
impl Drop for ChannelGuard {
fn drop(&mut self) {
// Báo cho phía nhận biết worker này dừng
let _ = self.tx.try_send(format!("{} disconnected", self.name));
println!("ChannelGuard drop: {}", self.name);
}
}
async fn worker(tx: mpsc::Sender<String>) {
let _guard = ChannelGuard {
name: "worker-1".into(),
tx: tx.clone(),
};
// Nếu future này bị cancel ở .await, _guard tự drop → "disconnected" được gửi
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
Khi worker bị drop ở giữa sleep, field _guard nằm trong state machine cũng bị drop, kéo theo Drop::drop của ChannelGuard chạy. File handle, MutexGuard, TcpStream, transaction database — tất cả các kiểu RAII chuẩn đều dọn dẹp đúng kể cả khi cancel xảy ra bất ngờ.
Đây là khác biệt lớn so với ngôn ngữ có exception async: ở Java/C# nếu task bị cancel, code cleanup phải nằm trong finally hoặc using. Rust có Drop chạy luôn — không cần nhớ.
Vấn Đề: Một Số API KHÔNG Cancel-Safe
RAII đảm bảo tài nguyên hệ thống không bị leak, nhưng không đảm bảo logic dữ liệu đúng. Đây là chỗ cancellation safety trở thành khái niệm riêng cần học.
Định nghĩa: một future được gọi là cancel-safe nếu khi nó bị drop trước khi return Ready, không có dữ liệu hay tiến độ nào bị mất vĩnh viễn — lần sau gọi lại từ đầu sẽ thấy đúng dữ liệu cũ. Ngược lại là cancel-unsafe.
Ví dụ kinh điển: tokio::io::AsyncReadExt::read không cancel-safe. Bên trong, nó có thể đã gọi read(2) syscall, kernel đã pop dữ liệu ra khỏi buffer TCP nội bộ, copy vào buffer trung gian. Nếu future bị drop trước khi copy xong vào buffer của user, kernel coi như đã giao nhưng user code chưa thấy — dữ liệu mất luôn, không cách nào lấy lại.
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
async fn buggy(stream: &mut TcpStream) {
let mut buf = vec![0u8; 1024];
loop {
tokio::select! {
// Nhánh này có thể drop giữa chừng read() → mất byte!
n = stream.read(&mut buf) => {
let n = n.unwrap();
println!("got {} bytes", n);
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
println!("tick");
}
}
}
}
Mỗi vòng select!, nếu nhánh sleep hoàn thành trước, future stream.read(...) đang dang dở sẽ bị drop. Lần sau vào select! lại tạo future read() mới, gọi read(2) mới — kernel có thể đã pop dữ liệu cho future cũ rồi, future mới không thấy. Bytes mất.
Khi Nào Gặp: select! Branch Mất Cuộc Đua
Cancellation safety chỉ là vấn đề khi có ai đó thực sự cancel future. Có ba ngữ cảnh phổ biến:
tokio::select!— đây là nguồn gốc 90% bug cancel-unsafe. Mỗi vòngselect!, các nhánh chạy cùng lúc, nhánh nào ready trước thì branch còn lại bị drop. Nếu lại đặt nguyên cụm trongloop, mỗi iter tạo lại future mới — dễ mất dữ liệu.tokio::time::timeout— wrap future bằng timeout, khi timer hết thì future bị drop. Nếu future bên trong là cancel-unsafe và timeout xảy ra đúng lúc nửa chừng, hậu quả tương tự.JoinHandle::aborthoặc task bị drop khi handle ra khỏi scope — cancel toàn bộ task.
Trong tất cả các trường hợp này, future bị thả ra ngoài mà không hoàn thành. Với select! trong loop, vấn đề đặc biệt nguy hiểm vì future mới được tạo lại mỗi vòng — tất cả state nội bộ của future cũ (con trỏ đọc, buffer trung gian) đều mất.
Cùng ví dụ trên, nếu peer gửi 4 byte mỗi 600ms còn sleep đặt 1s:
t=0 : enter select, read() pending
t=600ms : peer gửi 4 byte, read() trên đường trả về
t=1000ms : sleep ready TRƯỚC, read() bị drop! 4 byte trong kernel buffer
đã copy nửa chừng vào read()'s internal buffer — MẤT.
t=1000ms+ : vào lại loop, tạo read() mới, syscall mới
kernel TCP buffer giờ rỗng hoặc thiếu, dữ liệu cũ vĩnh viễn lost
Khó debug vì không có error message — kết quả chỉ là parse JSON lỗi, packet thiếu header, MD5 không khớp. Triệu chứng có vẻ ngẫu nhiên, thực ra là race condition giữa hai nhánh select!.
Idiom: Cancellation-Safe API vs Unsafe API
Tokio docs ghi rõ tính cancel-safe của từng method. Cheat sheet thường dùng:
Cancel-SAFE (drop ở giữa không mất dữ liệu, lần sau gọi lại OK):
tokio::sync::mpsc::Receiver::recv— message vẫn ở trong channel, lần saurecv()lại sẽ lấy được.tokio::sync::broadcast::Receiver::recvtokio::sync::Notify::notified— chưa nhận notification thì waker chỉ "register", drop không mất event đã pending.tokio::sync::watch::Receiver::changedtokio::time::sleepvàtokio::time::sleep_until— đơn giản chờ timer, không có state cần giữ.tokio::net::TcpListener::accept— connection vẫn nằm trong queue của kernel.tokio::sync::Mutex::lock— chưa lấy được lock thì drop sẽ remove khỏi queue, lần sau lock lại.
KHÔNG cancel-safe (drop giữa chừng có thể mất dữ liệu/tiến độ):
tokio::io::AsyncReadExt::read,read_exact,read_to_end— có thể đã pop bytes từ kernel.tokio::io::AsyncWriteExt::write,write_all— có thể đã ghi một phần, lần sau gọi lại sẽ ghi lại phần đó, gây trùng lặp.tokio::io::AsyncBufReadExt::read_line,read_until— buffer trung gian có thể chứa nửa dòng.tokio_stream::StreamExt::nextvới stream có internal state (ví dụ stream từ codec) — phụ thuộc cụ thể stream.tokio::time::interval::tick— sau cancel có thể skip tick tiếp theo (gọi là "tick loss").
Nguyên tắc nhớ nhanh: nếu API ghi vào buffer trung gian hoặc consume từ source bên ngoài, khả năng cao là cancel-unsafe. Channel và signal primitives của tokio::sync được thiết kế cẩn thận để cancel-safe — đó là lý do nên ưu tiên kiến trúc message-passing qua mpsc thay vì share state.
Pattern Fix: Hoàn Tất Operation Trước Cancel Point
Khi buộc phải dùng API cancel-unsafe trong select!, có hai pattern fix chuẩn.
Pattern 1: Pin future ra ngoài loop bằng tokio::pin!. Thay vì tạo lại future mỗi vòng, tạo một future duy nhất sống qua nhiều iteration. Khi select! không chọn branch đó, future vẫn còn nguyên chứ không bị drop — lần sau poll tiếp từ state cũ.
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
async fn fixed(stream: &mut TcpStream) {
let mut buf = vec![0u8; 1024];
loop {
// Tạo future MỘT LẦN, pin để giữ qua nhiều vòng select
let read_fut = stream.read(&mut buf);
tokio::pin!(read_fut);
tokio::select! {
// read_fut được poll trên cùng một state machine, không bị tạo lại
n = &mut read_fut => {
let n = n.unwrap();
println!("got {} bytes", n);
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
println!("tick");
// QUAN TRỌNG: KHÔNG break, tiếp tục loop với cùng read_fut
// Nhưng pattern này khó vì biến read_fut sẽ bị tái khai báo
// → cách phổ biến hơn: pin ngoài cùng loop, dùng &mut read_fut
}
}
}
}
Lưu ý đoạn trên minh hoạ ý tưởng — implementation đúng cần pin bên ngoài loop và tạo future mới chỉ khi previous future đã Ready. Một cách viết sạch hơn dùng futures::pin_mut! hoặc tách read thành task riêng đẩy data qua mpsc.
Pattern 2: Tránh API unsafe trong nhánh select! — chuyển sang message-passing. Đẩy việc read() vào một task riêng, task này gửi dữ liệu qua mpsc. select! lúc đó chỉ chứa rx.recv() (cancel-safe).
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
async fn safe_via_mpsc(mut stream: TcpStream) {
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(8);
// Reader task — đọc tuần tự, không bị cancel ngẫu nhiên
tokio::spawn(async move {
let mut buf = vec![0u8; 1024];
loop {
match stream.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
if tx.send(buf[..n].to_vec()).await.is_err() {
break;
}
}
Err(_) => break,
}
}
});
// Consumer dùng select! AN TOÀN vì recv() cancel-safe
loop {
tokio::select! {
Some(data) = rx.recv() => {
println!("got {} bytes", data.len());
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
println!("tick");
}
}
}
}
Đây là idiom được khuyến nghị nhất: cô lập API cancel-unsafe trong task riêng không có select!, giao tiếp ra ngoài qua channel cancel-safe. Vừa tránh bug vừa làm code dễ test hơn.
Tổng Kết
- Cancel một future trong Rust = drop nó. Không có exception, không có signal — chỉ là value đi ra khỏi scope.
- Nhờ RAII, tài nguyên hệ thống (file, lock, socket, tx channel) tự động dọn dẹp qua
Drop::dropkể cả khi future bị cancel ở.awaitpoint. - Cancellation safety là khái niệm về tính đúng đắn dữ liệu: future được gọi là cancel-safe nếu drop giữa chừng không mất dữ liệu/tiến độ vĩnh viễn.
- Cancel-safe:
mpsc::recv,broadcast::recv,Notify::notified,watch::changed,sleep,TcpListener::accept,Mutex::lock. - KHÔNG cancel-safe:
AsyncReadExt::read/read_exact/read_to_end,AsyncWriteExt::write/write_all,read_line, một sốStream::next,interval::tick. - Bug điển hình: nhánh
select!chứaread()bị nhánh khác thắng cuộc đua → futureread()drop giữa chừng, nửa byte đã pop từ kernel mất luôn, không có error log. - Pattern fix: (1) pin future ra ngoài bằng
tokio::pin!để giữ state qua nhiều vòngselect!; (2) tốt hơn — cô lập API unsafe trong task riêng, giao tiếp qua channel cancel-safe.
Bài Tập Củng Cố
Tự trả lời, đáp án ở cuối:
- Cancel một future trong Rust thực sự là gì? Có cần API riêng như
CancellationTokenkhông? - Vì sao một
MutexGuardnắm giữ qua.awaitvẫn được release đúng khi future bị cancel? - Định nghĩa cancel-safe theo cách của bạn. Vì sao
mpsc::Receiver::recvcancel-safe nhưngAsyncReadExt::readthì không? - Một developer viết loop
select!với hai nhánh:stream.read(&mut buf)vàsleep(1s). Mô tả đúng cơ chế mất dữ liệu khi sleep thắng cuộc đua. - Liệt kê 3 API tokio cancel-safe và 3 API không cancel-safe.
- Đề xuất hai pattern fix khi buộc phải đọc TCP trong loop có timeout. Pattern nào được khuyến nghị hơn và vì sao?
Đáp án
- Cancel = drop future. Future là value bình thường — out of scope hoặc không poll nữa thì destructor chạy, hết. Rust không cần
CancellationTokennhư C# hay Go context, vì cơ chế ownership + Drop đã đủ. Vẫn có thể dùngtokio_util::sync::CancellationTokenđể truyền tín hiệu rõ ràng giữa các task, nhưng đó là utility, không phải bắt buộc về mặt ngôn ngữ. - Khi future bị drop, tất cả field trong state machine của nó cũng bị drop theo thứ tự ngược lại với declaration.
MutexGuardlà field giữ qua.awaitnên nằm trong state machine — drop của nó gọiunlock()tự động. Đây là RAII áp dụng cho async, hoạt động giống hệt như drop ở code sync. - Cancel-safe = drop ở giữa không mất dữ liệu/tiến độ vĩnh viễn, lần sau gọi lại từ đầu vẫn thấy đúng dữ liệu.
mpsc::recvchỉ đăng ký waker rồi đợi — chưa pop message, drop chỉ unregister waker, message vẫn ở channel.AsyncReadExt::readđã có thể gọi syscallread(2), kernel pop bytes khỏi TCP buffer, copy nửa chừng vào buffer trung gian — drop làm mất phần copy đó, kernel không trả lại được. - Mỗi vòng
select!tạo futurestream.read(...)mới. Khi sleep ready trước, futureread()bị drop. Nếu trước đó kernel đã pop một số bytes vào buffer nội bộ củaread()(do TCP có data sẵn ngay khi sleep cũng ready), những bytes đó mất luôn. Vòng sau tạoread()mới, gọiread(2)mới — kernel buffer giờ rỗng hoặc thiếu. Triệu chứng: parse lỗi, header thiếu, hash không khớp, không có error log. - Cancel-SAFE:
tokio::sync::mpsc::Receiver::recv,tokio::sync::Notify::notified,tokio::time::sleep(cũngwatch::changed,broadcast::recv,TcpListener::accept,Mutex::lock). KHÔNG cancel-safe:tokio::io::AsyncReadExt::read,tokio::io::AsyncWriteExt::write_all,tokio::io::AsyncBufReadExt::read_line(cũngread_exact,interval::tick). - (1) Pin future ra ngoài loop bằng
tokio::pin!, mỗi vòngselect!dùng&mutđể poll tiếp future cũ thay vì tạo mới. (2) Cô lậpread()trong task riêng spawn bằngtokio::spawn, đẩy dữ liệu quampsc::channel;select!ngoài chỉ chứarx.recv()vốn cancel-safe. Pattern (2) được khuyến nghị hơn vì code dễ hiểu hơn, dễ test, tránh được toàn bộ class bug pin/lifetime, và phù hợp với triết lý "share memory by communicating".
Bài Tiếp Theo
Bài 248: Send Future Và Non-Send Future — vì sao một future có thể là Send hay không, dấu hiệu nhận biết, và tại sao việc giữ Rc hoặc MutexGuard qua .await sẽ khiến compiler từ chối spawn future trên multi-thread runtime của tokio. Bài học sẽ làm rõ liên hệ giữa scheduler work-stealing và auto-trait Send.
