Danh sách bài viết

Bài 49: NDJSON + Streaming JSON Parse

Bài 49 của series Rust RESTful API — NDJSON (Newline-Delimited JSON) là format mỗi dòng = 1 JSON object độc lập, khác JSON Array ở 4 điểm: parse line-by-line không cần buffer toàn file vào RAM, append-friendly thêm dòng cuối không cần đọc đầu, error resilient 1 dòng lỗi không kill toàn batch, streaming serialize/deserialize cả server và client xử lý từng row chunk-by-chunk; spec de-facto http://ndjson.org/ không có RFC chính thức, MIME type chọn application/x-ndjson (Shop API lock B49, alternative application/jsonl không dùng); use case industry rộng — Elasticsearch bulk API gửi mỗi action 1 line, BigQuery + Snowflake export dataset 100M row, LogStash + Vector log shipping streaming, ML dataset mỗi line 1 sample training, OpenAI fine-tuning data format chuẩn; implement export endpoint GET /api/v1/products/export.ndjson Shop API stream NDJSON từ DB qua tokio::sync::mpsc channel size 32 (backpressure tự nhiên client chậm → server pause fetch) + StreamBody::from_stream (lock B38 streaming continued) + tokio::spawn task fetch + serialize từng product newline-delimited không load toàn dataset vào RAM, headers Content-Type: application/x-ndjson + Cache-Control: no-store + X-Content-Type-Options: nosniff bảo vệ MIME sniff; implement import endpoint POST /api/v1/products/import.ndjson parse NDJSON line-by-line qua tokio::io::BufReader::new(StreamReader::new(body)).lines() + tokio_util::io::StreamReader wrap body stream thành AsyncBufRead, gọi lines.next_line().await? async iterator KHÔNG load all to RAM, parse mỗi line qua serde_path_to_error::deserialize(&mut serde_json::Deserializer::from_str(line)) (lock B48 mandatory continued) cho path detail nested deep per-line, resilient batch 1 line lỗi KHÔNG kill cả batch (tiếp tục dòng tiếp theo), cap error report 100 lines tránh response payload lớn (errors_truncated: true flag); ImportReport envelope 4 field structured success: u64 + failed: u64 + errors: Vec<LineError> + errors_truncated: bool với LineError 2 field line: u64 (1-based số dòng) + message: String (error display) — pattern envelope lock cho mọi bulk operation Shop API tương lai; DefaultBodyLimit override 10MB cho route .ndjson import qua .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) per-route (lock B47 continued, base 2MB AppJson lock B32 không đủ bulk); URL suffix .ndjson rõ format trả về — pattern path lock cho mọi bulk endpoint Shop API; so sánh 3 pattern transfer: Json<Vec<T>> array 100MB RAM + all-or-nothing error vs NDJSON manual stream <1MB RAM + per-line resilient vs SSE <1MB RAM + one-way server-sent only; decision matrix endpoint Shop API lock B49 vĩnh viễn: Read API paginated list dùng Json<ProductListResponse> (lock B47), bulk export full dataset dùng NDJSON (B49), bulk import dùng NDJSON (B49), live event admin dashboard dùng SSE (lock B38); foundation cho B50 (HTTP compression — tower-http::CompressionLayer gzip/brotli/deflate encode NDJSON response giảm bandwidth 5-10 lần với log + dataset structure repeat, pitfall Content-Length mismatch sau encode chain dynamic length).

14/06/2026
13 phút đọc
0 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu NDJSON (Newline-Delimited JSON) là gì, khác JSON Array ở 4 điểm chính.
  • Biết khi nào chọn NDJSON: bulk export, log streaming, ETL pipeline, ML dataset, OpenAI fine-tuning.
  • Implement products export endpoint trả NDJSON stream từ DB qua mpsc channel + StreamBody.
  • Implement products import endpoint parse NDJSON line-by-line qua AsyncBufRead.lines().
  • Hiểu MIME type application/x-ndjson và alternative application/jsonl.
  • Pattern error reporting per-line dạng [{"line": 5, "message": "..."}] với cap 100 lines.
2

NDJSON Là Gì

NDJSON (Newline-Delimited JSON) là format mỗi dòng = 1 JSON object độc lập, các dòng tách nhau bằng ký tự \n. Không có dấu phẩy giữa dòng, không có dấu ngoặc vuông bao ngoài như JSON Array.

{"id":1,"name":"iPhone 15","price":"25000000"}
{"id":2,"name":"Samsung S24","price":"20000000"}
{"id":3,"name":"Pixel 8","price":"18000000"}

So với JSON Array [{}, {}, {}], NDJSON có 4 ưu điểm:

  • Parse line-by-line không cần buffer toàn file vào RAM — đọc 1 dòng, parse, xử lý, sang dòng tiếp; file 10GB chỉ cần vài KB RAM tại 1 thời điểm.
  • Append-friendly thêm dòng cuối không cần đọc đầu — log streaming write O(1); JSON Array phải parse full file để insert.
  • Error resilient 1 dòng lỗi không kill toàn batch — parse fail line 5 thì line 6, 7, 8 vẫn xử lý được; JSON Array sai 1 ký tự là cả file invalid.
  • Streaming serialize/deserialize cả server và client xử lý từng row chunk-by-chunk — pipe sang jq, Spark, BigQuery khả thi.

Spec NDJSON đặt ở http://ndjson.org/de-facto standard, không có RFC chính thức (khác JSON RFC 8259). Format gần như tương đương JSON Lines (jsonlines.org), chỉ khác về MIME type:

  • application/x-ndjson — MIME de-facto Elasticsearch + BigQuery dùng, Shop API lock chọn.
  • application/jsonl — alternative JSON Lines, KHÔNG dùng để giữ consistency.

Use case industry phổ biến:

  • Elasticsearch bulk API — gửi POST _bulk mỗi 2 line là 1 action (1 line metadata + 1 line document) qua NDJSON.
  • BigQuery / Snowflake export — export dataset 100M row sang NDJSON file lưu S3/GCS, ETL re-import.
  • LogStash / Vector log shipping — agent đọc log file rotate theo dòng, ship NDJSON sang Elasticsearch/Loki real-time.
  • ML dataset — mỗi line 1 sample training, parse stream tránh OOM với dataset GB.
  • OpenAI fine-tuning — input training data format JSONL chuẩn cho POST /v1/files.
3

Implement Export Endpoint — Stream NDJSON Từ DB

Pattern Shop API: dùng tokio::sync::mpsc channel + axum::body::Body::from_stream (lock B38 streaming continued). tokio::spawn 1 task fetch DB + serialize từng product, gửi qua channel; Body::from_stream consume channel chuyển thành response stream chunk-by-chunk.

// File: crates/shop-api/src/routes/products.rs (extend B49)
use axum::{
    body::Body,
    extract::State,
    http::{header, StatusCode},
    response::Response,
};
use futures::stream::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use crate::state::AppState;
use shop_common::dto::ProductResponseDto;
use shop_common::error::AppError;

pub async fn export_products_ndjson(
    State(state): State<AppState>,
) -> Result<Response, AppError> {
    // Channel size 32 — backpressure tự nhiên khi client chậm
    let (tx, rx) = mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(32);

    let service = state.product_service.clone();

    tokio::spawn(async move {
        let mut stream = service.fetch_all_stream(); // sqlx::Stream — preview G6

        while let Some(product_result) = stream.next().await {
            match product_result {
                Ok(product) => {
                    let dto = ProductResponseDto::from(product);
                    let mut line = serde_json::to_vec(&dto)
                        .unwrap_or_else(|_| b"{}".to_vec());
                    line.push(b'\n');

                    // Client disconnect → tx.send err → break loop
                    if tx.send(Ok(line.into())).await.is_err() {
                        tracing::debug!("client disconnect, stop export");
                        break;
                    }
                }
                Err(e) => {
                    tracing::error!(?e, "fetch product failed");
                    break;
                }
            }
        }
    });

    let stream = ReceiverStream::new(rx);
    let body = Body::from_stream(stream);

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "application/x-ndjson")
        .header(header::CACHE_CONTROL, "no-store")
        .header("X-Content-Type-Options", "nosniff")
        .body(body)
        .map_err(|_| AppError::Internal(
            anyhow::anyhow!("response build failed")
        ))?)
}

4 điểm cần chú ý:

  • Channel size 32 — backpressure tự nhiên. Client chậm thì tx.send().await block ở task spawn, DB fetch pause; tránh fetch ào ạt 1 triệu row vào RAM trong khi client mới đọc 100 line.
  • Newline append line.push(b'\n') — MANDATORY giữa mỗi JSON object; thiếu newline thì client parse fail toàn bộ.
  • Header Cache-Control: no-store — dataset export thường dynamic, cache proxy sai logic.
  • X-Content-Type-Options: nosniff — chặn browser sniff MIME tự đoán content nhạy cảm.

So sánh memory tại 1 thời điểm:

Pattern              | Memory peak
Json<Vec<Product>>    | 100MB-1GB (load all)
NDJSON stream (B49)  | ~vài KB (32 line buffer channel)

Wire route ở step 8: GET /api/v1/products/export.ndjson. URL suffix .ndjson rõ format trả về — client parse jq hoặc tự built reader.

4

Test Export Endpoint

Chạy server và test bằng curl:

cargo run -p shop-api
# Output: shop-api listening addr=0.0.0.0:3000

Curl basic xem stream output:

curl http://localhost:3000/api/v1/products/export.ndjson

# {"id":1,"name":"iPhone 15","price":"25000000.00",...}
# {"id":2,"name":"Samsung S24","price":"20000000.00",...}
# {"id":3,"name":"Pixel 8","price":"18000000.00",...}

Pipe sang jq để xử lý từng line — minh chứng client xử lý streaming không cần đợi response complete:

curl http://localhost:3000/api/v1/products/export.ndjson | jq -c '.name'

# "iPhone 15"
# "Samsung S24"
# "Pixel 8"

Save sang file rồi import lại (round-trip test):

curl http://localhost:3000/api/v1/products/export.ndjson > products.ndjson
wc -l products.ndjson
# 1000 products.ndjson  (tổng số sản phẩm DB)

head -3 products.ndjson | jq '.name'
# "iPhone 15"
# "Samsung S24"
# "Pixel 8"

Verify response header chứa Content-Type: application/x-ndjson:

curl -I http://localhost:3000/api/v1/products/export.ndjson

# HTTP/1.1 200 OK
# content-type: application/x-ndjson
# cache-control: no-store
# x-content-type-options: nosniff
# transfer-encoding: chunked

Header Transfer-Encoding: chunked tự động set bởi axum/hyper vì response stream không biết trước Content-Length — đúng pattern HTTP/1.1 streaming.

5

Implement Import Endpoint — Parse Line-By-Line

Import endpoint nhận body NDJSON, parse line-by-line, return ImportReport tổng kết. Pattern dùng tokio_util::io::StreamReader wrap body stream thành AsyncBufRead, gọi BufReader::new(...).lines() async iterator KHÔNG load all to RAM.

// File: crates/shop-api/src/routes/products.rs (extend B49)
use axum::{
    extract::{Request, State},
    http::header,
    Json,
};
use futures::stream::TryStreamExt;
use serde::Serialize;
use shop_common::dto::CreateProductDto;
use shop_common::error::AppError;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;
use validator::Validate;

pub async fn import_products_ndjson(
    State(state): State<AppState>,
    request: Request,
) -> Result<Json<ImportReport>, AppError> {
    // 1. Validate Content-Type
    let content_type = request
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    if !content_type.contains("application/x-ndjson") {
        return Err(AppError::BadRequest(
            "Content-Type phải là application/x-ndjson".into()
        ));
    }

    // 2. Convert body stream → AsyncBufRead
    let body_stream = request
        .into_body()
        .into_data_stream()
        .map_err(std::io::Error::other);
    let stream_reader = StreamReader::new(body_stream);
    let mut lines = BufReader::new(stream_reader).lines();

    // 3. Process từng line
    let mut report = ImportReport::default();
    let mut line_number: u64 = 0;

    while let Some(line) = lines.next_line().await
        .map_err(|e| AppError::BadRequest(format!("đọc body fail: {e}")))?
    {
        line_number += 1;

        if line.trim().is_empty() {
            continue;
        }

        match parse_and_import_line(&state, &line).await {
            Ok(_) => report.success += 1,
            Err(e) => {
                report.failed += 1;

                // Cap error report 100 lines tránh response lớn
                if report.errors.len() < 100 {
                    report.errors.push(LineError {
                        line: line_number,
                        message: e.to_string(),
                    });
                } else {
                    report.errors_truncated = true;
                }
            }
        }
    }

    Ok(Json(report))
}

async fn parse_and_import_line(
    state: &AppState,
    line: &str,
) -> Result<(), AppError> {
    // serde_path_to_error wrap mỗi line — lock B48 continued
    let de = &mut serde_json::Deserializer::from_str(line);
    let dto: CreateProductDto = serde_path_to_error::deserialize(de)
        .map_err(|e| AppError::JsonDataMismatch {
            path: e.path().to_string(),
            message: e.inner().to_string(),
        })?;

    dto.validate()?;
    state.product_service.create(dto).await?;
    Ok(())
}

#[derive(Debug, Default, Serialize)]
pub struct ImportReport {
    pub success: u64,
    pub failed: u64,
    pub errors: Vec<LineError>,
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub errors_truncated: bool,
}

#[derive(Debug, Serialize)]
pub struct LineError {
    pub line: u64,
    pub message: String,
}

4 quyết định pattern lock B49:

  • Resilient batch: 1 line fail thì report.failed += 1 + continue line tiếp theo. KHÔNG return Err(...) giữa chừng. Đây là semantic bulk operation chuẩn — báo cáo tổng kết, client tự quyết định retry line nào.
  • Cap error 100 lines: nếu fail nhiều quá thì errors_truncated: true, không tiếp tục push vào errors. Tránh response 100MB error message với batch toàn lỗi.
  • serde_path_to_error per-line: mỗi line wrap deserializer riêng (lock B48 continued); path detail items[0].metadata.color nested deep cho client UI highlight đúng field.
  • Skip empty line: if line.trim().is_empty() { continue; } tolerate trailing newline cuối file hoặc blank line giữa batch — chuẩn behavior NDJSON parser.

Wire route ở step 8: POST /api/v1/products/import.ndjson với DefaultBodyLimit::max(10 * 1024 * 1024) override 10MB (lock B47 continued, base 2MB AppJson không đủ bulk).

6

Test Import Endpoint + Error Reporting

Tạo file products.ndjson với 4 line — 3 valid + 1 invalid (price sai type, slug uppercase):

{"name":"Product 1","slug":"product-1","price":"100000.00","stock":10}
{"name":"Product 2","slug":"product-2","price":"200000.00","stock":20}
{"name":"BadProduct","slug":"INVALID","price":"abc","stock":5}
{"name":"Product 4","slug":"product-4","price":"400000.00","stock":40}

Import qua curl với header MIME đúng và body file:

curl -X POST http://localhost:3000/api/v1/products/import.ndjson \
  -H 'Content-Type: application/x-ndjson' \
  --data-binary @products.ndjson

Response JSON envelope ImportReport:

{
  "success": 3,
  "failed": 1,
  "errors": [
    {
      "line": 3,
      "message": "JSON data mismatch at price: invalid type: string \"abc\", expected u64"
    }
  ]
}

3 line valid được insert thành công vào DB, line 3 fail vì price không phải u64, line 4 vẫn tiếp tục insert bình thường. Pattern 1 line lỗi KHÔNG kill batch minh chứng end-to-end.

Test case batch lỗi nhiều (cap 100 lines):

# Tạo file 200 line đều invalid
seq 1 200 | awk '{print "{\"name\":\"x\",\"slug\":\"X\"}"}' > bad.ndjson

curl -X POST http://localhost:3000/api/v1/products/import.ndjson \
  -H 'Content-Type: application/x-ndjson' \
  --data-binary @bad.ndjson

Response cap 100 error và set flag errors_truncated:

{
  "success": 0,
  "failed": 200,
  "errors": [
    {"line": 1, "message": "JSON data mismatch at slug: ..."},
    {"line": 2, "message": "..."},
    "... (98 more entries)"
  ],
  "errors_truncated": true
}

Field errors_truncated: true tín hiệu cho client biết có nhiều lỗi nữa không show — admin cần xem server log full chi tiết qua tracing correlation request_id (B39).

7

tower-http NDJSON Vs Manual Stream

Có 3 cách handle bulk data transfer trong axum, mỗi cách phù hợp use case khác nhau:

Pattern              | Memory  | Error handling     | Stream true
Json<Vec<T>>         | 100MB+  | All-or-nothing    | No (load all)
NDJSON manual stream | <1MB    | Per-line resilient | Yes
SSE (B38)            | <1MB    | Server-sent only   | One-way

Json<Vec<T>> — đơn giản nhất nhưng load toàn array vào RAM, parse fail giữa chừng là toàn bộ batch lỗi. Lock B47 cap 1000 items max qua #[validate(length(max = 1000))]. Phù hợp list paginated trả ít item.

NDJSON manual stream (B49) — full control memory + error handling per-line. Phù hợp bulk export/import dataset lớn. Trade-off: code phức tạp hơn, phải tự handle channel + StreamBody + line parser.

SSE (B38) — text/event-stream với keep-alive ping, push event server → client một chiều. Phù hợp live event admin dashboard (order status change, inventory update real-time). KHÔNG dùng cho bulk export vì semantic khác (event stream vs data dump).

Decision matrix endpoint Shop API lock B49 vĩnh viễn:

  • Read API paginated (GET list) → Json<ProductListResponse> envelope B47 (5 field items/total/page/per_page/total_pages).
  • Bulk export (GET full dataset) → NDJSON stream B49 (mpsc + StreamBody).
  • Bulk import (POST batch) → NDJSON parse line-by-line B49 (AsyncBufRead + ImportReport).
  • Live event (admin dashboard real-time) → SSE B38 (text/event-stream + keep-alive).

Pattern lock này áp dụng cho mọi resource Shop API tương lai — orders, categories, users, reviews, notifications. Mỗi resource có 4 endpoint corresponding khi cần.

8

Wire Routes Vào routes/products.rs

Wire 2 route mới vào routes() function, áp DefaultBodyLimit::max(10 MB) per-route cho import endpoint:

// File: crates/shop-api/src/routes/products.rs (extend B49)
use axum::{
    extract::DefaultBodyLimit,
    routing::{get, post},
    Router,
};

use crate::state::AppState;

pub fn routes() -> Router<AppState> {
    Router::new()
        .route("/products", get(list_products).post(create_product))
        .route("/products/popular", get(list_popular))
        .route("/products/{slug}", get(get_product)
            .put(replace_product)
            .patch(update_product)
            .delete(delete_product))
        .route("/products/{slug}/related/{related_slug}",
            get(get_related_product))
        // B49 — NDJSON export/import
        .route("/products/export.ndjson", get(export_products_ndjson))
        .route("/products/import.ndjson",
            post(import_products_ndjson)
                // 10MB override cho bulk import (lock B47 continued)
                .layer(DefaultBodyLimit::max(10 * 1024 * 1024))
        )
}

2 quyết định pattern lock B49:

  • URL suffix .ndjson rõ format trả về — client không phải đọc header Content-Type mới biết, dev đọc route file biết ngay endpoint trả NDJSON. Pattern path lock cho mọi bulk endpoint Shop API tương lai (/orders/export.ndjson, /categories/import.ndjson).
  • .layer(DefaultBodyLimit::max(N)) per-route (lock B47 continued) — bulk import 10MB override base 2MB AppJson. Tách per-route an toàn vì các endpoint khác (create_product, update_product) vẫn 2MB chống DoS body size.

Note ProductService cần thêm method fetch_all_stream() trả impl Stream<Item = Result<Product, Error>> — preview G6 dùng sqlx::query_as!(...).fetch(&pool) trả stream native. Hiện tại placeholder mock — implement chính thức ở G6 B58 sqlx stream pattern.

9

Tổng Kết

  • NDJSON: format mỗi dòng = 1 JSON object — streaming, append-friendly, error-resilient, parse line-by-line không buffer toàn file.
  • Use case: bulk export DB, log streaming, ETL pipeline, ML dataset, OpenAI fine-tuning training data.
  • MIME type lock: application/x-ndjson (Shop API chọn, alternative application/jsonl không dùng).
  • URL suffix .ndjson rõ format trả về — pattern path lock cho mọi bulk endpoint.
  • Export pattern: tokio::sync::mpsc channel size 32 + Body::from_stream (B38 lock continued) + DB stream → newline serialize → response stream + headers Cache-Control: no-store + X-Content-Type-Options: nosniff.
  • Import pattern: tokio_util::io::StreamReader wrap body → BufReader::new(...).lines() async iterator → parse từng line không load all to RAM.
  • serde_path_to_error MANDATORY per-line (B48 lock continued) — path detail nested deep cho mọi line parse fail.
  • Resilient batch: 1 line lỗi KHÔNG kill batch, tiếp tục dòng tiếp theo; cap error report 100 lines (errors_truncated: true flag) tránh response lớn.
  • ImportReport envelope lock B49: 4 field success + failed + errors[{line, message}] + errors_truncated — pattern envelope cho mọi bulk operation Shop API.
  • DefaultBodyLimit::max(10 MB) override per-route import endpoint (lock B47 continued, base 2MB AppJson không đủ bulk).
  • Decision matrix endpoint lock B49 vĩnh viễn: paginated list Json<ListResponse> (B47), bulk export NDJSON (B49), bulk import NDJSON (B49), live event SSE (B38).
  • File path lock: extend crates/shop-api/src/routes/products.rs thêm 2 handler + 2 struct ImportReport + LineError + 2 route .ndjson.
  • Foundation cho B50 (HTTP compression — tower-http::CompressionLayer gzip/brotli encode NDJSON response giảm bandwidth 5-10 lần với log + dataset structure repeat).
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. NDJSON khác JSON Array sao? Liệt kê 4 ưu điểm chính và cho ví dụ use case industry.
  2. Pattern export endpoint dùng mpsc channel size 32, KHÔNG dùng 1 hay 1000 — lý do chính là gì?
  3. Import line-by-line resilient: tại sao 1 line lỗi KHÔNG kill batch? Khi nào nên kill batch sớm?
  4. BufReader::lines() vs BufReader::split('\n') khác nhau gì? Khi nào chọn cái nào?
  5. Decision matrix endpoint Shop API: khi nào dùng JSON list, NDJSON export/import, SSE? Cho ví dụ cụ thể mỗi loại.
Đáp án
  1. NDJSON khác JSON Array: NDJSON = mỗi dòng 1 JSON object độc lập tách nhau bằng \n, không có dấu phẩy giữa dòng, không có dấu ngoặc vuông bao ngoài. JSON Array = [{}, {}, {}] 1 cấu trúc lớn duy nhất. 4 ưu điểm NDJSON: (a) parse line-by-line không cần buffer toàn file vào RAM — đọc 1 dòng parse 1 dòng, file 100GB chỉ cần vài KB RAM tại 1 thời điểm; JSON Array phải parse full document để biết structure; (b) append-friendly thêm dòng cuối O(1) không cần đọc đầu — log streaming pattern chuẩn, agent shipper (Vector/Fluentd) append continuous; JSON Array thêm element phải parse + serialize lại toàn file; (c) error resilient 1 dòng lỗi không kill toàn batch — parse fail line 5 thì line 6, 7, 8 vẫn xử lý được, semantic chuẩn bulk operation báo cáo tổng kết; JSON Array sai 1 ký tự syntax (thiếu comma, lệch quote) là cả document invalid, parser fail từ byte đầu tiên; (d) streaming serialize/deserialize cả server và client xử lý từng row chunk-by-chunk — pipe sang jq, Spark, BigQuery, Snowflake khả thi; JSON Array client phải đợi full document mới parse được. Use case industry cụ thể: (i) Elasticsearch bulk API POST _bulk nhận body NDJSON mỗi 2 line là 1 action — 1 line metadata {"index": {"_id": "1"}} + 1 line document {"field": "value"}, batch insert/update/delete trong 1 request HTTP; (ii) BigQuery / Snowflake export dataset 100M row sang file NDJSON lưu S3/GCS — load lại qua LOAD DATA hoặc COPY INTO streaming không OOM; (iii) LogStash + Vector + Fluentd ship log file rotate dòng → Elasticsearch/Loki real-time, mỗi line 1 log event với timestamp + level + message + structured field; (iv) ML dataset training mỗi line 1 sample {"input": "...", "output": "..."} — PyTorch/TensorFlow iter() stream tránh OOM với dataset 50GB; (v) OpenAI fine-tuning format chuẩn JSONL cho POST /v1/files với purpose: "fine-tune" — mỗi line 1 conversation training.
  2. Channel size 32 cho mpsc: con số này là buffer size giữa producer (DB fetch task) và consumer (HTTP response stream). Tại sao không size 1: size 1 sync hoàn toàn — producer phải đợi consumer nhận xong mới fetch row tiếp; round-trip overhead cao (tx.send().await + context switch + rx.recv().await mỗi line) — throughput thấp với DB fast 100K row/s; latency tăng. Tại sao không size 1000: size 1000 = buffer 1000 product DTO trong channel — mỗi DTO 1-5KB → 1-5MB RAM cho channel; nếu client slow (mobile 3G, attacker intentionally slow) thì DB tiếp tục fetch ào ạt fill channel; backpressure mất hiệu lực — producer chạy nhanh hơn consumer, RAM phình to. Worst case 10K concurrent export request × 5MB channel = 50GB RAM exhausted. Size 32 cân bằng: đủ buffer cho throughput cao (32 line × tx.send async = pipeline parallel producer-consumer, hide latency network 1-10ms với DB fetch 1ms), nhưng đủ nhỏ để backpressure kích hoạt sớm khi client chậm (32 line × 5KB = 160KB RAM/connection acceptable). Pattern lock Shop API B49: channel size 32 cho mọi NDJSON export endpoint Shop API tương lai — orders/categories/reviews export đều cùng size 32. Tunable qua benchmark nếu cần (size 16 cho payload lớn 50KB/row, size 128 cho payload nhỏ 100B/row).
  3. 1 line lỗi KHÔNG kill batch — semantic bulk operation: pattern chuẩn bulk import là "best effort" — process tất cả line có thể, báo cáo tổng kết success/failed. Lý do: (a) UX admin: import file 10K product, line 50 sai format không có nghĩa 9950 line còn lại đều insert được — admin chỉ cần sửa line 50 và import lại line đó, không phải sửa toàn file; (b) idempotent với upsert: pattern lock B49 dùng INSERT ... ON CONFLICT DO UPDATE (preview G6) nên re-run import cùng file an toàn — line 50 đã có thì update, line 51-10K đã insert được skip; (c) error visibility: ImportReport envelope success/failed/errors[{line, message}] cho admin thấy chính xác line nào fail vì sao; (d) industry standard: Elasticsearch _bulk API, BigQuery load job, Stripe webhook batch đều dùng pattern này; (e) resilience: bulk job 30 phút chạy gần xong fail vì 1 line lỗi là disaster — phải redo từ đầu lãng phí công sức + tài nguyên DB. Khi nào nên kill batch sớm: (i) auth fail / DB connection lost: cause là systemic không phải data, tiếp tục vô nghĩa; (ii) line đầu tiên Content-Type/format sai hoàn toàn: client gửi nhầm CSV thay NDJSON, parse fail liên tục là dấu hiệu, kill sớm tiết kiệm; (iii) fail rate > 50%: heuristic — quá nửa batch fail thì có lẽ schema mismatch hoặc client bug, return early với suggest "kiểm tra schema"; (iv) cap error 100 lines: B49 đã apply — tiếp tục process nhưng không push error list nữa, errors_truncated: true flag báo client biết. Lock decision Shop API B49: resilient default (chỉ cap error report, không kill batch); admin có metric Prometheus theo fail_rate alert nếu > 50% batch fail liên tục — manual investigate.
  4. BufReader::lines() vs BufReader::split('\n') đều là async iterator wrap AsyncBufRead nhưng output khác nhau. lines() trả Lines stream Item = io::Result<String> — String đã strip terminator (\n hoặc \r\n Windows), parse UTF-8 internal, fail nếu byte sequence không valid UTF-8. Pros: idiomatic cho text-based format (NDJSON, log, CSV header line); strip terminator tự động dev không phải tự manual. Cons: UTF-8 parse overhead nhỏ; fail toàn line nếu byte invalid (vd binary trộn lẫn). split(byte) trả Split stream Item = io::Result<Vec<u8>> — bytes raw đã strip delimiter byte; KHÔNG parse UTF-8, KHÔNG validate. Pros: nhanh hơn nhỏ (~5-10%), hỗ trợ binary content giữa delimiter (vd protobuf record với \x00 delimiter), tolerate invalid UTF-8 (vd log có encoding lỗi vẫn process được line khác). Cons: dev phải tự String::from_utf8 + handle error; semantic không tự nhiên cho text. Khi nào chọn cái nào: (a) NDJSON Shop API B49lines() — payload luôn UTF-8 text JSON, idiomatic + strip terminator tự động + fail-fast nếu encoding sai; (b) log streaming với encoding hỗn hợp (legacy Windows-1252 latin-1, tolerate junk) → split(b'\n') + custom decode per-line; (c) binary protocol Protocol Buffers với length-prefix hoặc \x00 delimiter → split(byte) + parse bytes; (d) CSV import RFC 4180 với newline trong quoted field → KHÔNG dùng cả 2, phải dùng csv-async crate stateful parser tôn trọng quote escape. Lock Shop API B49: BufReader::lines() cho NDJSON import endpoint — payload UTF-8 text JSON spec đảm bảo encoding chuẩn.
  5. Decision matrix endpoint Shop API B49 lock vĩnh viễn. (a) Read API paginated dùng Json<ProductListResponse> envelope B47 với 5 field items + total + page + per_page + total_pages — phù hợp khi: dataset trả limited subset (10-100 items / page), client UI table có pagination control, response size predictable < 1MB, latency yêu cầu < 100ms. Ví dụ: GET /api/v1/products?page=1&per_page=20 hiển thị table 20 product trên UI admin, user click "Next" gọi page 2; GET /api/v1/orders?page=1&per_page=10&status=pending dashboard user xem order pending. (b) Bulk export dùng NDJSON stream B49 — phù hợp khi: dataset full không pagination (10K-100M row), admin/data team cần dump để analyze offline, response size 100MB-100GB, latency 30s-30 phút acceptable. Ví dụ: GET /api/v1/products/export.ndjson admin export toàn bộ catalog để analyze BI; GET /api/v1/orders/export.ndjson?date_from=2026-01-01 tài chính export order Q1 lên BigQuery; GET /api/v1/audit_logs/export.ndjson?date=2026-06-13 compliance team xuất log daily. (c) Bulk import dùng NDJSON parse line-by-line B49 + ImportReport — phù hợp khi: admin upload batch update/insert, file size 1-10MB (cap DefaultBodyLimit), 1K-10K records, per-line error visibility cần. Ví dụ: POST /api/v1/products/import.ndjson admin import catalog mới từ nhà cung cấp; POST /api/v1/categories/import.ndjson migrate category tree từ system cũ; POST /api/v1/users/import.ndjson bulk register staff account. (d) Live event dùng SSE B38 — phù hợp khi: server push event real-time, one-way server → client, latency < 1s, low bandwidth (1-100 event/s), client browser support tự native EventSource. Ví dụ: GET /api/v1/admin/orders/stream dashboard admin xem order mới push real-time khi customer checkout; GET /api/v1/admin/inventory/stream alert khi stock dưới threshold; GET /api/v1/me/notifications/stream user nhận notification in-app live (alternative WebSocket B250+ cho bidirectional). Pattern lock cho mọi resource Shop API tương lai — Orders/Categories/Users/Reviews/Notifications mỗi resource có 4 endpoint corresponding khi cần (list paginated + export + import + stream live).
11

Bài Tiếp Theo

— bài CUỐI Group 5: tower-http::CompressionLayer + DecompressionLayer apply global Shop API; chọn algorithm theo Accept-Encoding negotiation; benchmark gzip vs brotli ratio + CPU cost; pitfall Content-Length mismatch với stream NDJSON; áp compression cho export endpoint giảm bandwidth 5-10 lần.