Mục lục
- Mục Tiêu Bài Học
- Buffered vs Streaming — Khác Biệt Cốt Lõi
axum::body::BodyExtractor- Stream Upload Large File — Pattern
Body::from_streamCho Download Response- Backpressure — Server KHÔNG OOM Khi Client Slow
- Server-Sent Events (SSE) Pattern
- Pattern Shop API: CSV Export Streaming
- Tổng Kết
- Bài Tập Củng Cố
- Bài Tiếp Theo
Mục Tiêu Bài Học
Sau bài học, bạn sẽ:
- Hiểu khác biệt buffered body (
Bytes,Json<T>) load full vào memory vs streaming body (Body) xử lý từng chunk khi đến. - Biết extract streaming body qua
axum::body::Bodyextractor +BodyExt::into_data_stream()consume từng chunk quaStreamExt::next().await. - Áp dụng pattern stream upload large file với incremental size check (reject 413 ngay khi vượt, KHÔNG đợi load full).
- Hiểu
Body::from_stream(stream)cho download response chunked transfer encoding tự động — wrap file quatokio_util::io::ReaderStream. - Nắm cơ chế backpressure qua bounded mpsc channel: server CHỈ đọc khi client ack chunk trước → memory cap predictable.
- Biết Server-Sent Events (SSE) qua
axum::response::sse::Ssebuilt-in cho real-time push event tới client. - Pattern Shop API: admin CSV export 1M order rows streaming qua sqlx
.fetch()B197, order updates SSE B189, product image upload B62.
Buffered vs Streaming — Khác Biệt Cốt Lõi
Hai mô hình xử lý body khác nhau căn bản về memory profile và độ phức tạp code. Buffered (Bytes, String, Json<T>, Form<T>) load FULL body vào memory rồi parse 1 lần — đơn giản, error handling rõ ràng (parse fail trả 422 ngay), nhưng peak memory = body size → file 1GB nuốt 1GB RAM dễ OOM single-pod K8s. Streaming (Body) xử lý từng chunk khi đến — memory bounded ~1 chunk (8KB-1MB), latency thấp (xử lý song song với download tiếp tục), nhưng code phức tạp hơn và error giữa stream khó recover (đã ghi 500MB vào disk thì abort thế nào).
Buffered (Bytes/Json):
[chunk1][chunk2][chunk3] → [FULL BODY trong RAM] → parse → handler
(peak memory = body size)
Streaming (Body):
[chunk1] → handler.process(chunk1) → drop chunk1
[chunk2] → handler.process(chunk2) → drop chunk2
[chunk3] → handler.process(chunk3) → drop chunk3
(peak memory = 1 chunk ~8KB-1MB)
Decision rule lock vĩnh viễn Shop API:
- File ≤ 5MB → buffered (
Bytes/Json/AppMultipartB36) — code đơn giản parse 1 lần, error handling clean qua?operator, peak memory predictable trong giới hạn cap. Use case: webhook payload Stripe ~10-100KB stay buffered (lock B37), product image admin upload ≤5MB (lock B36), JSON API endpoint baseline ≤2MB default. - File > 5MB → streaming (
Body) MANDATORY — memory bounded regardless file size, peak ~1MB chunk. Use case: admin CSV export 1M+ order rows ~200MB (B197), invoice PDF stream từ S3 future, log file download admin. - Ngưỡng 5MB align DefaultBodyLimit::max(5 * 1024 * 1024) product image route layer lock B36 — vượt ngưỡng phải explicit chuyển sang streaming pattern hoặc tăng limit per route.
Streaming KHÔNG free lunch — trade-off với buffered: (a) error mid-stream khó recover (đã ghi 500MB vào S3 multipart upload rồi server lỗi → phải abort_multipart_upload cleanup tránh orphan part charge phí), (b) validate format khó (chỉ có chunk hiện tại, không biết toàn thể), (c) backpressure phải explicit handle (chi tiết bước 6). Decision rule trên cân bằng simplicity (buffered cho 90% endpoint) vs scalability (streaming cho 10% endpoint critical).
axum::body::Body Extractor
axum cung cấp axum::body::Body wrapper trait http_body::Body (từ http crate ecosystem tokio-rs) cho stream từng chunk. Pattern consume: convert qua body.into_data_stream() trả impl Stream<Item = Result<Bytes, axum::Error>> rồi loop iterate qua trait StreamExt::next().await:
// File: crates/shop-api/src/routes/admin/uploads.rs (B62 implement)
use axum::body::Body;
use axum::http::StatusCode;
use http_body_util::BodyExt;
use futures::stream::StreamExt;
use shop_common::error::{AppError, AppResult};
const MAX_UPLOAD_SIZE: u64 = 100 * 1024 * 1024; // 100MB
async fn upload_large_file(body: Body) -> AppResult<StatusCode> {
let mut stream = body.into_data_stream();
let mut total_bytes = 0u64;
let mut file = tokio::fs::File::create("/tmp/upload.bin").await
.map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result
.map_err(|e| AppError::BadRequest(format!("stream error: {}", e)))?;
total_bytes += chunk.len() as u64;
if total_bytes > MAX_UPLOAD_SIZE {
return Err(AppError::BadRequest("file too large".to_string()));
}
tokio::io::AsyncWriteExt::write_all(&mut file, &chunk).await
.map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
}
Ok(StatusCode::CREATED)
}
Điểm chú ý:
body: Bodyđặt CUỐI arg list theo lock B31 (cùng quy tắcJson/Form/Multipart/BytesB32/B35/B36/B37) —Bodyconsume request body one-shot quaFromRequest.body.into_data_stream()đến từ traithttp_body_util::BodyExt— workspace dep sẽ addhttp-body-util = "0.1"khi B62 implement.StreamExt::next()đến từfutures::stream::StreamExttrait — workspace depfutures = "0.3"; alternativetokio_stream::StreamExtcompat tương đương.- Mỗi
chunklàbytes::Bytescheap-clone Arc internal (đã giới thiệu B37) — chunk size phụ thuộc hyper transport layer (HTTP/1.1 chunked encoding ~8-32KB typical, HTTP/2 frame ~16KB) KHÔNG control trực tiếp từ application. - Incremental size check qua
total_bytes > MAX_UPLOAD_SIZEreject ngay khi vượt — KHÔNG đợi load hết body nhưBytesextractor (tiết kiệm bandwidth + memory cho attack DoS spam request lớn). DefaultBodyLimit::max(N)route layer vẫn áp dụng choBodyextractor — server abort connection khi vượt limit trước cả khi handler thấy chunk.
Stream Upload Large File — Pattern
Pitfall lớn nhất: handler extract Bytes cho file 1GB — axum load full Body vào memory rồi materialize thành bytes::Bytes → process bị OOM kill bởi K8s (memory limit container thường 512MB-2GB). Pitfall xảy ra khi developer copy-paste pattern Bytes từ webhook B37 cho mọi endpoint upload mà không kiểm tra size budget.
Fix: extract Body → stream chunk → incremental size check (đã thấy ở bước 3). Save destination 2 lựa chọn:
(a) Save to disk qua tokio::fs::File + AsyncWriteExt::write_all — phù hợp dev/single-pod tạm; production tránh vì disk local pod restart mất file (lock B36 disk local CHỈ dev).
(b) Upload S3 multipart streaming — pattern production: chia file thành parts ≥5MB (S3 minimum part size hard limit) upload đồng thời, complete khi xong:
// File: crates/shop-api/src/routes/admin/uploads.rs (B62 implement)
// Pseudo-code S3 multipart streaming — thực tế B62 dùng object_store crate
// abstract chi tiết (lock B36 lean về object_store cho R2 swap)
const PART_SIZE: usize = 5 * 1024 * 1024; // 5MB S3 minimum
async fn upload_to_s3_streaming(
body: Body,
s3: &S3Client,
bucket: &str,
key: &str,
) -> AppResult<()> {
let upload_id = s3.create_multipart_upload(bucket, key).await?;
let mut stream = body.into_data_stream();
let mut buffer: Vec<u8> = Vec::with_capacity(PART_SIZE);
let mut part_number = 1i32;
let mut completed_parts = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
buffer.extend_from_slice(&chunk);
// Upload mỗi 5MB (S3 minimum part size, trừ part cuối)
if buffer.len() >= PART_SIZE {
let etag = s3.upload_part(
bucket,
key,
&upload_id,
part_number,
buffer.clone().into(),
).await?;
completed_parts.push((part_number, etag));
buffer.clear();
part_number += 1;
}
}
// Upload buffer còn lại (part cuối có thể nhỏ hơn 5MB)
if !buffer.is_empty() {
let etag = s3.upload_part(bucket, key, &upload_id, part_number, buffer.into()).await?;
completed_parts.push((part_number, etag));
}
s3.complete_multipart_upload(bucket, key, &upload_id, completed_parts).await?;
Ok(())
}
Shop API B62 implement product image upload qua object_store crate đã handle multipart S3 abstraction tự động (lock B36) — handler chỉ stream chunk vào BufWriter wrap object_store::PutMultipartWriter, object_store quản phần boundary 5MB internal. Tránh tự implement multipart trừ khi cần fine-grained control (vd parallel upload nhiều part đồng thời tăng throughput).
Error mid-stream lưu ý cleanup orphan part qua s3.abort_multipart_upload(upload_id) nếu lỗi sau khi đã upload vài part — S3 charge phí storage cho orphan part đến khi cleanup (default lifecycle policy 7 ngày).
Body::from_stream Cho Download Response
Streaming KHÔNG chỉ cho upload — response cũng có thể stream chunk thay vì 1 response body lớn. Use case: large file download (video, log, PDF invoice 100MB+), Server-Sent Events real-time push. Pattern: tạo Stream<Item = Result<Bytes, E>> rồi wrap qua Body::from_stream(stream):
// File: crates/shop-api/src/routes/admin/logs.rs (future implement)
use axum::body::Body;
use axum::http::header;
use axum::response::IntoResponse;
use tokio_util::io::ReaderStream;
use shop_common::error::{AppError, AppResult};
async fn download_log() -> AppResult<impl IntoResponse> {
let log_path = "/var/log/shop-api.log";
let file = tokio::fs::File::open(log_path).await
.map_err(|_| AppError::NotFound)?;
// ReaderStream: AsyncRead → Stream<Item = Result<Bytes, io::Error>>
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
Ok((
[
(header::CONTENT_TYPE, "text/plain"),
(header::CONTENT_DISPOSITION, "attachment; filename=\"shop-api.log\""),
],
body,
))
}
Điểm chú ý:
tokio_util::io::ReaderStreamcầu nốiAsyncRead(trait base củatokio::fs::File,tokio::net::TcpStream, S3GetObjectresponse stream) sangStream<Item = Result<Bytes, std::io::Error>>— workspace deptokio-util = { version = "0.7", features = ["io"] }.- Chunk size default ReaderStream 8KB; cấu hình qua
ReaderStream::with_capacity(file, 64 * 1024)nếu cần balance throughput vs memory. Body::from_stream(stream)wrap Stream thànhaxum::body::BodyimplementIntoResponse; axum + hyper tự setTransfer-Encoding: chunkedresponse header — KHÔNG cầnContent-Length(server không biết trước file size).- Browser/curl nhận chunked transfer encoding incremental — TTFB (Time to First Byte) chỉ ~50ms (open file + send headers), KHÔNG phải đợi server đọc xong full file mới gửi response.
Content-Disposition: attachment; filename="..."trigger browser download dialog thay vì inline render (security: chống XSS HTML upload tương tự lock B36).
Use case Shop API future: invoice PDF download stream từ S3 — s3.get_object(bucket, key) trả ByteStream implement Stream<Bytes>, wrap thẳng qua Body::from_stream proxy bytes từ S3 sang client KHÔNG buffer toàn file vào RAM server.
Backpressure — Server KHÔNG OOM Khi Client Slow
Pitfall: server đọc file 10GB nhanh từ disk/S3 (1GB/s SSD NVMe) nhưng client download chậm qua mobile network (1MB/s) → nếu server tự ý đọc trước hết file vào buffer chờ client → server buffer hết 10GB RAM → OOM. Vấn đề kinh điển fast producer + slow consumer.
Backpressure = cơ chế producer (server đọc file) tự dừng đọc khi consumer (client download) chưa kịp xử lý chunk trước. axum + hyper tự handle qua tower::Service design pull-based (consumer pull data thay vì producer push) — khi consumer chưa poll_next Stream thì producer task tự suspend tại tx.send().await. Nhưng để có fine-grained control + memory cap predictable, dùng explicit bounded channel tokio mpsc:
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use axum::body::Body;
use bytes::Bytes;
async fn stream_large_file_with_backpressure() -> impl IntoResponse {
// Bounded channel: capacity 10 chunk → memory cap = 10 × chunk_size
let (tx, rx) = mpsc::channel::<Result<Bytes, std::io::Error>>(10);
// Spawn producer task đọc file vào channel
tokio::spawn(async move {
let mut file = tokio::fs::File::open("/data/big.bin").await.unwrap();
let mut buf = vec![0u8; 1024 * 1024]; // 1MB chunk
loop {
let n = match tokio::io::AsyncReadExt::read(&mut file, &mut buf).await {
Ok(0) => break, // EOF
Ok(n) => n,
Err(e) => { let _ = tx.send(Err(e)).await; return; }
};
// send().await tự block khi rx chưa consume (channel full)
// → backpressure tự nhiên: producer dừng khi consumer chậm
if tx.send(Ok(Bytes::copy_from_slice(&buf[..n]))).await.is_err() {
return; // receiver dropped (client disconnect)
}
}
});
// ReceiverStream: mpsc::Receiver → Stream
let stream = ReceiverStream::new(rx);
Body::from_stream(stream)
}
Phân tích:
- Bounded channel capacity 10 — producer task block tại
tx.send().awaitkhi đã có 10 chunk pending trong channel, đợi consumer (client) lấy chunk ra qua poll Stream mới send tiếp. - Memory cap predictable =
capacity × chunk_size= 10 × 1MB = 10MB peak per request regardless file size (10GB hay 100MB đều cap 10MB). tokio_stream::wrappers::ReceiverStreamwrapmpsc::ReceiverimplementStream— axum poll Stream qua hyper response writer, client TCP slow → hyper slow poll → ReceiverStream slow poll → channel chậm drain → producer block — backpressure propagate end-to-end tự động.- Client disconnect → axum drop response body → hyper drop ReceiverStream → rx drop → tx.send() trả Err → producer task return cleanup (giải phóng file handle + memory ngay).
- Capacity 10 là default Shop API lock — balance throughput (lớn hơn 1 cho phép producer làm việc trước consumer) vs memory cap (nhỏ hơn 100 tránh peak quá lớn). Tăng capacity nếu cần throughput cao với file nhỏ, giảm nếu memory critical.
Backpressure quan trọng cho large file stream + SSE long-lived connection — nếu KHÔNG bound, attacker mở 100 connection download chậm sẽ ép server allocate 100 × file_size RAM → DoS.
Server-Sent Events (SSE) Pattern
SSE (Server-Sent Events, chuẩn HTML5 W3C 2015) là pattern server push events tới client liên tục qua HTTP streaming long-lived connection. Khác WebSocket: SSE 1 chiều server → client (đủ cho notification/update real-time), reuse HTTP/1.1 + HTTP/2 native (proxy/CDN/firewall friendly), browser tự reconnect khi disconnect, format text-based plain dễ debug.
Content-Type response: text/event-stream. Format mỗi event: data: <json>\n\n (2 newline kết thúc event). axum có module built-in axum::response::sse::Sse wrap Stream của Event trả response chuẩn:
// File: crates/shop-api/src/routes/admin/notifications.rs (B189 implement)
use axum::extract::{Path, State};
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use crate::AppState;
async fn order_updates(
State(state): State<AppState>,
Path(order_id): Path<i64>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
// Subscribe Redis pub/sub channel "order:status:{id}" (B189 implement)
let mut rx = state.redis.subscribe(&format!("order:status:{}", order_id)).await;
let stream = async_stream::stream! {
while let Some(payload) = rx.recv().await {
let event = Event::default().data(payload);
yield Ok(event);
}
};
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(30))
.text("keep-alive")
)
}
Điểm chú ý:
Sse::new(stream)trả responseContent-Type: text/event-stream+Cache-Control: no-cachetự động — KHÔNG cần build header manual.KeepAlive::interval(Duration::from_secs(30))gửi comment line: keep-alive\n\nmỗi 30s để chống proxy NGINX/CloudFlare/HAProxy idle timeout default 60s đóng connection — MANDATORY cho SSE long-lived connection production.- Item type
Result<Event, Infallible>vì stream KHÔNG fail (Redis subscribe loop forever); nếu Redis error retry transparent qua client browser tự reconnect mà. - Client browser dùng
new EventSource("/api/v1/notifications/order/123")nhận event quaonmessagehandler — auto reconnect khi connection drop với last-event-id resume. - Backpressure SSE: nếu Redis pub publish nhanh hơn client consume → bounded channel mpsc capacity nhỏ giữa Redis subscriber và Sse stream (pattern bước 6) tránh OOM.
Shop API use case lock: real-time order status update B189 — customer/admin xem order list page với SSE stream từ Redis pub/sub channel khi order chuyển trạng thái (pending → paid → shipping → delivered), notification toast hiện ngay không cần F5 reload. SSE pattern + keepalive deep dive chi tiết B50 module riêng.
Pattern Shop API: CSV Export Streaming
Use case kinh điển streaming response Shop API: admin export 1M+ order rows ra CSV. Pattern buffered (build String 200MB rồi gửi) → OOM single-pod K8s + TTFB 30s+ (admin nhìn loading spinner mãi mới thấy download bắt đầu). Pattern streaming MANDATORY:
// File: crates/shop-api/src/routes/admin/orders.rs (B197 implement)
use axum::body::Body;
use axum::extract::State;
use axum::http::header;
use axum::response::IntoResponse;
use bytes::Bytes;
use futures::stream::StreamExt;
use shop_common::error::AppError;
use crate::extractors::RequireRole; // B135 RBAC
use crate::AppState;
async fn export_orders_csv(
State(state): State<AppState>,
_admin: RequireRole<"admin">,
) -> impl IntoResponse {
// sqlx .fetch() trả Stream lazy load từng row từ Postgres cursor
let row_stream = sqlx::query!(
"SELECT id, customer_id, total, status, created_at \
FROM orders ORDER BY created_at DESC"
)
.fetch(&state.pool);
// Stream header CSV trước, sau đó stream từng row
let header_row = "id,customer_id,total,status,created_at\n".to_string();
let header_stream = futures::stream::once(async move {
Ok::<_, AppError>(Bytes::from(header_row))
});
let body_stream = row_stream.map(|row_result| {
let row = row_result
.map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
let csv_row = format!(
"{},{},{},{},{}\n",
row.id, row.customer_id, row.total, row.status,
row.created_at.to_rfc3339(),
);
Ok::<_, AppError>(Bytes::from(csv_row))
});
let full_stream = header_stream.chain(body_stream);
let filename = format!("orders-{}.csv", chrono::Utc::now().format("%Y%m%d"));
(
[
(header::CONTENT_TYPE, "text/csv; charset=utf-8"),
(
header::CONTENT_DISPOSITION,
&format!("attachment; filename=\"{}\"", filename),
),
],
Body::from_stream(full_stream),
)
}
Phân tích:
sqlx::query!().fetch(&pool)trảimpl Stream<Item = Result<Row, sqlx::Error>>— lazy load từng row qua Postgres cursor transaction, KHÔNG materialize Vec 1M row vào RAM. So với.fetch_all()trảVec<Row>load full → OOM trên 1M row.- Memory bounded ~100KB peak (vài chục row chunk cộng channel buffer) regardless 1M hay 10M row.
- TTFB ~50ms — server gửi header response + CSV header row ngay sau query setup, KHÔNG đợi load full DB.
- Client browser tải về incremental, hiện progress bar bytes downloaded, save file disk từng phần — admin thấy file đang download ngay.
RequireRole<"admin">extractor RBAC B135 enforce quyền admin trước khi run query expensive.- Production lưu ý: query DB hold connection lâu (1M row × 1ms/row = 1000s = ~17 phút) → set statement_timeout Postgres lớn hơn, dùng read replica tránh ảnh hưởng pool primary, monitor pool starvation Prometheus.
Pattern lock vĩnh viễn Shop API: mọi export admin lớn (orders, customers, payments, inventory transactions) MANDATORY streaming qua sqlx .fetch() + Body::from_stream, KHÔNG buffer toàn data. Crate csv = "1" alternative format chuẩn handle escape comma/quote/newline đúng — B197 chọn cuối khi implement.
Tổng Kết
- Buffered (
Bytes,Json<T>,Form<T>): load FULL body vào RAM → simple, OK cho file ≤5MB (webhook, JSON API endpoint, product image lock B36). - Streaming (
Body): xử lý từng chunk khi đến → memory bounded ~1 chunk (8KB-1MB), OK cho file >5MB; trade-off code phức tạp + error mid-stream khó recover. - Decision rule lock vĩnh viễn: ≤5MB → buffered, >5MB → streaming MANDATORY; ngưỡng align body size limit product image B36.
axum::body::Bodyextractor +BodyExt::into_data_stream()+ loopStreamExt::next().awaitconsume từng chunk Bytes cheap-clone.- Upload streaming pattern 4 bước: extract
BodyCUỐI arg list → into_data_stream → loop chunk + incremental size check (reject 413 ngay khi vượt) → save disk hoặc S3 multipart 5MB minimum part size quaobject_store(lock B36). - Download streaming pattern:
Body::from_stream(stream)wrap Stream → axum + hyper setTransfer-Encoding: chunkedtự động; cầu nối từ file/TcpStream/S3 quatokio_util::io::ReaderStream::new(reader). - Backpressure: bounded mpsc channel
tokio::sync::mpsc::channel(10)→ producer block khi consumer chậm → memory cap predictable = capacity × chunk_size (vd 10 × 1MB = 10MB peak per request). - SSE:
axum::response::sse::Ssebuilt-in wrap Stream củaEvent+KeepAlive::interval(30s)chống proxy idle timeout — B50 chi tiết. - CSV export pattern: sqlx
.fetch()lazy stream row từ Postgres cursor → map format CSV →Body::from_stream→ peak memory ~100KB regardless 1M row, TTFB ~50ms. - Pattern Shop API lock: admin CSV export B197 (streaming MANDATORY), order updates SSE B189 (Redis pub/sub forward), product image upload streaming B62 (object_store multipart S3).
- Workspace.dependencies preview B62/B189/B197 add:
http-body-util = "0.1"(BodyExt::into_data_stream),futures = "0.3"(StreamExt::next),tokio-util = { version = "0.7", features = ["io"] }(ReaderStream cầu nối AsyncRead ↔ Stream). - B38 conceptual + preview pattern, KHÔNG tạo file thực tế ở Shop API (Workspace State KHÔNG đổi). File implementation + workspace dep sẽ add ở B62 (product image), B189 (SSE order), B197 (CSV export).
Bài Tập Củng Cố
Tự trả lời, đáp án ở cuối:
- Phân biệt buffered (
Bytes) vs streaming (Body). Peak memory mỗi loại bằng gì? Decision rule Shop API khi nào dùng cái nào và tại sao ngưỡng 5MB? - Upload file 1GB với
Bytesextractor → vấn đề gì? Fix bằng pattern nào? Liệt kê 4 bước pattern fix MANDATORY. Body::from_stream(stream)cần Stream type với Item là gì? Header response axum + hyper tự set là gì? Cầu nốitokio::fs::File→ Stream qua type nào?- Backpressure là gì? Tại sao fast producer + slow consumer gây OOM? Bounded mpsc channel giải quyết thế nào? Memory cap predictable bằng công thức nào?
- CSV export 1M order rows. Pattern streaming so với load full vào RAM khác nhau gì về peak memory + TTFB + risk pod K8s? sqlx
.fetch()vs.fetch_all()khác gì?
Đáp án
- Buffered vs streaming khác biệt cốt lõi: (a) Buffered (
Bytes,String,Json<T>,Form<T>,AppMultipart<T>) load FULL body vào memory rồi parse 1 lần; axum nhận chunk từ hyper TCP → tích lũy vàoVec<u8>internal → khi full body materializebytes::Byteshoặc deserialize struct → trả handler. Peak memory = body size — file 1GB nuốt 1GB RAM một mạch. Pros: code đơn giản (parse 1 lần, không loop chunk), error handling clean (fail → 422 ngay), validate format dễ (có toàn body so check schema). Cons: large file OOM single-pod (K8s memory limit 512MB-2GB kill container), TTFB cao (server đợi full body mới process), bandwidth waste cho attack DoS spam request lớn. (b) Streaming (Body) xử lý từng chunk khi đến quabody.into_data_stream()+ loopStreamExt::next().await; chunk size hyper transport layer ~8KB-1MB tùy HTTP/1.1 chunked encoding hay HTTP/2 frame. Peak memory ~1 chunk (8KB-1MB) — file 1GB hay 100MB cùng cap chunk size. Pros: memory bounded regardless body size, TTFB thấp (xử lý song song với download tiếp tục), reject 413 incremental ngay khi vượt size check không waste bandwidth. Cons: code phức tạp (manual loop + state machine), error mid-stream khó recover (đã ghi 500MB vào S3 multipart phảiabort_multipart_uploadcleanup orphan part), validate format khó (chỉ chunk hiện tại không biết toàn thể), backpressure phải explicit handle (bounded channel mpsc bước 6). Decision rule Shop API lock vĩnh viễn: ≤5MB → buffered, >5MB → streaming MANDATORY. Use case buffered: webhook payload Stripe ~10-100KB (B37 raw Bytes verify HMAC), product image admin upload ≤5MB (B36 multipart với magic bytes verify), JSON API endpoint ≤2MB default. Use case streaming: admin CSV export 1M+ order ~200MB (B197), invoice PDF stream từ S3 future, log file download admin, video file thumbnail. Tại sao ngưỡng 5MB: (i) alignDefaultBodyLimit::max(5 * 1024 * 1024)product image route layer lock B36 — consistency cross-endpoint không phải định nghĩa nhiều ngưỡng khác nhau dev confused; (ii) 5MB là sweet spot S3 multipart minimum part size (S3 yêu cầu part >= 5MB trừ part cuối) — file <5MB upload single-shot PUT object đủ, không cần multipart logic phức tạp; (iii) typical K8s memory limit container 512MB - 2GB chứa được 100+ concurrent buffered request 5MB không OOM, vượt 5MB risk; (iv) trade-off learning curve — dev mới chỉ cần học buffered cho 90% endpoint, streaming chỉ cho endpoint critical (CSV export, large file) tránh over-engineering toàn bộ API. - Upload file 1GB với
Bytesextractor — vấn đề OOM kinh điển: axumBytesextractor implementFromRequestconsume body one-shot — pattern: nhận chunk từ hyper TCP → tích lũy vàoVec<u8>internal → khiBody::Datastream EOF materializebytes::ByteswrapArc<Vec<u8>>→ trả handler. Quá trình tích lũy này load FULL 1GB vào RAM một mạch — peak memory = 1GB. Container K8s thường có memory limit 512MB-2GB (cost-efficient), 1GB request lập tức trigger OOM Killer kill pod (kernel panic exit code 137); pod restart 30-60s rolling deploy → mất tất cả request đang xử lý concurrent. Worse: nếuDefaultBodyLimit::max(N)chưa override per route, axum default cap 2MB → request 1GB reject 413 sớm OK; nhưng nếu route đã override lớn hơn (vd 100MB cho upload feature) thì OOM gây cascade failure ảnh hưởng mọi endpoint shared pod. Pitfall xảy ra khi developer copy-paste patternBytestừ webhook B37 (~100KB OK) cho endpoint upload large file mà không kiểm tra size budget. Fix bằng pattern streaming vớiBodyextractor: peak memory ~1 chunk (1MB nếu config chunk size 1MB) regardless file size 1GB hay 10GB. 4 bước pattern MANDATORY lock vĩnh viễn Shop API: (1) Extractbody: Bodyđặt CUỐI arg list theo lock B31 (cùng quy tắcJson/Form/Multipart/Bytesbody extractor B32/B35/B36/B37 vì consume body one-shot — sai vị trí compile error). (2) Convert sang Stream quabody.into_data_stream()— traithttp_body_util::BodyExtworkspace dephttp-body-util = "0.1"; trảimpl Stream<Item = Result<Bytes, axum::Error>>lazy chunk-by-chunk. (3) Loop consume chunk quawhile let Some(chunk_result) = stream.next().await+ incremental size checktotal_bytes += chunk.len() as u64; if total_bytes > MAX_UPLOAD_SIZE { return Err(AppError::BadRequest("file too large")); }reject 413 ngay khi vượt KHÔNG đợi load hết — quan trọng cho DoS protection (attacker spam request 10GB sẽ bị reject sau 1 chunk đầu, không waste bandwidth + memory). (4) Save destination: (a) disk local quatokio::fs::File::create+AsyncWriteExt::write_all(&chunk)— CHỈ dev (lock B36 disk local pod restart mất file); (b) S3 multipart streaming production — buffer mỗi 5MB (S3 minimum part size hard limit AWS) upload quaupload_part(upload_id, part_number, buffer), complete khi xong quacomplete_multipart_upload(parts), abort cleanup orphan khi error mid-stream quaabort_multipart_uploadtránh charge phí storage 7 ngày default lifecycle. Shop API B62 product image upload quaobject_storecrate abstract multipart S3 tự động — lean vềobject_storecho R2 swap (lock B36) thayaws-sdk-s3(dependency lớn ~30 transitive AWS lock-in). Workspace dep preview:http-body-util = "0.1",futures = "0.3",object_store = "0.11"hoặcaws-sdk-s3 = "1"(TBD lock B62). Body::from_stream(stream)requirement Item type: Stream phải cóItem = Result<T, E>vớiT: Into<Bytes>+E: Into<BoxError>(BoxError = Box<dyn std::error::Error + Send + Sync>). Typical:Item = Result<Bytes, std::io::Error>cho file stream,Item = Result<Bytes, sqlx::Error>cho DB stream,Item = Result<Bytes, AppError>cho custom domain error map. Header response axum + hyper tự set: (a)Transfer-Encoding: chunkedresponse header — vì server KHÔNG biết trước Content-Length (Stream lazy unknown size); HTTP/1.1 chunked encoding format<hex_size>\r\n<chunk_bytes>\r\n0\r\n\r\n(size 0 báo EOF). HTTP/2 không cần chunked vì frame format built-in length per frame. (b) Content-Type defaultapplication/octet-streamnếu KHÔNG override — developer phải set explicit(header::CONTENT_TYPE, "text/csv")hoặc"text/plain"tùy use case qua tuple(headers, body)IntoResponse. (c) Content-Length axum KHÔNG set vì unknown size — browser/curl handle chunked transfer encoding incremental tự động không cần Content-Length. Cầu nốitokio::fs::File→ Stream:tokio_util::io::ReaderStream::new(reader)wrap bất kỳ type implementtokio::io::AsyncReadtrait sangStream<Item = Result<Bytes, std::io::Error>>— workspace deptokio-util = { version = "0.7", features = ["io"] }(featureiomandatory cho ReaderStream + StreamReader). Chunk size default 8KB cấu hình quaReaderStream::with_capacity(file, 64 * 1024)balance throughput (larger chunk = fewer syscall read) vs memory (larger chunk = larger peak). Type implementAsyncReadphổ biến:tokio::fs::Filefile local,tokio::net::TcpStreamproxy upstream,aws_sdk_s3::primitives::ByteStreamS3 GetObject response,tokio::io::BufReaderwrap stream khác để giảm syscall,std::io::CursorwrapVec<u8>in-memory dùng cho test. Pattern Shop API future invoice PDF download:let resp = s3.get_object(bucket, key).await?; let body = Body::from_stream(resp.body.into_async_read()); Ok((headers, body))— proxy bytes từ S3 sang client KHÔNG buffer toàn file vào RAM server, peak memory ~chunk size + Time to First Byte ~50ms (S3 GetObject latency).- Backpressure định nghĩa: cơ chế producer (server đọc file/DB) tự động dừng đọc khi consumer (client download) chưa kịp xử lý chunk trước, đảm bảo memory bounded không leak. Pull-based design: consumer chủ động pull data khi sẵn sàng thay vì producer push tuyến tính bất kể consumer state. Tại sao fast producer + slow consumer gây OOM: pitfall kinh điển server đọc file 10GB nhanh từ disk SSD NVMe (~1GB/s) nhưng client download chậm qua mobile network (~1MB/s). Tỷ lệ chênh 1000x → trong 10s server đọc xong toàn 10GB file vào buffer chờ client, client mới consume được 10MB → server hold 10GB RAM chờ → OOM Killer K8s kill pod. Variant worse: attacker mở 100 connection download chậm đồng thời, mỗi connection hold partial file → server allocate 100 × N GB → DoS chỉ với ~10 client. Vấn đề universal mọi system queue-based: log pipeline (producer ghi log nhanh hơn consumer tail), Kafka topic (producer publish nhanh hơn consumer subscribe), TCP socket buffer (sender ghi nhanh hơn receiver read). Bounded mpsc channel giải quyết thế nào:
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, io::Error>>(10)— channel với bounded capacity 10; producer tasktx.send(chunk).awaittự suspend khi channel đã có 10 chunk pending (chưa được consumer lấy ra quarx.recv().await) — tokio scheduler park task tạisend().awaitđợi capacity available, KHÔNG block worker thread (cooperative scheduling). Khi consumer (axum poll Stream qua ReceiverStream wrap rx) lấy 1 chunk ra → capacity freed 1 slot → producer wake up gửi tiếp. Backpressure propagate end-to-end: client TCP slow read → hyper chậm poll response writer → axum chậm poll Stream → ReceiverStream chậm poll rx → channel chậm drain → producer block tại tx.send. Producer task tự nhiên rate-limit theo tốc độ consumer. Client disconnect → axum drop response → hyper drop ReceiverStream → rx drop → tx.send() trảErr(SendError)→ producer taskreturncleanup ngay (giải phóng file handle + memory). Memory cap predictable công thức:memory_peak = capacity × chunk_size. Vd capacity 10 × chunk 1MB = 10MB peak per request regardless file size (1GB, 10GB đều cap 10MB). Áp dụng cho 100 concurrent connection: 100 × 10MB = 1GB total memory budget — fit container 2GB limit OK. So capacity 1000 × 1MB = 1GB per request → 100 connection = 100GB → OOM ngay. Decision lock Shop API capacity 10 default: cân bằng (a) throughput (capacity 1 quá nhỏ producer luôn block dù consumer chỉ chậm 1 chunk lag, latency cao), (b) memory cap (capacity 100+ peak quá lớn, không buffer được attack), (c) typical chunk size 1MB cho large file stream. Tune theo workload: file nhỏ + throughput cao → capacity 50, large file + memory critical → capacity 5. - CSV export 1M order rows — streaming vs buffered: (a) Buffered pattern (anti-pattern OOM risk):
let rows: Vec<Order> = sqlx::query_as::<_, Order>("SELECT * FROM orders").fetch_all(&pool).await?; let csv = build_csv(rows); Ok(csv)— bước 1 load 1M Order struct vào Vec ~500MB RAM (struct ~500 bytes × 1M = 500MB), bước 2 build CSV String 200MB, total peak ~700MB RAM. Container K8s limit 2GB còn handle được nhưng risk concurrent 3 admin export đồng thời = 2.1GB → OOM. TTFB ~30s+ (admin nhìn loading spinner đợi server load full DB + build CSV mới thấy download bắt đầu) — UX kém. (b) Streaming pattern (MANDATORY lock Shop API B197):let row_stream = sqlx::query!("SELECT id, total, status FROM orders").fetch(&state.pool); let body_stream = row_stream.map(|row_result| { let row = row_result?; let csv_row = format!("{},{},{}\n", row.id, row.total, row.status); Ok(Bytes::from(csv_row)) }); Body::from_stream(body_stream)— server gửi response header + first chunk ngay sau query setup ~50ms, sau đó stream từng row format CSV bytes incremental. Peak memory ~100KB (vài chục row chunk + channel buffer + sqlx cursor state) regardless 1M hay 10M row. TTFB ~50ms — browser hiện download progress bar ngay, admin biết file đang tải xuống. Risk pod K8s: streaming KHÔNG ăn RAM nên KHÔNG OOM dù 100 concurrent export đồng thời (100 × 100KB = 10MB) — vẫn cần monitor pool DB connection (1M row hold 1 connection ~17 phút, setstatement_timeoutPostgres lớn hơn + dùng read replica tránh ảnh hưởng primary pool, monitor pool starvation Prometheus alert khi connection >80% pool size). sqlx.fetch()vs.fetch_all()khác biệt cốt lõi: (a).fetch(&pool)trảimpl Stream<Item = Result<Row, sqlx::Error>>— lazy load row-by-row qua Postgres cursor declared trong transaction; mỗiStreamExt::next().awaitfetch 1 row (hoặc batch nhỏ tùy driver tune) từ DB cursor, drop sau xử lý → peak memory ~1 row. Hold 1 connection từ pool suốt query duration. Phù hợp large result set (1M+ row). Pitfall: nếu query cóORDER BYphức tạp, Postgres vẫn build full result internal trước stream → memory DB-side cao, statement timeout risk; mitigation:ORDER BYtrên indexed column, use cursor cho keyset pagination thayOFFSET. (b).fetch_all(&pool)trảResult<Vec<Row>, sqlx::Error>— eager load full result vào Vec rồi return, peak memory = sum tất cả row. Phù hợp small result set (<10k row, <10MB RAM). Pitfall: vô tình dùng cho large query → OOM. (c).fetch_one(&pool)trảResult<Row, sqlx::Error>single row, error nếu 0 hoặc >1 row. (d).fetch_optional(&pool)trảResult<Option<Row>, sqlx::Error>0 hoặc 1 row — pattern lock G7 Shop API cho read-by-id (B63 implement). Pattern lock vĩnh viễn Shop API: mọi export admin lớn (orders, customers, payments, inventory transactions, audit_logs) MANDATORY.fetch()stream +Body::from_streamKHÔNG.fetch_all()buffer; small list endpoint user-facing pagination 20 row/page dùng.fetch_all()OK. Cratecsv = "1"alternative format chuẩn handle escape comma/quote/newline đúng RFC 4180 — B197 chọn cuối khi implement quyết định manualformat!()đơn giản haycsv::Writerđầy đủ.
Bài Tiếp Theo
Bài 39: Extension Extractor — Request-Scoped Data — chi tiết Extension<T> set qua middleware (X-Request-Id middleware sẽ implement đầy đủ với tower-http::SetRequestIdLayer + custom layer wrap), extract trong handler qua Extension(req_id): Extension<RequestId>; so sánh với State<T> (Extension dynamic per-request lifetime, State persistent global Arc cloned cross-handler); use case: request_id propagation, trace context OpenTelemetry, current_user injection sau auth middleware, A/B test variant context.
