Danh sách bài viết

Bài 247: Cancellation Safety — Khái Niệm

Bài 247 của series Rust Cơ Bản — trong async Rust, cancel một future = drop nó giữa chừng. Không có exception ném ra, không có CancellationToken bắt buộc, chỉ đơn giản là biến chứa future đi ra khỏi scope hoặc không bao giờ được poll nữa. Nhờ RAII (Drop trait), tài nguyên như file handle, lock, socket được dọn dẹp tự động. Nhưng có một loại bug ngấm ngầm: một số API không cancel-safe — nếu bị drop giữa chừng, dữ liệu đã đọc nửa chừng sẽ mất luôn. Ví dụ kinh điển: tokio::io::AsyncReadExt::read trong nhánh select! thua cuộc đua sẽ vứt đi nửa byte đã pull từ kernel mà không có cách lấy lại. Bài này giải thích khái niệm, chỉ ra các API cancel-safe (tokio::sync::mpsc::Receiver::recv, Notify::notified) và các API không cancel-safe (AsyncReadExt::read, StreamExt::next với một số stream), và đưa pattern fix: pin future ra ngoài bằng tokio::pin! để giữ state qua các vòng select!, hoặc hoàn tất operation trước khi đến cancel point.

10/06/2026
9 phút đọc
1 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu trong async Rust, cancel một future chính là drop nó — không có exception, không có signal, chỉ là Drop::drop chạy ở bất kỳ .await point nào.
  • Nắm cách RAII (Drop trait) tự dọn dẹp tài nguyên khi future bị cancel — file đóng, lock release, socket close, không leak.
  • Phân biệt API cancel-safe (drop ở giữa không mất dữ liệu) và cancel-unsafe (drop giữa chừng mất luôn phần đã làm).
  • Nhận diện tình huống thực tế gây bug: nhánh select! thua cuộc đua bị drop khi đang read() nửa chừng → mất nửa byte không bao giờ thấy lại.
  • Đọc và nhớ danh sách API cancel-safe vs unsafe phổ biến trong tokio (mpsc::Receiver::recv OK, AsyncReadExt::read NOT).
  • Áp dụng pattern fix: pin future ra ngoài bằng tokio::pin! để giữ state qua nhiều vòng select!, hoặc tránh dùng API unsafe trong loop có cancel point.
2

Cancel Future = Drop Future

Trong nhiều ngôn ngữ, cancel một tác vụ async có nghĩa là gửi tín hiệu, ném exception, hoặc gọi CancellationToken. Trong Rust thì rất đơn giản: không bao giờ poll future nữa, hoặc thả nó ra khỏi scope. Vì future là một value bình thường, hủy nó = drop nó.

use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let task = async {
        println!("start");
        sleep(Duration::from_secs(5)).await; // điểm await
        println!("done"); // KHÔNG bao giờ in
    };

    // Tạo future nhưng chỉ poll 100ms rồi thả ra
    tokio::select! {
        _ = task => {}
        _ = sleep(Duration::from_millis(100)) => {
            println!("timeout, drop task");
        }
    }
    // task bị drop ở đây — future biến mất, "done" không bao giờ in
}

Future task bị drop ở cuối select!. Không có signal, không có flag — chỉ là biến chứa state machine của future đi ra khỏi scope, destructor chạy, hết. Đây là cơ chế của Rust: cancel không cần API riêng vì đã có sẵn từ ownership.

Điểm cần khắc cốt: future có thể bị drop ở bất kỳ .await point nào. Code bạn viết tuần tự nhưng compiler chia thành state machine — mỗi .await là một state, và giữa các state có thể bị "xoá sổ". Không có cách nào trong async code để "không cho phép cancel" — caller luôn có quyền drop future.

3

Resource Cleanup Qua Drop (RAII)

Tin tốt là nhờ RAII (Resource Acquisition Is Initialization) của Rust, dọn dẹp tài nguyên khi cancel xảy ra tự động. Mọi tài nguyên giữ qua .await đều nằm trong field của state machine — khi future bị drop, các field đó chạy Drop::drop theo thứ tự.

use tokio::sync::mpsc;

// Guard tự đóng channel khi out-of-scope
struct ChannelGuard {
    name: String,
    tx: mpsc::Sender<String>,
}

impl Drop for ChannelGuard {
    fn drop(&mut self) {
        // Báo cho phía nhận biết worker này dừng
        let _ = self.tx.try_send(format!("{} disconnected", self.name));
        println!("ChannelGuard drop: {}", self.name);
    }
}

async fn worker(tx: mpsc::Sender<String>) {
    let _guard = ChannelGuard {
        name: "worker-1".into(),
        tx: tx.clone(),
    };
    // Nếu future này bị cancel ở .await, _guard tự drop → "disconnected" được gửi
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}

Khi worker bị drop ở giữa sleep, field _guard nằm trong state machine cũng bị drop, kéo theo Drop::drop của ChannelGuard chạy. File handle, MutexGuard, TcpStream, transaction database — tất cả các kiểu RAII chuẩn đều dọn dẹp đúng kể cả khi cancel xảy ra bất ngờ.

Đây là khác biệt lớn so với ngôn ngữ có exception async: ở Java/C# nếu task bị cancel, code cleanup phải nằm trong finally hoặc using. Rust có Drop chạy luôn — không cần nhớ.

4

Vấn Đề: Một Số API KHÔNG Cancel-Safe

RAII đảm bảo tài nguyên hệ thống không bị leak, nhưng không đảm bảo logic dữ liệu đúng. Đây là chỗ cancellation safety trở thành khái niệm riêng cần học.

Định nghĩa: một future được gọi là cancel-safe nếu khi nó bị drop trước khi return Ready, không có dữ liệu hay tiến độ nào bị mất vĩnh viễn — lần sau gọi lại từ đầu sẽ thấy đúng dữ liệu cũ. Ngược lại là cancel-unsafe.

Ví dụ kinh điển: tokio::io::AsyncReadExt::read không cancel-safe. Bên trong, nó có thể đã gọi read(2) syscall, kernel đã pop dữ liệu ra khỏi buffer TCP nội bộ, copy vào buffer trung gian. Nếu future bị drop trước khi copy xong vào buffer của user, kernel coi như đã giao nhưng user code chưa thấy — dữ liệu mất luôn, không cách nào lấy lại.

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

async fn buggy(stream: &mut TcpStream) {
    let mut buf = vec![0u8; 1024];

    loop {
        tokio::select! {
            // Nhánh này có thể drop giữa chừng read() → mất byte!
            n = stream.read(&mut buf) => {
                let n = n.unwrap();
                println!("got {} bytes", n);
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                println!("tick");
            }
        }
    }
}

Mỗi vòng select!, nếu nhánh sleep hoàn thành trước, future stream.read(...) đang dang dở sẽ bị drop. Lần sau vào select! lại tạo future read() mới, gọi read(2) mới — kernel có thể đã pop dữ liệu cho future cũ rồi, future mới không thấy. Bytes mất.

5

Khi Nào Gặp: select! Branch Mất Cuộc Đua

Cancellation safety chỉ là vấn đề khi có ai đó thực sự cancel future. Có ba ngữ cảnh phổ biến:

  • tokio::select! — đây là nguồn gốc 90% bug cancel-unsafe. Mỗi vòng select!, các nhánh chạy cùng lúc, nhánh nào ready trước thì branch còn lại bị drop. Nếu lại đặt nguyên cụm trong loop, mỗi iter tạo lại future mới — dễ mất dữ liệu.
  • tokio::time::timeout — wrap future bằng timeout, khi timer hết thì future bị drop. Nếu future bên trong là cancel-unsafe và timeout xảy ra đúng lúc nửa chừng, hậu quả tương tự.
  • JoinHandle::abort hoặc task bị drop khi handle ra khỏi scope — cancel toàn bộ task.

Trong tất cả các trường hợp này, future bị thả ra ngoài mà không hoàn thành. Với select! trong loop, vấn đề đặc biệt nguy hiểm vì future mới được tạo lại mỗi vòng — tất cả state nội bộ của future cũ (con trỏ đọc, buffer trung gian) đều mất.

Cùng ví dụ trên, nếu peer gửi 4 byte mỗi 600ms còn sleep đặt 1s:

t=0       : enter select, read() pending
t=600ms   : peer gửi 4 byte, read() trên đường trả về
t=1000ms  : sleep ready TRƯỚC, read() bị drop! 4 byte trong kernel buffer
            đã copy nửa chừng vào read()'s internal buffer — MẤT.
t=1000ms+ : vào lại loop, tạo read() mới, syscall mới
            kernel TCP buffer giờ rỗng hoặc thiếu, dữ liệu cũ vĩnh viễn lost

Khó debug vì không có error message — kết quả chỉ là parse JSON lỗi, packet thiếu header, MD5 không khớp. Triệu chứng có vẻ ngẫu nhiên, thực ra là race condition giữa hai nhánh select!.

6

Idiom: Cancellation-Safe API vs Unsafe API

Tokio docs ghi rõ tính cancel-safe của từng method. Cheat sheet thường dùng:

Cancel-SAFE (drop ở giữa không mất dữ liệu, lần sau gọi lại OK):

  • tokio::sync::mpsc::Receiver::recv — message vẫn ở trong channel, lần sau recv() lại sẽ lấy được.
  • tokio::sync::broadcast::Receiver::recv
  • tokio::sync::Notify::notified — chưa nhận notification thì waker chỉ "register", drop không mất event đã pending.
  • tokio::sync::watch::Receiver::changed
  • tokio::time::sleeptokio::time::sleep_until — đơn giản chờ timer, không có state cần giữ.
  • tokio::net::TcpListener::accept — connection vẫn nằm trong queue của kernel.
  • tokio::sync::Mutex::lock — chưa lấy được lock thì drop sẽ remove khỏi queue, lần sau lock lại.

KHÔNG cancel-safe (drop giữa chừng có thể mất dữ liệu/tiến độ):

  • tokio::io::AsyncReadExt::read, read_exact, read_to_end — có thể đã pop bytes từ kernel.
  • tokio::io::AsyncWriteExt::write, write_all — có thể đã ghi một phần, lần sau gọi lại sẽ ghi lại phần đó, gây trùng lặp.
  • tokio::io::AsyncBufReadExt::read_line, read_until — buffer trung gian có thể chứa nửa dòng.
  • tokio_stream::StreamExt::next với stream có internal state (ví dụ stream từ codec) — phụ thuộc cụ thể stream.
  • tokio::time::interval::tick — sau cancel có thể skip tick tiếp theo (gọi là "tick loss").

Nguyên tắc nhớ nhanh: nếu API ghi vào buffer trung gian hoặc consume từ source bên ngoài, khả năng cao là cancel-unsafe. Channel và signal primitives của tokio::sync được thiết kế cẩn thận để cancel-safe — đó là lý do nên ưu tiên kiến trúc message-passing qua mpsc thay vì share state.

7

Pattern Fix: Hoàn Tất Operation Trước Cancel Point

Khi buộc phải dùng API cancel-unsafe trong select!, có hai pattern fix chuẩn.

Pattern 1: Pin future ra ngoài loop bằng tokio::pin!. Thay vì tạo lại future mỗi vòng, tạo một future duy nhất sống qua nhiều iteration. Khi select! không chọn branch đó, future vẫn còn nguyên chứ không bị drop — lần sau poll tiếp từ state cũ.

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

async fn fixed(stream: &mut TcpStream) {
    let mut buf = vec![0u8; 1024];

    loop {
        // Tạo future MỘT LẦN, pin để giữ qua nhiều vòng select
        let read_fut = stream.read(&mut buf);
        tokio::pin!(read_fut);

        tokio::select! {
            // read_fut được poll trên cùng một state machine, không bị tạo lại
            n = &mut read_fut => {
                let n = n.unwrap();
                println!("got {} bytes", n);
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                println!("tick");
                // QUAN TRỌNG: KHÔNG break, tiếp tục loop với cùng read_fut
                // Nhưng pattern này khó vì biến read_fut sẽ bị tái khai báo
                // → cách phổ biến hơn: pin ngoài cùng loop, dùng &mut read_fut
            }
        }
    }
}

Lưu ý đoạn trên minh hoạ ý tưởng — implementation đúng cần pin bên ngoài loop và tạo future mới chỉ khi previous future đã Ready. Một cách viết sạch hơn dùng futures::pin_mut! hoặc tách read thành task riêng đẩy data qua mpsc.

Pattern 2: Tránh API unsafe trong nhánh select! — chuyển sang message-passing. Đẩy việc read() vào một task riêng, task này gửi dữ liệu qua mpsc. select! lúc đó chỉ chứa rx.recv() (cancel-safe).

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;

async fn safe_via_mpsc(mut stream: TcpStream) {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(8);

    // Reader task — đọc tuần tự, không bị cancel ngẫu nhiên
    tokio::spawn(async move {
        let mut buf = vec![0u8; 1024];
        loop {
            match stream.read(&mut buf).await {
                Ok(0) => break,
                Ok(n) => {
                    if tx.send(buf[..n].to_vec()).await.is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    });

    // Consumer dùng select! AN TOÀN vì recv() cancel-safe
    loop {
        tokio::select! {
            Some(data) = rx.recv() => {
                println!("got {} bytes", data.len());
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                println!("tick");
            }
        }
    }
}

Đây là idiom được khuyến nghị nhất: cô lập API cancel-unsafe trong task riêng không có select!, giao tiếp ra ngoài qua channel cancel-safe. Vừa tránh bug vừa làm code dễ test hơn.

8

Tổng Kết

  • Cancel một future trong Rust = drop nó. Không có exception, không có signal — chỉ là value đi ra khỏi scope.
  • Nhờ RAII, tài nguyên hệ thống (file, lock, socket, tx channel) tự động dọn dẹp qua Drop::drop kể cả khi future bị cancel ở .await point.
  • Cancellation safety là khái niệm về tính đúng đắn dữ liệu: future được gọi là cancel-safe nếu drop giữa chừng không mất dữ liệu/tiến độ vĩnh viễn.
  • Cancel-safe: mpsc::recv, broadcast::recv, Notify::notified, watch::changed, sleep, TcpListener::accept, Mutex::lock.
  • KHÔNG cancel-safe: AsyncReadExt::read / read_exact / read_to_end, AsyncWriteExt::write / write_all, read_line, một số Stream::next, interval::tick.
  • Bug điển hình: nhánh select! chứa read() bị nhánh khác thắng cuộc đua → future read() drop giữa chừng, nửa byte đã pop từ kernel mất luôn, không có error log.
  • Pattern fix: (1) pin future ra ngoài bằng tokio::pin! để giữ state qua nhiều vòng select!; (2) tốt hơn — cô lập API unsafe trong task riêng, giao tiếp qua channel cancel-safe.
9

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. Cancel một future trong Rust thực sự là gì? Có cần API riêng như CancellationToken không?
  2. Vì sao một MutexGuard nắm giữ qua .await vẫn được release đúng khi future bị cancel?
  3. Định nghĩa cancel-safe theo cách của bạn. Vì sao mpsc::Receiver::recv cancel-safe nhưng AsyncReadExt::read thì không?
  4. Một developer viết loop select! với hai nhánh: stream.read(&mut buf)sleep(1s). Mô tả đúng cơ chế mất dữ liệu khi sleep thắng cuộc đua.
  5. Liệt kê 3 API tokio cancel-safe và 3 API không cancel-safe.
  6. Đề xuất hai pattern fix khi buộc phải đọc TCP trong loop có timeout. Pattern nào được khuyến nghị hơn và vì sao?
Đáp án
  1. Cancel = drop future. Future là value bình thường — out of scope hoặc không poll nữa thì destructor chạy, hết. Rust không cần CancellationToken như C# hay Go context, vì cơ chế ownership + Drop đã đủ. Vẫn có thể dùng tokio_util::sync::CancellationToken để truyền tín hiệu rõ ràng giữa các task, nhưng đó là utility, không phải bắt buộc về mặt ngôn ngữ.
  2. Khi future bị drop, tất cả field trong state machine của nó cũng bị drop theo thứ tự ngược lại với declaration. MutexGuard là field giữ qua .await nên nằm trong state machine — drop của nó gọi unlock() tự động. Đây là RAII áp dụng cho async, hoạt động giống hệt như drop ở code sync.
  3. Cancel-safe = drop ở giữa không mất dữ liệu/tiến độ vĩnh viễn, lần sau gọi lại từ đầu vẫn thấy đúng dữ liệu. mpsc::recv chỉ đăng ký waker rồi đợi — chưa pop message, drop chỉ unregister waker, message vẫn ở channel. AsyncReadExt::read đã có thể gọi syscall read(2), kernel pop bytes khỏi TCP buffer, copy nửa chừng vào buffer trung gian — drop làm mất phần copy đó, kernel không trả lại được.
  4. Mỗi vòng select! tạo future stream.read(...) mới. Khi sleep ready trước, future read() bị drop. Nếu trước đó kernel đã pop một số bytes vào buffer nội bộ của read() (do TCP có data sẵn ngay khi sleep cũng ready), những bytes đó mất luôn. Vòng sau tạo read() mới, gọi read(2) mới — kernel buffer giờ rỗng hoặc thiếu. Triệu chứng: parse lỗi, header thiếu, hash không khớp, không có error log.
  5. Cancel-SAFE: tokio::sync::mpsc::Receiver::recv, tokio::sync::Notify::notified, tokio::time::sleep (cũng watch::changed, broadcast::recv, TcpListener::accept, Mutex::lock). KHÔNG cancel-safe: tokio::io::AsyncReadExt::read, tokio::io::AsyncWriteExt::write_all, tokio::io::AsyncBufReadExt::read_line (cũng read_exact, interval::tick).
  6. (1) Pin future ra ngoài loop bằng tokio::pin!, mỗi vòng select! dùng &mut để poll tiếp future cũ thay vì tạo mới. (2) Cô lập read() trong task riêng spawn bằng tokio::spawn, đẩy dữ liệu qua mpsc::channel; select! ngoài chỉ chứa rx.recv() vốn cancel-safe. Pattern (2) được khuyến nghị hơn vì code dễ hiểu hơn, dễ test, tránh được toàn bộ class bug pin/lifetime, và phù hợp với triết lý "share memory by communicating".
10

Bài Tiếp Theo

Bài 248: Send Future Và Non-Send Future — vì sao một future có thể là Send hay không, dấu hiệu nhận biết, và tại sao việc giữ Rc hoặc MutexGuard qua .await sẽ khiến compiler từ chối spawn future trên multi-thread runtime của tokio. Bài học sẽ làm rõ liên hệ giữa scheduler work-stealing và auto-trait Send.