Danh sách bài viết

Bài 38: Streaming Body — body::Body

Bài 38 của series Rust RESTful API — chi tiết axum::body::Body extractor cho streaming body từng chunk khi đến thay vì load FULL body vào memory như buffered extractor (Bytes B37, Json<T> B15, Form<T> B35); cốt lõi khác biệt buffered ổn định peak memory bằng body size còn streaming peak memory bằng 1 chunk (~8KB-1MB tùy config) — file 1GB buffered nuốt 1GB RAM một mạch dễ OOM single-pod, streaming chỉ ~1MB RAM tại mỗi thời điểm xử lý song song với download tiếp tục từ client. Decision rule lock vĩnh viễn Shop API: file ≤5MB dùng buffered (Bytes/Json/AppMultipart) simpler code parse 1 lần error handling đơn giản, file >5MB dùng streaming (Body) memory bounded code phức tạp hơn error giữa stream khó recover; ngưỡng 5MB align body size limit product image lock B36 + DefaultBodyLimit::max(5 * 1024 * 1024) route layer; webhook payload Stripe ~10-100KB stay buffered B37, admin CSV export 1M order row 200MB MANDATORY streaming. axum::body::Body extractor có sẵn axum core là wrapper trait http_body::Body (từ http crate ecosystem tokio-rs maintain) — pattern consume: convert qua body.into_data_stream() trả impl Stream<Item = Result<Bytes, axum::Error>> rồi iterate qua StreamExt::next().await trong loop xử lý từng chunk Bytes cheap-clone Arc internal (lock B37) — chunk size phụ thuộc hyper transport layer (HTTP/1.1 chunked encoding ~8-32KB typical, HTTP/2 frame ~16KB) không control trực tiếp được; workspace.dependencies sẽ add khi B62 implement: http-body-util = "0.1" (trait helper BodyExt::into_data_stream), futures = "0.3" (trait StreamExt::next), tokio-util = "0.7" feature io (cầu nối AsyncRead ↔ Stream<Bytes> qua ReaderStream). Upload streaming pattern lock vĩnh viễn 4 bước MANDATORY: (1) extract Body raw đặt CUỐI arg list theo lock B31 (cùng quy tắc Json/Form/Multipart/Bytes B32/B35/B36/B37); (2) body.into_data_stream() convert sang Stream; (3) loop while let Some(chunk_result) = stream.next().await consume từng chunk + incremental size check total_bytes > MAX_UPLOAD_SIZE reject 413 ngay khi vượt KHÔNG đợi load hết body; (4) save disk qua tokio::fs::File::write_all hoặc upload S3 multipart buffer mỗi 5MB minimum part size qua aws-sdk-s3 hoặc object_store (lean về object_store cho R2 swap lock B36). Pitfall: extract Bytes cho file 1GB → axum load full Body vào memory rồi materialize Bytes → OOM single-pod kill bởi K8s; fix bằng Body streaming. Download streaming pattern lock vĩnh viễn: tạo Stream<Item = Result<Bytes, E>> rồi wrap qua Body::from_stream(stream) trả response với Transfer-Encoding: chunked tự động axum + hyper handle; cầu nối từ tokio::fs::File hoặc TcpStream qua tokio_util::io::ReaderStream::new(reader) implement Stream<Item = Result<Bytes, std::io::Error>> chunk size mặc định 8KB cấu hình qua with_capacity(); browser/curl nhận chunked transfer encoding incremental không phải đợi full file; use case Shop API: invoice PDF download stream từ S3 không buffer toàn bytes, log file download admin, video file thumbnail. Backpressure cơ chế MANDATORY chống OOM khi client slow consumer: pitfall server đọc file 10GB nhanh từ disk/S3 nhưng client download chậm qua mobile network → server buffer hết 10GB chờ client → OOM; fix qua bounded channel tokio::sync::mpsc::channel(capacity) với capacity nhỏ (default 10 chunk) — producer task tx.send(chunk).await tự block khi receiver chưa consume (channel full) → server tự nhiên dừng đọc khi client lag, memory cap predictable = capacity × chunk_size (vd 10 × 1MB = 10MB peak per request regardless file size); pattern axum + hyper tự handle pull-based qua tower::Service nhưng explicit bounded channel cho fine control critical cho large file stream. Server-Sent Events (SSE) pattern: built-in axum::response::sse::Sse module wrap stream của Event trả Content-Type text/event-stream format data: <json>\n\n mỗi event; use case Shop API: real-time order status update qua Redis pub/sub (admin xem order list live update khi customer checkout/cancel — B189 implement với fred Redis pub/sub subscribe channel order:status + forward event sang SSE stream + keepalive ping 30s tránh proxy NGINX/CloudFlare timeout); chi tiết SSE pattern + keepalive deep dive B50 module riêng. CSV export pattern lock vĩnh viễn Shop API B197: admin endpoint GET /api/v1/admin/orders/export với RequireRole<"admin"> B135 export 1M+ order rows CSV; pattern MANDATORY streaming KHÔNG buffer — sqlx .fetch() trả impl Stream<Item = Result<Row, sqlx::Error>> lazy load từng row từ Postgres cursor qua transaction không materialize Vec; map mỗi row format CSV string qua csv::Writer hoặc thủ công format!("{},{},{}\n", id, total, status); wrap Stream qua Body::from_stream response với header Content-Type: text/csv + Content-Disposition: attachment; filename="orders-{date}.csv" browser tự download; peak memory bounded ~1 row × number_of_chunks_in_channel (vd 100KB regardless 1M row), client tải về incremental không phải đợi server build full file 200MB (giảm Time to First Byte TTFB từ 30s xuống 50ms). Use case Shop API lock vĩnh viễn: (i) admin CSV export 1M+ row B197 — streaming MANDATORY peak memory ~100KB; (ii) order status real-time update qua SSE B189 — Redis pub/sub forward sang SSE Stream với keepalive 30s; (iii) product image upload large file B62 — multipart streaming với object_store multipart S3 minimum 5MB part size; (iv) invoice PDF download stream từ S3 future — tokio_util::io::ReaderStream wrap S3 GetObject stream. Workspace State KHÔNG đổi (B38 conceptual + preview pattern, KHÔNG tạo file thực tế); file crates/shop-api/src/routes/admin/orders.rs handler export_orders_csv + crates/shop-api/src/routes/admin/notifications.rs handler SSE order updates + workspace dep http-body-util/futures/tokio-util sẽ add ở B62/B189/B197.

14/06/2026
10 phút đọc
2 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu khác biệt buffered body (Bytes, Json<T>) load full vào memory vs streaming body (Body) xử lý từng chunk khi đến.
  • Biết extract streaming body qua axum::body::Body extractor + BodyExt::into_data_stream() consume từng chunk qua StreamExt::next().await.
  • Áp dụng pattern stream upload large file với incremental size check (reject 413 ngay khi vượt, KHÔNG đợi load full).
  • Hiểu Body::from_stream(stream) cho download response chunked transfer encoding tự động — wrap file qua tokio_util::io::ReaderStream.
  • Nắm cơ chế backpressure qua bounded mpsc channel: server CHỈ đọc khi client ack chunk trước → memory cap predictable.
  • Biết Server-Sent Events (SSE) qua axum::response::sse::Sse built-in cho real-time push event tới client.
  • Pattern Shop API: admin CSV export 1M order rows streaming qua sqlx .fetch() B197, order updates SSE B189, product image upload B62.
2

Buffered vs Streaming — Khác Biệt Cốt Lõi

Hai mô hình xử lý body khác nhau căn bản về memory profile và độ phức tạp code. Buffered (Bytes, String, Json<T>, Form<T>) load FULL body vào memory rồi parse 1 lần — đơn giản, error handling rõ ràng (parse fail trả 422 ngay), nhưng peak memory = body size → file 1GB nuốt 1GB RAM dễ OOM single-pod K8s. Streaming (Body) xử lý từng chunk khi đến — memory bounded ~1 chunk (8KB-1MB), latency thấp (xử lý song song với download tiếp tục), nhưng code phức tạp hơn và error giữa stream khó recover (đã ghi 500MB vào disk thì abort thế nào).

Buffered (Bytes/Json):
   [chunk1][chunk2][chunk3] → [FULL BODY trong RAM] → parse → handler
                              (peak memory = body size)

Streaming (Body):
   [chunk1] → handler.process(chunk1) → drop chunk1
   [chunk2] → handler.process(chunk2) → drop chunk2
   [chunk3] → handler.process(chunk3) → drop chunk3
              (peak memory = 1 chunk ~8KB-1MB)

Decision rule lock vĩnh viễn Shop API:

  • File ≤ 5MB → buffered (Bytes/Json/AppMultipart B36) — code đơn giản parse 1 lần, error handling clean qua ? operator, peak memory predictable trong giới hạn cap. Use case: webhook payload Stripe ~10-100KB stay buffered (lock B37), product image admin upload ≤5MB (lock B36), JSON API endpoint baseline ≤2MB default.
  • File > 5MB → streaming (Body) MANDATORY — memory bounded regardless file size, peak ~1MB chunk. Use case: admin CSV export 1M+ order rows ~200MB (B197), invoice PDF stream từ S3 future, log file download admin.
  • Ngưỡng 5MB align DefaultBodyLimit::max(5 * 1024 * 1024) product image route layer lock B36 — vượt ngưỡng phải explicit chuyển sang streaming pattern hoặc tăng limit per route.

Streaming KHÔNG free lunch — trade-off với buffered: (a) error mid-stream khó recover (đã ghi 500MB vào S3 multipart upload rồi server lỗi → phải abort_multipart_upload cleanup tránh orphan part charge phí), (b) validate format khó (chỉ có chunk hiện tại, không biết toàn thể), (c) backpressure phải explicit handle (chi tiết bước 6). Decision rule trên cân bằng simplicity (buffered cho 90% endpoint) vs scalability (streaming cho 10% endpoint critical).

3

axum::body::Body Extractor

axum cung cấp axum::body::Body wrapper trait http_body::Body (từ http crate ecosystem tokio-rs) cho stream từng chunk. Pattern consume: convert qua body.into_data_stream() trả impl Stream<Item = Result<Bytes, axum::Error>> rồi loop iterate qua trait StreamExt::next().await:

// File: crates/shop-api/src/routes/admin/uploads.rs (B62 implement)
use axum::body::Body;
use axum::http::StatusCode;
use http_body_util::BodyExt;
use futures::stream::StreamExt;
use shop_common::error::{AppError, AppResult};

const MAX_UPLOAD_SIZE: u64 = 100 * 1024 * 1024; // 100MB

async fn upload_large_file(body: Body) -> AppResult<StatusCode> {
    let mut stream = body.into_data_stream();
    let mut total_bytes = 0u64;
    let mut file = tokio::fs::File::create("/tmp/upload.bin").await
        .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;

    while let Some(chunk_result) = stream.next().await {
        let chunk = chunk_result
            .map_err(|e| AppError::BadRequest(format!("stream error: {}", e)))?;

        total_bytes += chunk.len() as u64;
        if total_bytes > MAX_UPLOAD_SIZE {
            return Err(AppError::BadRequest("file too large".to_string()));
        }

        tokio::io::AsyncWriteExt::write_all(&mut file, &chunk).await
            .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
    }

    Ok(StatusCode::CREATED)
}

Điểm chú ý:

  • body: Body đặt CUỐI arg list theo lock B31 (cùng quy tắc Json/Form/Multipart/Bytes B32/B35/B36/B37) — Body consume request body one-shot qua FromRequest.
  • body.into_data_stream() đến từ trait http_body_util::BodyExt — workspace dep sẽ add http-body-util = "0.1" khi B62 implement.
  • StreamExt::next() đến từ futures::stream::StreamExt trait — workspace dep futures = "0.3"; alternative tokio_stream::StreamExt compat tương đương.
  • Mỗi chunkbytes::Bytes cheap-clone Arc internal (đã giới thiệu B37) — chunk size phụ thuộc hyper transport layer (HTTP/1.1 chunked encoding ~8-32KB typical, HTTP/2 frame ~16KB) KHÔNG control trực tiếp từ application.
  • Incremental size check qua total_bytes > MAX_UPLOAD_SIZE reject ngay khi vượt — KHÔNG đợi load hết body như Bytes extractor (tiết kiệm bandwidth + memory cho attack DoS spam request lớn).
  • DefaultBodyLimit::max(N) route layer vẫn áp dụng cho Body extractor — server abort connection khi vượt limit trước cả khi handler thấy chunk.
4

Stream Upload Large File — Pattern

Pitfall lớn nhất: handler extract Bytes cho file 1GB — axum load full Body vào memory rồi materialize thành bytes::Bytes → process bị OOM kill bởi K8s (memory limit container thường 512MB-2GB). Pitfall xảy ra khi developer copy-paste pattern Bytes từ webhook B37 cho mọi endpoint upload mà không kiểm tra size budget.

Fix: extract Body → stream chunk → incremental size check (đã thấy ở bước 3). Save destination 2 lựa chọn:

(a) Save to disk qua tokio::fs::File + AsyncWriteExt::write_all — phù hợp dev/single-pod tạm; production tránh vì disk local pod restart mất file (lock B36 disk local CHỈ dev).

(b) Upload S3 multipart streaming — pattern production: chia file thành parts ≥5MB (S3 minimum part size hard limit) upload đồng thời, complete khi xong:

// File: crates/shop-api/src/routes/admin/uploads.rs (B62 implement)
// Pseudo-code S3 multipart streaming — thực tế B62 dùng object_store crate
// abstract chi tiết (lock B36 lean về object_store cho R2 swap)

const PART_SIZE: usize = 5 * 1024 * 1024; // 5MB S3 minimum

async fn upload_to_s3_streaming(
    body: Body,
    s3: &S3Client,
    bucket: &str,
    key: &str,
) -> AppResult<()> {
    let upload_id = s3.create_multipart_upload(bucket, key).await?;
    let mut stream = body.into_data_stream();
    let mut buffer: Vec<u8> = Vec::with_capacity(PART_SIZE);
    let mut part_number = 1i32;
    let mut completed_parts = Vec::new();

    while let Some(chunk_result) = stream.next().await {
        let chunk = chunk_result?;
        buffer.extend_from_slice(&chunk);

        // Upload mỗi 5MB (S3 minimum part size, trừ part cuối)
        if buffer.len() >= PART_SIZE {
            let etag = s3.upload_part(
                bucket,
                key,
                &upload_id,
                part_number,
                buffer.clone().into(),
            ).await?;
            completed_parts.push((part_number, etag));
            buffer.clear();
            part_number += 1;
        }
    }

    // Upload buffer còn lại (part cuối có thể nhỏ hơn 5MB)
    if !buffer.is_empty() {
        let etag = s3.upload_part(bucket, key, &upload_id, part_number, buffer.into()).await?;
        completed_parts.push((part_number, etag));
    }

    s3.complete_multipart_upload(bucket, key, &upload_id, completed_parts).await?;
    Ok(())
}

Shop API B62 implement product image upload qua object_store crate đã handle multipart S3 abstraction tự động (lock B36) — handler chỉ stream chunk vào BufWriter wrap object_store::PutMultipartWriter, object_store quản phần boundary 5MB internal. Tránh tự implement multipart trừ khi cần fine-grained control (vd parallel upload nhiều part đồng thời tăng throughput).

Error mid-stream lưu ý cleanup orphan part qua s3.abort_multipart_upload(upload_id) nếu lỗi sau khi đã upload vài part — S3 charge phí storage cho orphan part đến khi cleanup (default lifecycle policy 7 ngày).

5

Body::from_stream Cho Download Response

Streaming KHÔNG chỉ cho upload — response cũng có thể stream chunk thay vì 1 response body lớn. Use case: large file download (video, log, PDF invoice 100MB+), Server-Sent Events real-time push. Pattern: tạo Stream<Item = Result<Bytes, E>> rồi wrap qua Body::from_stream(stream):

// File: crates/shop-api/src/routes/admin/logs.rs (future implement)
use axum::body::Body;
use axum::http::header;
use axum::response::IntoResponse;
use tokio_util::io::ReaderStream;
use shop_common::error::{AppError, AppResult};

async fn download_log() -> AppResult<impl IntoResponse> {
    let log_path = "/var/log/shop-api.log";
    let file = tokio::fs::File::open(log_path).await
        .map_err(|_| AppError::NotFound)?;

    // ReaderStream: AsyncRead → Stream<Item = Result<Bytes, io::Error>>
    let stream = ReaderStream::new(file);
    let body = Body::from_stream(stream);

    Ok((
        [
            (header::CONTENT_TYPE, "text/plain"),
            (header::CONTENT_DISPOSITION, "attachment; filename=\"shop-api.log\""),
        ],
        body,
    ))
}

Điểm chú ý:

  • tokio_util::io::ReaderStream cầu nối AsyncRead (trait base của tokio::fs::File, tokio::net::TcpStream, S3 GetObject response stream) sang Stream<Item = Result<Bytes, std::io::Error>> — workspace dep tokio-util = { version = "0.7", features = ["io"] }.
  • Chunk size default ReaderStream 8KB; cấu hình qua ReaderStream::with_capacity(file, 64 * 1024) nếu cần balance throughput vs memory.
  • Body::from_stream(stream) wrap Stream thành axum::body::Body implement IntoResponse; axum + hyper tự set Transfer-Encoding: chunked response header — KHÔNG cần Content-Length (server không biết trước file size).
  • Browser/curl nhận chunked transfer encoding incremental — TTFB (Time to First Byte) chỉ ~50ms (open file + send headers), KHÔNG phải đợi server đọc xong full file mới gửi response.
  • Content-Disposition: attachment; filename="..." trigger browser download dialog thay vì inline render (security: chống XSS HTML upload tương tự lock B36).

Use case Shop API future: invoice PDF download stream từ S3 — s3.get_object(bucket, key) trả ByteStream implement Stream<Bytes>, wrap thẳng qua Body::from_stream proxy bytes từ S3 sang client KHÔNG buffer toàn file vào RAM server.

6

Backpressure — Server KHÔNG OOM Khi Client Slow

Pitfall: server đọc file 10GB nhanh từ disk/S3 (1GB/s SSD NVMe) nhưng client download chậm qua mobile network (1MB/s) → nếu server tự ý đọc trước hết file vào buffer chờ client → server buffer hết 10GB RAM → OOM. Vấn đề kinh điển fast producer + slow consumer.

Backpressure = cơ chế producer (server đọc file) tự dừng đọc khi consumer (client download) chưa kịp xử lý chunk trước. axum + hyper tự handle qua tower::Service design pull-based (consumer pull data thay vì producer push) — khi consumer chưa poll_next Stream thì producer task tự suspend tại tx.send().await. Nhưng để có fine-grained control + memory cap predictable, dùng explicit bounded channel tokio mpsc:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use axum::body::Body;
use bytes::Bytes;

async fn stream_large_file_with_backpressure() -> impl IntoResponse {
    // Bounded channel: capacity 10 chunk → memory cap = 10 × chunk_size
    let (tx, rx) = mpsc::channel::<Result<Bytes, std::io::Error>>(10);

    // Spawn producer task đọc file vào channel
    tokio::spawn(async move {
        let mut file = tokio::fs::File::open("/data/big.bin").await.unwrap();
        let mut buf = vec![0u8; 1024 * 1024]; // 1MB chunk

        loop {
            let n = match tokio::io::AsyncReadExt::read(&mut file, &mut buf).await {
                Ok(0) => break, // EOF
                Ok(n) => n,
                Err(e) => { let _ = tx.send(Err(e)).await; return; }
            };
            // send().await tự block khi rx chưa consume (channel full)
            // → backpressure tự nhiên: producer dừng khi consumer chậm
            if tx.send(Ok(Bytes::copy_from_slice(&buf[..n]))).await.is_err() {
                return; // receiver dropped (client disconnect)
            }
        }
    });

    // ReceiverStream: mpsc::Receiver → Stream
    let stream = ReceiverStream::new(rx);
    Body::from_stream(stream)
}

Phân tích:

  • Bounded channel capacity 10 — producer task block tại tx.send().await khi đã có 10 chunk pending trong channel, đợi consumer (client) lấy chunk ra qua poll Stream mới send tiếp.
  • Memory cap predictable = capacity × chunk_size = 10 × 1MB = 10MB peak per request regardless file size (10GB hay 100MB đều cap 10MB).
  • tokio_stream::wrappers::ReceiverStream wrap mpsc::Receiver implement Stream — axum poll Stream qua hyper response writer, client TCP slow → hyper slow poll → ReceiverStream slow poll → channel chậm drain → producer block — backpressure propagate end-to-end tự động.
  • Client disconnect → axum drop response body → hyper drop ReceiverStream → rx drop → tx.send() trả Err → producer task return cleanup (giải phóng file handle + memory ngay).
  • Capacity 10 là default Shop API lock — balance throughput (lớn hơn 1 cho phép producer làm việc trước consumer) vs memory cap (nhỏ hơn 100 tránh peak quá lớn). Tăng capacity nếu cần throughput cao với file nhỏ, giảm nếu memory critical.

Backpressure quan trọng cho large file stream + SSE long-lived connection — nếu KHÔNG bound, attacker mở 100 connection download chậm sẽ ép server allocate 100 × file_size RAM → DoS.

7

Server-Sent Events (SSE) Pattern

SSE (Server-Sent Events, chuẩn HTML5 W3C 2015) là pattern server push events tới client liên tục qua HTTP streaming long-lived connection. Khác WebSocket: SSE 1 chiều server → client (đủ cho notification/update real-time), reuse HTTP/1.1 + HTTP/2 native (proxy/CDN/firewall friendly), browser tự reconnect khi disconnect, format text-based plain dễ debug.

Content-Type response: text/event-stream. Format mỗi event: data: <json>\n\n (2 newline kết thúc event). axum có module built-in axum::response::sse::Sse wrap Stream của Event trả response chuẩn:

// File: crates/shop-api/src/routes/admin/notifications.rs (B189 implement)
use axum::extract::{Path, State};
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use crate::AppState;

async fn order_updates(
    State(state): State<AppState>,
    Path(order_id): Path<i64>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe Redis pub/sub channel "order:status:{id}" (B189 implement)
    let mut rx = state.redis.subscribe(&format!("order:status:{}", order_id)).await;

    let stream = async_stream::stream! {
        while let Some(payload) = rx.recv().await {
            let event = Event::default().data(payload);
            yield Ok(event);
        }
    };

    Sse::new(stream).keep_alive(
        KeepAlive::new()
            .interval(Duration::from_secs(30))
            .text("keep-alive")
    )
}

Điểm chú ý:

  • Sse::new(stream) trả response Content-Type: text/event-stream + Cache-Control: no-cache tự động — KHÔNG cần build header manual.
  • KeepAlive::interval(Duration::from_secs(30)) gửi comment line : keep-alive\n\n mỗi 30s để chống proxy NGINX/CloudFlare/HAProxy idle timeout default 60s đóng connection — MANDATORY cho SSE long-lived connection production.
  • Item type Result<Event, Infallible> vì stream KHÔNG fail (Redis subscribe loop forever); nếu Redis error retry transparent qua client browser tự reconnect mà.
  • Client browser dùng new EventSource("/api/v1/notifications/order/123") nhận event qua onmessage handler — auto reconnect khi connection drop với last-event-id resume.
  • Backpressure SSE: nếu Redis pub publish nhanh hơn client consume → bounded channel mpsc capacity nhỏ giữa Redis subscriber và Sse stream (pattern bước 6) tránh OOM.

Shop API use case lock: real-time order status update B189 — customer/admin xem order list page với SSE stream từ Redis pub/sub channel khi order chuyển trạng thái (pending → paid → shipping → delivered), notification toast hiện ngay không cần F5 reload. SSE pattern + keepalive deep dive chi tiết B50 module riêng.

8

Pattern Shop API: CSV Export Streaming

Use case kinh điển streaming response Shop API: admin export 1M+ order rows ra CSV. Pattern buffered (build String 200MB rồi gửi) → OOM single-pod K8s + TTFB 30s+ (admin nhìn loading spinner mãi mới thấy download bắt đầu). Pattern streaming MANDATORY:

// File: crates/shop-api/src/routes/admin/orders.rs (B197 implement)
use axum::body::Body;
use axum::extract::State;
use axum::http::header;
use axum::response::IntoResponse;
use bytes::Bytes;
use futures::stream::StreamExt;
use shop_common::error::AppError;
use crate::extractors::RequireRole; // B135 RBAC
use crate::AppState;

async fn export_orders_csv(
    State(state): State<AppState>,
    _admin: RequireRole<"admin">,
) -> impl IntoResponse {
    // sqlx .fetch() trả Stream lazy load từng row từ Postgres cursor
    let row_stream = sqlx::query!(
        "SELECT id, customer_id, total, status, created_at \
         FROM orders ORDER BY created_at DESC"
    )
    .fetch(&state.pool);

    // Stream header CSV trước, sau đó stream từng row
    let header_row = "id,customer_id,total,status,created_at\n".to_string();
    let header_stream = futures::stream::once(async move {
        Ok::<_, AppError>(Bytes::from(header_row))
    });

    let body_stream = row_stream.map(|row_result| {
        let row = row_result
            .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
        let csv_row = format!(
            "{},{},{},{},{}\n",
            row.id, row.customer_id, row.total, row.status,
            row.created_at.to_rfc3339(),
        );
        Ok::<_, AppError>(Bytes::from(csv_row))
    });

    let full_stream = header_stream.chain(body_stream);

    let filename = format!("orders-{}.csv", chrono::Utc::now().format("%Y%m%d"));
    (
        [
            (header::CONTENT_TYPE, "text/csv; charset=utf-8"),
            (
                header::CONTENT_DISPOSITION,
                &format!("attachment; filename=\"{}\"", filename),
            ),
        ],
        Body::from_stream(full_stream),
    )
}

Phân tích:

  • sqlx::query!().fetch(&pool) trả impl Stream<Item = Result<Row, sqlx::Error>> — lazy load từng row qua Postgres cursor transaction, KHÔNG materialize Vec 1M row vào RAM. So với .fetch_all() trả Vec<Row> load full → OOM trên 1M row.
  • Memory bounded ~100KB peak (vài chục row chunk cộng channel buffer) regardless 1M hay 10M row.
  • TTFB ~50ms — server gửi header response + CSV header row ngay sau query setup, KHÔNG đợi load full DB.
  • Client browser tải về incremental, hiện progress bar bytes downloaded, save file disk từng phần — admin thấy file đang download ngay.
  • RequireRole<"admin"> extractor RBAC B135 enforce quyền admin trước khi run query expensive.
  • Production lưu ý: query DB hold connection lâu (1M row × 1ms/row = 1000s = ~17 phút) → set statement_timeout Postgres lớn hơn, dùng read replica tránh ảnh hưởng pool primary, monitor pool starvation Prometheus.

Pattern lock vĩnh viễn Shop API: mọi export admin lớn (orders, customers, payments, inventory transactions) MANDATORY streaming qua sqlx .fetch() + Body::from_stream, KHÔNG buffer toàn data. Crate csv = "1" alternative format chuẩn handle escape comma/quote/newline đúng — B197 chọn cuối khi implement.

9

Tổng Kết

  • Buffered (Bytes, Json<T>, Form<T>): load FULL body vào RAM → simple, OK cho file ≤5MB (webhook, JSON API endpoint, product image lock B36).
  • Streaming (Body): xử lý từng chunk khi đến → memory bounded ~1 chunk (8KB-1MB), OK cho file >5MB; trade-off code phức tạp + error mid-stream khó recover.
  • Decision rule lock vĩnh viễn: ≤5MB → buffered, >5MB → streaming MANDATORY; ngưỡng align body size limit product image B36.
  • axum::body::Body extractor + BodyExt::into_data_stream() + loop StreamExt::next().await consume từng chunk Bytes cheap-clone.
  • Upload streaming pattern 4 bước: extract Body CUỐI arg list → into_data_stream → loop chunk + incremental size check (reject 413 ngay khi vượt) → save disk hoặc S3 multipart 5MB minimum part size qua object_store (lock B36).
  • Download streaming pattern: Body::from_stream(stream) wrap Stream → axum + hyper set Transfer-Encoding: chunked tự động; cầu nối từ file/TcpStream/S3 qua tokio_util::io::ReaderStream::new(reader).
  • Backpressure: bounded mpsc channel tokio::sync::mpsc::channel(10) → producer block khi consumer chậm → memory cap predictable = capacity × chunk_size (vd 10 × 1MB = 10MB peak per request).
  • SSE: axum::response::sse::Sse built-in wrap Stream của Event + KeepAlive::interval(30s) chống proxy idle timeout — B50 chi tiết.
  • CSV export pattern: sqlx .fetch() lazy stream row từ Postgres cursor → map format CSV → Body::from_stream → peak memory ~100KB regardless 1M row, TTFB ~50ms.
  • Pattern Shop API lock: admin CSV export B197 (streaming MANDATORY), order updates SSE B189 (Redis pub/sub forward), product image upload streaming B62 (object_store multipart S3).
  • Workspace.dependencies preview B62/B189/B197 add: http-body-util = "0.1" (BodyExt::into_data_stream), futures = "0.3" (StreamExt::next), tokio-util = { version = "0.7", features = ["io"] } (ReaderStream cầu nối AsyncRead ↔ Stream).
  • B38 conceptual + preview pattern, KHÔNG tạo file thực tế ở Shop API (Workspace State KHÔNG đổi). File implementation + workspace dep sẽ add ở B62 (product image), B189 (SSE order), B197 (CSV export).
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. Phân biệt buffered (Bytes) vs streaming (Body). Peak memory mỗi loại bằng gì? Decision rule Shop API khi nào dùng cái nào và tại sao ngưỡng 5MB?
  2. Upload file 1GB với Bytes extractor → vấn đề gì? Fix bằng pattern nào? Liệt kê 4 bước pattern fix MANDATORY.
  3. Body::from_stream(stream) cần Stream type với Item là gì? Header response axum + hyper tự set là gì? Cầu nối tokio::fs::File → Stream qua type nào?
  4. Backpressure là gì? Tại sao fast producer + slow consumer gây OOM? Bounded mpsc channel giải quyết thế nào? Memory cap predictable bằng công thức nào?
  5. CSV export 1M order rows. Pattern streaming so với load full vào RAM khác nhau gì về peak memory + TTFB + risk pod K8s? sqlx .fetch() vs .fetch_all() khác gì?
Đáp án
  1. Buffered vs streaming khác biệt cốt lõi: (a) Buffered (Bytes, String, Json<T>, Form<T>, AppMultipart<T>) load FULL body vào memory rồi parse 1 lần; axum nhận chunk từ hyper TCP → tích lũy vào Vec<u8> internal → khi full body materialize bytes::Bytes hoặc deserialize struct → trả handler. Peak memory = body size — file 1GB nuốt 1GB RAM một mạch. Pros: code đơn giản (parse 1 lần, không loop chunk), error handling clean (fail → 422 ngay), validate format dễ (có toàn body so check schema). Cons: large file OOM single-pod (K8s memory limit 512MB-2GB kill container), TTFB cao (server đợi full body mới process), bandwidth waste cho attack DoS spam request lớn. (b) Streaming (Body) xử lý từng chunk khi đến qua body.into_data_stream() + loop StreamExt::next().await; chunk size hyper transport layer ~8KB-1MB tùy HTTP/1.1 chunked encoding hay HTTP/2 frame. Peak memory ~1 chunk (8KB-1MB) — file 1GB hay 100MB cùng cap chunk size. Pros: memory bounded regardless body size, TTFB thấp (xử lý song song với download tiếp tục), reject 413 incremental ngay khi vượt size check không waste bandwidth. Cons: code phức tạp (manual loop + state machine), error mid-stream khó recover (đã ghi 500MB vào S3 multipart phải abort_multipart_upload cleanup orphan part), validate format khó (chỉ chunk hiện tại không biết toàn thể), backpressure phải explicit handle (bounded channel mpsc bước 6). Decision rule Shop API lock vĩnh viễn: ≤5MB → buffered, >5MB → streaming MANDATORY. Use case buffered: webhook payload Stripe ~10-100KB (B37 raw Bytes verify HMAC), product image admin upload ≤5MB (B36 multipart với magic bytes verify), JSON API endpoint ≤2MB default. Use case streaming: admin CSV export 1M+ order ~200MB (B197), invoice PDF stream từ S3 future, log file download admin, video file thumbnail. Tại sao ngưỡng 5MB: (i) align DefaultBodyLimit::max(5 * 1024 * 1024) product image route layer lock B36 — consistency cross-endpoint không phải định nghĩa nhiều ngưỡng khác nhau dev confused; (ii) 5MB là sweet spot S3 multipart minimum part size (S3 yêu cầu part >= 5MB trừ part cuối) — file <5MB upload single-shot PUT object đủ, không cần multipart logic phức tạp; (iii) typical K8s memory limit container 512MB - 2GB chứa được 100+ concurrent buffered request 5MB không OOM, vượt 5MB risk; (iv) trade-off learning curve — dev mới chỉ cần học buffered cho 90% endpoint, streaming chỉ cho endpoint critical (CSV export, large file) tránh over-engineering toàn bộ API.
  2. Upload file 1GB với Bytes extractor — vấn đề OOM kinh điển: axum Bytes extractor implement FromRequest consume body one-shot — pattern: nhận chunk từ hyper TCP → tích lũy vào Vec<u8> internal → khi Body::Data stream EOF materialize bytes::Bytes wrap Arc<Vec<u8>> → trả handler. Quá trình tích lũy này load FULL 1GB vào RAM một mạch — peak memory = 1GB. Container K8s thường có memory limit 512MB-2GB (cost-efficient), 1GB request lập tức trigger OOM Killer kill pod (kernel panic exit code 137); pod restart 30-60s rolling deploy → mất tất cả request đang xử lý concurrent. Worse: nếu DefaultBodyLimit::max(N) chưa override per route, axum default cap 2MB → request 1GB reject 413 sớm OK; nhưng nếu route đã override lớn hơn (vd 100MB cho upload feature) thì OOM gây cascade failure ảnh hưởng mọi endpoint shared pod. Pitfall xảy ra khi developer copy-paste pattern Bytes từ webhook B37 (~100KB OK) cho endpoint upload large file mà không kiểm tra size budget. Fix bằng pattern streaming với Body extractor: peak memory ~1 chunk (1MB nếu config chunk size 1MB) regardless file size 1GB hay 10GB. 4 bước pattern MANDATORY lock vĩnh viễn Shop API: (1) Extract body: Body đặt CUỐI arg list theo lock B31 (cùng quy tắc Json/Form/Multipart/Bytes body extractor B32/B35/B36/B37 vì consume body one-shot — sai vị trí compile error). (2) Convert sang Stream qua body.into_data_stream() — trait http_body_util::BodyExt workspace dep http-body-util = "0.1"; trả impl Stream<Item = Result<Bytes, axum::Error>> lazy chunk-by-chunk. (3) Loop consume chunk qua while let Some(chunk_result) = stream.next().await + incremental size check total_bytes += chunk.len() as u64; if total_bytes > MAX_UPLOAD_SIZE { return Err(AppError::BadRequest("file too large")); } reject 413 ngay khi vượt KHÔNG đợi load hết — quan trọng cho DoS protection (attacker spam request 10GB sẽ bị reject sau 1 chunk đầu, không waste bandwidth + memory). (4) Save destination: (a) disk local qua tokio::fs::File::create + AsyncWriteExt::write_all(&chunk) — CHỈ dev (lock B36 disk local pod restart mất file); (b) S3 multipart streaming production — buffer mỗi 5MB (S3 minimum part size hard limit AWS) upload qua upload_part(upload_id, part_number, buffer), complete khi xong qua complete_multipart_upload(parts), abort cleanup orphan khi error mid-stream qua abort_multipart_upload tránh charge phí storage 7 ngày default lifecycle. Shop API B62 product image upload qua object_store crate abstract multipart S3 tự động — lean về object_store cho R2 swap (lock B36) thay aws-sdk-s3 (dependency lớn ~30 transitive AWS lock-in). Workspace dep preview: http-body-util = "0.1", futures = "0.3", object_store = "0.11" hoặc aws-sdk-s3 = "1" (TBD lock B62).
  3. Body::from_stream(stream) requirement Item type: Stream phải có Item = Result<T, E> với T: Into<Bytes> + E: Into<BoxError> (BoxError = Box<dyn std::error::Error + Send + Sync>). Typical: Item = Result<Bytes, std::io::Error> cho file stream, Item = Result<Bytes, sqlx::Error> cho DB stream, Item = Result<Bytes, AppError> cho custom domain error map. Header response axum + hyper tự set: (a) Transfer-Encoding: chunked response header — vì server KHÔNG biết trước Content-Length (Stream lazy unknown size); HTTP/1.1 chunked encoding format <hex_size>\r\n<chunk_bytes>\r\n0\r\n\r\n (size 0 báo EOF). HTTP/2 không cần chunked vì frame format built-in length per frame. (b) Content-Type default application/octet-stream nếu KHÔNG override — developer phải set explicit (header::CONTENT_TYPE, "text/csv") hoặc "text/plain" tùy use case qua tuple (headers, body) IntoResponse. (c) Content-Length axum KHÔNG set vì unknown size — browser/curl handle chunked transfer encoding incremental tự động không cần Content-Length. Cầu nối tokio::fs::File → Stream: tokio_util::io::ReaderStream::new(reader) wrap bất kỳ type implement tokio::io::AsyncRead trait sang Stream<Item = Result<Bytes, std::io::Error>> — workspace dep tokio-util = { version = "0.7", features = ["io"] } (feature io mandatory cho ReaderStream + StreamReader). Chunk size default 8KB cấu hình qua ReaderStream::with_capacity(file, 64 * 1024) balance throughput (larger chunk = fewer syscall read) vs memory (larger chunk = larger peak). Type implement AsyncRead phổ biến: tokio::fs::File file local, tokio::net::TcpStream proxy upstream, aws_sdk_s3::primitives::ByteStream S3 GetObject response, tokio::io::BufReader wrap stream khác để giảm syscall, std::io::Cursor wrap Vec<u8> in-memory dùng cho test. Pattern Shop API future invoice PDF download: let resp = s3.get_object(bucket, key).await?; let body = Body::from_stream(resp.body.into_async_read()); Ok((headers, body)) — proxy bytes từ S3 sang client KHÔNG buffer toàn file vào RAM server, peak memory ~chunk size + Time to First Byte ~50ms (S3 GetObject latency).
  4. Backpressure định nghĩa: cơ chế producer (server đọc file/DB) tự động dừng đọc khi consumer (client download) chưa kịp xử lý chunk trước, đảm bảo memory bounded không leak. Pull-based design: consumer chủ động pull data khi sẵn sàng thay vì producer push tuyến tính bất kể consumer state. Tại sao fast producer + slow consumer gây OOM: pitfall kinh điển server đọc file 10GB nhanh từ disk SSD NVMe (~1GB/s) nhưng client download chậm qua mobile network (~1MB/s). Tỷ lệ chênh 1000x → trong 10s server đọc xong toàn 10GB file vào buffer chờ client, client mới consume được 10MB → server hold 10GB RAM chờ → OOM Killer K8s kill pod. Variant worse: attacker mở 100 connection download chậm đồng thời, mỗi connection hold partial file → server allocate 100 × N GB → DoS chỉ với ~10 client. Vấn đề universal mọi system queue-based: log pipeline (producer ghi log nhanh hơn consumer tail), Kafka topic (producer publish nhanh hơn consumer subscribe), TCP socket buffer (sender ghi nhanh hơn receiver read). Bounded mpsc channel giải quyết thế nào: let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, io::Error>>(10) — channel với bounded capacity 10; producer task tx.send(chunk).await tự suspend khi channel đã có 10 chunk pending (chưa được consumer lấy ra qua rx.recv().await) — tokio scheduler park task tại send().await đợi capacity available, KHÔNG block worker thread (cooperative scheduling). Khi consumer (axum poll Stream qua ReceiverStream wrap rx) lấy 1 chunk ra → capacity freed 1 slot → producer wake up gửi tiếp. Backpressure propagate end-to-end: client TCP slow read → hyper chậm poll response writer → axum chậm poll Stream → ReceiverStream chậm poll rx → channel chậm drain → producer block tại tx.send. Producer task tự nhiên rate-limit theo tốc độ consumer. Client disconnect → axum drop response → hyper drop ReceiverStream → rx drop → tx.send() trả Err(SendError) → producer task return cleanup ngay (giải phóng file handle + memory). Memory cap predictable công thức: memory_peak = capacity × chunk_size. Vd capacity 10 × chunk 1MB = 10MB peak per request regardless file size (1GB, 10GB đều cap 10MB). Áp dụng cho 100 concurrent connection: 100 × 10MB = 1GB total memory budget — fit container 2GB limit OK. So capacity 1000 × 1MB = 1GB per request → 100 connection = 100GB → OOM ngay. Decision lock Shop API capacity 10 default: cân bằng (a) throughput (capacity 1 quá nhỏ producer luôn block dù consumer chỉ chậm 1 chunk lag, latency cao), (b) memory cap (capacity 100+ peak quá lớn, không buffer được attack), (c) typical chunk size 1MB cho large file stream. Tune theo workload: file nhỏ + throughput cao → capacity 50, large file + memory critical → capacity 5.
  5. CSV export 1M order rows — streaming vs buffered: (a) Buffered pattern (anti-pattern OOM risk): let rows: Vec<Order> = sqlx::query_as::<_, Order>("SELECT * FROM orders").fetch_all(&pool).await?; let csv = build_csv(rows); Ok(csv) — bước 1 load 1M Order struct vào Vec ~500MB RAM (struct ~500 bytes × 1M = 500MB), bước 2 build CSV String 200MB, total peak ~700MB RAM. Container K8s limit 2GB còn handle được nhưng risk concurrent 3 admin export đồng thời = 2.1GB → OOM. TTFB ~30s+ (admin nhìn loading spinner đợi server load full DB + build CSV mới thấy download bắt đầu) — UX kém. (b) Streaming pattern (MANDATORY lock Shop API B197): let row_stream = sqlx::query!("SELECT id, total, status FROM orders").fetch(&state.pool); let body_stream = row_stream.map(|row_result| { let row = row_result?; let csv_row = format!("{},{},{}\n", row.id, row.total, row.status); Ok(Bytes::from(csv_row)) }); Body::from_stream(body_stream) — server gửi response header + first chunk ngay sau query setup ~50ms, sau đó stream từng row format CSV bytes incremental. Peak memory ~100KB (vài chục row chunk + channel buffer + sqlx cursor state) regardless 1M hay 10M row. TTFB ~50ms — browser hiện download progress bar ngay, admin biết file đang tải xuống. Risk pod K8s: streaming KHÔNG ăn RAM nên KHÔNG OOM dù 100 concurrent export đồng thời (100 × 100KB = 10MB) — vẫn cần monitor pool DB connection (1M row hold 1 connection ~17 phút, set statement_timeout Postgres lớn hơn + dùng read replica tránh ảnh hưởng primary pool, monitor pool starvation Prometheus alert khi connection >80% pool size). sqlx .fetch() vs .fetch_all() khác biệt cốt lõi: (a) .fetch(&pool) trả impl Stream<Item = Result<Row, sqlx::Error>> — lazy load row-by-row qua Postgres cursor declared trong transaction; mỗi StreamExt::next().await fetch 1 row (hoặc batch nhỏ tùy driver tune) từ DB cursor, drop sau xử lý → peak memory ~1 row. Hold 1 connection từ pool suốt query duration. Phù hợp large result set (1M+ row). Pitfall: nếu query có ORDER BY phức tạp, Postgres vẫn build full result internal trước stream → memory DB-side cao, statement timeout risk; mitigation: ORDER BY trên indexed column, use cursor cho keyset pagination thay OFFSET. (b) .fetch_all(&pool) trả Result<Vec<Row>, sqlx::Error> — eager load full result vào Vec rồi return, peak memory = sum tất cả row. Phù hợp small result set (<10k row, <10MB RAM). Pitfall: vô tình dùng cho large query → OOM. (c) .fetch_one(&pool) trả Result<Row, sqlx::Error> single row, error nếu 0 hoặc >1 row. (d) .fetch_optional(&pool) trả Result<Option<Row>, sqlx::Error> 0 hoặc 1 row — pattern lock G7 Shop API cho read-by-id (B63 implement). Pattern lock vĩnh viễn Shop API: mọi export admin lớn (orders, customers, payments, inventory transactions, audit_logs) MANDATORY .fetch() stream + Body::from_stream KHÔNG .fetch_all() buffer; small list endpoint user-facing pagination 20 row/page dùng .fetch_all() OK. Crate csv = "1" alternative format chuẩn handle escape comma/quote/newline đúng RFC 4180 — B197 chọn cuối khi implement quyết định manual format!() đơn giản hay csv::Writer đầy đủ.
11

Bài Tiếp Theo

— chi tiết Extension<T> set qua middleware (X-Request-Id middleware sẽ implement đầy đủ với tower-http::SetRequestIdLayer + custom layer wrap), extract trong handler qua Extension(req_id): Extension<RequestId>; so sánh với State<T> (Extension dynamic per-request lifetime, State persistent global Arc cloned cross-handler); use case: request_id propagation, trace context OpenTelemetry, current_user injection sau auth middleware, A/B test variant context.