Danh sách bài viết

Bài 64: Products ↔ Categories M:N Relation

Bài 64 của series Rust RESTful API — bài CODE thực tế nối tiếp B63 (categories tree adjacency + materialized path + bridge product_categories M:N composite PK): implement endpoint assign categories cho product qua bridge table với PUT replace full set semantic (KHÔNG PATCH delta cho M:N — exception B61 PATCH lock vì M:N bản chất "set membership" idempotent); assign_categories transaction 6 step trong shop-db (verify product exists + not soft-deleted → verify mọi category ids tồn tại + not soft-deleted → get old set cho audit diff → DELETE existing assignments → INSERT batch qua UNNEST array Postgres bulk insert → audit::log_action ghi vào audit_logs với old vs new JSON); populate category_ids: Vec<CategoryId> cho ProductResponseDto (preview B47 đã define field) qua LEFT JOIN + ARRAY_AGG FILTER + COALESCE + GROUP BY pattern lock tránh N+1 anti-pattern; ProductRowWithCategories struct 12 field thêm category_ids: Vec<i64> aggregate từ JOIN; refactor list_products dùng list_products_with_categories JOIN aggregate; endpoint PUT /api/v1/products/{slug}/categories replace full set 204 No Content + AssignCategoriesDto validate max 20 categories/product (B63 lock continued); endpoint GET /api/v1/categories/{slug}/products với materialized path subtree filter (path = $1 OR path LIKE $1 || '.%') lấy products thuộc category root + tất cả descendants subtree (B63 path lock continued), 2 JOIN pattern INNER JOIN filter products thuộc subtree + LEFT JOIN aggregate toàn bộ categories của product; query optimization với EXPLAIN ANALYZE + CTE pagination pattern tránh OFFSET scan inflation khi pagination cùng JOIN aggregate; force not-null override syntax as "category_ids!: Vec<i64>" sqlx (B53 lock continued); file path lock products.rs + categories.rs + routes/products.rs + routes/categories.rs.

15/06/2026
13 phút đọc
2 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Implement assign_categories endpoint qua bridge table product_categories với transaction 6 step và audit log.
  • Hiểu pattern PUT replace full set cho M:N relation thay vì PATCH delta — ngoại lệ của lock B61 PATCH-only.
  • Implement GET /categories/{slug}/products với subtree filter (category root + tất cả descendants) dùng materialized path B63.
  • Optimize JOIN query với index hint và pattern ARRAY_AGG + FILTER + COALESCE + GROUP BY.
  • Populate field category_ids cho ProductResponseDto qua aggregate trong cùng 1 query (tránh N+1).
  • Endpoint GET /api/v1/categories/{slug}/products trả danh sách product thuộc subtree với 2 JOIN pattern.
  • Preview cycle prevention validation (B65 deep) khi update parent.
2

Vấn Đề M:N Relation Update Semantic

Quan hệ M:N giữa productscategories: 1 product có N category (iPhone thuộc Electronics + Smartphones + Apple), 1 category có N product (Smartphones chứa iPhone + Samsung + Pixel). Bridge table product_categories đã tạo ở B63 với composite PK (product_id, category_id) guarantee 1 cặp không xuất hiện 2 lần (idempotent assign).

Khi admin muốn update danh sách category của 1 product, REST có 3 cách thiết kế endpoint phổ biến:

  • PATCH delta — client gửi delta dạng {"add": [1, 5], "remove": [3]}. Tường minh hành động cụ thể, audit log ghi nhận từng thao tác chi tiết. Bất lợi: client phải biết state hiện tại để tính delta, không idempotent (gửi lần 2 cùng body có thể fail nếu state đã đổi giữa 2 request).
  • PUT replace full set — client gửi {"category_ids": [1, 5, 12]} nghĩa là "product này có đúng 3 category này, xóa hết phần thiếu, giữ phần khớp". Đơn giản, idempotent, không cần biết state hiện tại.
  • POST individual — mỗi cặp 1 endpoint riêng POST /products/{slug}/categories/{cat_id} để thêm + DELETE để xóa. Mất nhiều round-trip nếu update 5 category cùng lúc, audit log fragment.

Lock decision Shop API: PUT replace full set cho mọi M:N relation update. Lý do cụ thể:

  • M:N bản chất là "set membership" — câu hỏi "product này thuộc category nào" trả về 1 set hữu hạn. Replace full set khớp semantic tự nhiên hơn delta.
  • Idempotent: gửi cùng body N lần kết quả bằng nhau. Retry an toàn dưới mọi network failure.
  • Đây là ngoại lệ của lock B61 (PATCH-only cho user-facing update) — M:N relation update khác semantic so với partial update field. Lock áp dụng cho mọi M:N tương lai (product tags, user roles, order items thay thế).
  • Endpoint path lock: PUT /api/v1/products/{slug}/categories — KHÔNG dùng PATCH, KHÔNG dùng POST individual.

Bảng so sánh nhanh:

Pattern       | Idempotent | Round-trip | Client knows state | Audit detail
PATCH delta   | No         | 1          | Yes (delta calc)   | Per-action
PUT replace   | Yes        | 1          | No                 | Old vs new diff
POST one-by-1 | Yes        | N          | Partial            | Per-pair

Shop API chọn PUT replace full set với audit log dạng diff {"old": [1, 3], "new": [1, 5, 12]} — pattern lock vĩnh viễn.

3

assign_categories Function Trong shop-db

Mở rộng crates/shop-db/src/products.rs thêm function assign_categories service-layer transaction-scoped lock pattern B62 (begin → mutation → audit → commit). Function nhận product_id: i64 đã resolve từ slug ở handler, category_ids: &[i64] slice (rỗng OK = clear toàn bộ), 2 context optional actor_user_id + request_id cho audit:

// File: crates/shop-db/src/products.rs

use crate::audit;
use sqlx::PgPool;

pub async fn assign_categories(
    pool: &PgPool,
    product_id: i64,
    category_ids: &[i64],
    actor_user_id: Option<i64>,
    request_id: Option<&str>,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Step 1: Verify product exists + not soft deleted
    let _product = sqlx::query!(
        "SELECT id FROM products WHERE id = $1 AND deleted_at IS NULL",
        product_id
    )
    .fetch_optional(&mut *tx)
    .await?
    .ok_or(sqlx::Error::RowNotFound)?;

    // Step 2: Verify all categories exist + not soft deleted
    if !category_ids.is_empty() {
        let count: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM categories \
             WHERE id = ANY($1) AND deleted_at IS NULL"
        )
        .bind(category_ids)
        .fetch_one(&mut *tx)
        .await?;

        if count.0 as usize != category_ids.len() {
            tx.rollback().await?;
            return Err(sqlx::Error::RowNotFound);
        }
    }

    // Step 3: Get old assignments cho audit diff
    let old_categories: Vec<i64> = sqlx::query_scalar!(
        "SELECT category_id FROM product_categories WHERE product_id = $1",
        product_id
    )
    .fetch_all(&mut *tx)
    .await?;

    // Step 4: Delete all current assignments
    sqlx::query!(
        "DELETE FROM product_categories WHERE product_id = $1",
        product_id
    )
    .execute(&mut *tx)
    .await?;

    // Step 5: Insert new assignments (UNNEST batch insert)
    if !category_ids.is_empty() {
        sqlx::query!(
            r#"
            INSERT INTO product_categories (product_id, category_id)
            SELECT $1, UNNEST($2::BIGINT[])
            "#,
            product_id,
            category_ids
        )
        .execute(&mut *tx)
        .await?;
    }

    // Step 6: Audit log diff old vs new
    audit::log_action(
        &mut tx,
        "products",
        product_id,
        "assign_categories",
        actor_user_id,
        Some(serde_json::json!({
            "old": old_categories,
            "new": category_ids,
        })),
        request_id,
    )
    .await?;

    tx.commit().await?;
    Ok(())
}

6 step pattern lock cho mọi M:N assign operation Shop API tương lai (user roles, order tags, notification topics):

  1. Verify left entity (product) — fail-fast nếu không tồn tại hoặc đã soft-deleted, tránh ghi đè quan hệ vào row "ma".
  2. Verify right entities (categories) — đếm số row khớp id = ANY($1) và compare với category_ids.len(). Khác = có id không hợp lệ → rollback + return RowNotFound.
  3. Get old set cho audit diff — query SELECT category_id trước khi DELETE.
  4. Delete old — xóa toàn bộ assignment hiện tại (bridge composite PK CASCADE).
  5. Insert new batch qua UNNEST — pattern Postgres UNNEST($2::BIGINT[]) expand array → bulk INSERT trong 1 statement. So với loop N statement riêng lẻ, UNNEST batch nhanh hơn 5-10× với set lớn nhờ giảm round-trip protocol và planning overhead.
  6. Audit log — gọi audit::log_action (B62 lock) với action = "assign_categories" + changes JSONB dạng {"old": [...], "new": [...]}.

Thiếu transaction wrap → race condition: hai admin assign concurrent có thể đan xen DELETE + INSERT của nhau, để lại bridge ở trạng thái lẫn lộn. Transaction READ COMMITTED đảm bảo mỗi assign nhìn thấy snapshot consistent.

4

Populate category_ids Cho ProductResponseDto

ProductResponseDto đã có field category_ids: Vec<CategoryId> từ B47 nhưng From<ProductRow> hiện tại set Vec::new() placeholder (chưa join bridge). Refactor để query JOIN aggregate populate field này.

Tạo struct mới ProductRowWithCategories trong crates/shop-db/src/products.rs kèm 1 field aggregate category_ids: Vec<i64> (12 field tổng):

// File: crates/shop-db/src/products.rs

use chrono::{DateTime, Utc};
use rust_decimal::Decimal;

#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ProductRowWithCategories {
    pub id: i64,
    pub name: String,
    pub slug: String,
    pub price: Decimal,
    pub stock: i32,
    pub description: Option<String>,
    pub tags: Vec<String>,
    pub metadata: serde_json::Value,
    pub category_ids: Vec<i64>,       // aggregate từ JOIN
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub deleted_at: Option<DateTime<Utc>>,
}

Function list_products_with_categories query LEFT JOIN bridge + ARRAY_AGG aggregate:

pub async fn list_products_with_categories(
    pool: &PgPool,
    filter: ProductFilter,
) -> Result<(Vec<ProductRowWithCategories>, u64), sqlx::Error> {
    let rows = sqlx::query_as!(
        ProductRowWithCategories,
        r#"
        SELECT
            p.id, p.name, p.slug, p.price, p.stock, p.description,
            p.tags, p.metadata, p.created_at, p.updated_at, p.deleted_at,
            COALESCE(
                ARRAY_AGG(pc.category_id) FILTER (WHERE pc.category_id IS NOT NULL),
                '{}'::BIGINT[]
            ) AS "category_ids!: Vec<i64>"
        FROM products p
        LEFT JOIN product_categories pc ON pc.product_id = p.id
        WHERE p.deleted_at IS NULL
        GROUP BY p.id
        ORDER BY p.created_at DESC
        LIMIT $1 OFFSET $2
        "#,
        filter.per_page as i64,
        ((filter.page - 1) * filter.per_page) as i64
    )
    .fetch_all(pool)
    .await?;

    let total: i64 = sqlx::query_scalar!(
        "SELECT COUNT(*) FROM products WHERE deleted_at IS NULL"
    )
    .fetch_one(pool)
    .await?;

    Ok((rows, total as u64))
}

3 chi tiết SQL cốt lõi:

  • ARRAY_AGG(pc.category_id) FILTER (WHERE pc.category_id IS NOT NULL) — aggregate cột category_id của các row JOIN match. FILTER WHERE NOT NULL loại bỏ row LEFT JOIN không khớp (product chưa có category sẽ có 1 row với pc.category_id = NULL).
  • COALESCE(..., '{}'::BIGINT[]) — fallback empty array Postgres khi không có category nào (ARRAY_AGG trên empty set trả NULL, không phải empty array). Sau coalesce, kết quả luôn là array (có thể rỗng) — không bao giờ NULL.
  • AS "category_ids!: Vec<i64>"force not-null override syntax sqlx (B53 lock continued). Dấu ! báo sqlx biết cột này không bao giờ NULL (compiler infer từ COALESCE), : Vec<i64> ép kiểu Rust thay nullable Option<Vec<i64>>.

GROUP BY p.id bắt buộc khi có aggregate function — gom row JOIN về 1 row mỗi product. Có thể GROUP BY mọi cột p.* trong câu trên nhờ p.id là PRIMARY KEY (Postgres functional dependency tối ưu: 1 cột PK đủ infer mọi cột khác).

5

Refactor From Conversion Và list_products Handler

Update impl From<ProductRowWithCategories> for ProductResponseDto trong crates/shop-api/src/routes/products.rs populate field category_ids thật từ aggregate:

// File: crates/shop-api/src/routes/products.rs

use shop_common::dto::{CategoryId, ProductResponseDto};
use shop_db::products::{self as db, ProductRowWithCategories};

impl From<ProductRowWithCategories> for ProductResponseDto {
    fn from(row: ProductRowWithCategories) -> Self {
        Self {
            id: ProductId(row.id as u64),
            name: row.name,
            slug: row.slug,
            price: Money(row.price),
            stock: row.stock as u32,
            description: row.description,
            category_ids: row
                .category_ids
                .into_iter()
                .map(|id| CategoryId(id as u64))
                .collect(),
            tags: row.tags.into_iter().collect(),
            metadata: serde_json::from_value(row.metadata).unwrap_or_default(),
            created_at: row.created_at,
        }
    }
}

Refactor handler list_products dùng list_products_with_categories thay function cũ:

pub async fn list_products(
    State(state): State<AppState>,
    AppQuery(query): AppQuery<ProductSearchQuery>,
) -> Result<Json<ProductListResponse>, AppError> {
    query.validate()?;
    let filter = db::ProductFilter::from(&query);

    let (rows, total) = db::list_products_with_categories(&state.db, filter).await?;
    let items: Vec<ProductResponseDto> = rows.into_iter().map(Into::into).collect();

    Ok(Json(ProductListResponse {
        items,
        total,
        page: query.page,
        per_page: query.per_page,
        total_pages: ((total as f64 / query.per_page as f64).ceil() as u32).max(1),
    }))
}

Tương tự refactor get_product dùng query JOIN aggregate khi load single product (ETag pattern B62 vẫn áp dụng):

curl http://localhost:3000/api/v1/products/iphone-15 | jq
# {
#   "id": 1,
#   "name": "iPhone 15",
#   "slug": "iphone-15",
#   "category_ids": [1, 5],         # ← populate từ DB
#   "price": "29990000.00",
#   "stock": 50,
#   "tags": ["apple", "smartphone"],
#   ...
# }

Field category_ids giờ phản ánh đúng quan hệ bridge — không còn rỗng placeholder.

6

PUT /products/{slug}/categories Endpoint

Handler assign_categories trong crates/shop-api/src/routes/products.rs wire qua ValidatedJson<AssignCategoriesDto> (DTO đã define B63 với validate(length(max = 20)) cap chống abuse) và Extension<RequestId> (B39 lock cho audit correlation):

// File: crates/shop-api/src/routes/products.rs

use axum::Extension;
use crate::middleware::request_id::RequestId;
use crate::responses::NoContent;
use shop_common::dto::AssignCategoriesDto;

pub async fn assign_categories(
    State(state): State<AppState>,
    AppPath(slug): AppPath<String>,
    Extension(request_id): Extension<RequestId>,
    ValidatedJson(dto): ValidatedJson<AssignCategoriesDto>,
) -> Result<NoContent, AppError> {
    // Resolve product slug → id (exclude soft deleted)
    let product = db::find_by_slug(&state.db, &slug, false)
        .await?
        .ok_or_else(|| AppError::NotFound(format!("product '{}' not found", slug)))?;

    // Convert CategoryId newtype → i64 cho DB layer
    let category_ids: Vec<i64> = dto
        .category_ids
        .iter()
        .map(|c| c.0 as i64)
        .collect();

    // Assign qua transaction 6 step + audit
    db::assign_categories(
        &state.db,
        product.id,
        &category_ids,
        None, // actor_user_id wire B112 sau khi có auth
        Some(request_id.as_str()),
    )
    .await?;

    Ok(NoContent)
}

Wire route trong routes() function của products.rs:

pub fn routes() -> Router<AppState> {
    Router::new()
        .route("/products", get(list_products).post(create_product))
        .route(
            "/products/{slug}",
            get(get_product).patch(update_product).delete(delete_product),
        )
        // B64: M:N assign endpoint với PUT replace full set
        .route(
            "/products/{slug}/categories",
            put(assign_categories),
        )
        .route(
            "/products/{slug}/restore",
            post(restore_product),
        )
        .route(
            "/products/export.ndjson",
            get(export_products_ndjson),
        )
        .route(
            "/products/import.ndjson",
            post(import_products_ndjson)
                .layer(DefaultBodyLimit::max(10 * 1024 * 1024)),
        )
}

Verify end-to-end qua curl:

# Assign 3 categories (replace toàn bộ assignment cũ)
curl -i -X PUT http://localhost:3000/api/v1/products/iphone-15/categories \
  -H 'Content-Type: application/json' \
  -d '{"category_ids": [1, 5, 12]}'
# HTTP/1.1 204 No Content
# x-request-id: 550e8400-e29b-41d4-a716-446655440000

# Verify
curl http://localhost:3000/api/v1/products/iphone-15 | jq '.category_ids'
# [1, 5, 12]

# Clear toàn bộ (empty array — idempotent)
curl -X PUT http://localhost:3000/api/v1/products/iphone-15/categories \
  -H 'Content-Type: application/json' \
  -d '{"category_ids": []}'
# HTTP/1.1 204 No Content

# Verify
curl http://localhost:3000/api/v1/products/iphone-15 | jq '.category_ids'
# []

# Vi phạm max 20 (B63 lock continued)
curl -X PUT http://localhost:3000/api/v1/products/iphone-15/categories \
  -H 'Content-Type: application/json' \
  -d "{\"category_ids\": [$(seq -s, 1 25)]}"
# HTTP/1.1 422 Unprocessable Entity
# { "error": "validation failed", "fields": { "category_ids": ["length"] } }

# Category id không tồn tại
curl -X PUT http://localhost:3000/api/v1/products/iphone-15/categories \
  -H 'Content-Type: application/json' \
  -d '{"category_ids": [99999]}'
# HTTP/1.1 404 Not Found

# Kiểm tra audit log
docker compose exec postgres psql -U shop -d shop_dev -c \
  "SELECT action, changes FROM audit_logs WHERE table_name = 'products' \
   ORDER BY created_at DESC LIMIT 1;"
#       action          |               changes
# ----------------------+--------------------------------------
#  assign_categories    | {"new": [1, 5, 12], "old": []}

Audit log giữ đầy đủ diff old vs new — admin dashboard query "ai assign category cho product nào lúc nào" qua endpoint /audit-logs (G14).

7

GET /categories/{slug}/products — Subtree Filter

Endpoint lookup product theo category cần lấy subtree — request GET /categories/electronics/products phải trả mọi product thuộc Electronics + Smartphones + iPhone (mọi descendant). Materialized path B63 cung cấp pattern subtree query đơn giản: path = $1 OR path LIKE $1 || '.%' lấy chính category root + tất cả descendant.

Thêm function list_products_in_subtree trong crates/shop-db/src/categories.rs:

// File: crates/shop-db/src/categories.rs

use crate::products::ProductRowWithCategories;
use sqlx::PgPool;

pub async fn list_products_in_subtree(
    pool: &PgPool,
    category_slug: &str,
    limit: i64,
    offset: i64,
) -> Result<(Vec<ProductRowWithCategories>, u64), sqlx::Error> {
    // Step 1: Resolve slug → path
    let cat = sqlx::query!(
        "SELECT id, path FROM categories \
         WHERE slug = $1 AND deleted_at IS NULL",
        category_slug
    )
    .fetch_optional(pool)
    .await?
    .ok_or(sqlx::Error::RowNotFound)?;

    // Step 2: Get all descendant category ids (materialized path)
    let subtree_ids: Vec<i64> = sqlx::query_scalar!(
        "SELECT id FROM categories \
         WHERE (path = $1 OR path LIKE $1 || '.%') \
           AND deleted_at IS NULL",
        cat.path
    )
    .fetch_all(pool)
    .await?;

    // Step 3: Fetch products có ít nhất 1 category trong subtree
    let rows = sqlx::query_as!(
        ProductRowWithCategories,
        r#"
        SELECT
            p.id, p.name, p.slug, p.price, p.stock, p.description,
            p.tags, p.metadata, p.created_at, p.updated_at, p.deleted_at,
            COALESCE(
                ARRAY_AGG(pc2.category_id)
                    FILTER (WHERE pc2.category_id IS NOT NULL),
                '{}'::BIGINT[]
            ) AS "category_ids!: Vec<i64>"
        FROM products p
        INNER JOIN product_categories pc ON pc.product_id = p.id
        LEFT JOIN product_categories pc2 ON pc2.product_id = p.id
        WHERE pc.category_id = ANY($1) AND p.deleted_at IS NULL
        GROUP BY p.id
        ORDER BY p.created_at DESC
        LIMIT $2 OFFSET $3
        "#,
        &subtree_ids,
        limit,
        offset
    )
    .fetch_all(pool)
    .await?;

    // Step 4: Count total (no JOIN trên LEFT — chỉ INNER filter)
    let total: i64 = sqlx::query_scalar!(
        "SELECT COUNT(DISTINCT p.id) FROM products p \
         INNER JOIN product_categories pc ON pc.product_id = p.id \
         WHERE pc.category_id = ANY($1) AND p.deleted_at IS NULL",
        &subtree_ids
    )
    .fetch_one(pool)
    .await?;

    Ok((rows, total as u64))
}

2 JOIN pattern trong câu SELECT chính:

  • INNER JOIN product_categories pc — filter chỉ giữ products có ít nhất 1 category trong subtree (qua pc.category_id = ANY($1)). Nếu product không thuộc subtree → loại khỏi result.
  • LEFT JOIN product_categories pc2 — aggregate toàn bộ categories của product (kể cả category ngoài subtree). Mục đích: trả về category_ids đầy đủ cho client. Ví dụ iPhone thuộc cả [Electronics, Apple] — query subtree Electronics trả về iPhone với category_ids: [1, 8] (cả 2, không cắt cụt).

SQL subquery materialized path: path = $1 OR path LIKE $1 || '.%' bao gồm:

-- Ví dụ path Electronics = "1"
-- Subtree includes:
--   path = "1"      (chính Electronics)
--   path LIKE "1.%" (mọi descendant: "1.2", "1.2.3", "1.5", ...)
SELECT id FROM categories
WHERE (path = '1' OR path LIKE '1.%') AND deleted_at IS NULL;
-- Trả: 1 (Electronics), 2 (Smartphones), 3 (iPhone), 5 (Apple), ...

Wire handler trong crates/shop-api/src/routes/categories.rs:

// File: crates/shop-api/src/routes/categories.rs

use crate::extractors::{AppPath, AppQuery};
use shop_common::pagination::Pagination;
use shop_common::dto::{ProductListResponse, ProductResponseDto};
use shop_db::categories as db_cat;

pub async fn list_products_in_category(
    State(state): State<AppState>,
    AppPath(slug): AppPath<String>,
    AppQuery(query): AppQuery<Pagination>,
) -> Result<Json<ProductListResponse>, AppError> {
    query.validate()?;

    let (rows, total) = db_cat::list_products_in_subtree(
        &state.db,
        &slug,
        query.per_page as i64,
        ((query.page - 1) * query.per_page) as i64,
    )
    .await?;

    let items: Vec<ProductResponseDto> = rows.into_iter().map(Into::into).collect();

    Ok(Json(ProductListResponse {
        items,
        total,
        page: query.page,
        per_page: query.per_page,
        total_pages: ((total as f64 / query.per_page as f64).ceil() as u32).max(1),
    }))
}

pub fn routes() -> Router<AppState> {
    Router::new()
        .route("/categories", get(list_categories).post(create_category))
        // B64: products theo category với subtree filter
        .route(
            "/categories/{slug}/products",
            get(list_products_in_category),
        )
}

Verify subtree filter qua scenario cây Electronics → Smartphones → iPhone:

# Setup: assign iPhone-15 vào category iPhone (id 3, leaf)
curl -X PUT http://localhost:3000/api/v1/products/iphone-15/categories \
  -H 'Content-Type: application/json' \
  -d '{"category_ids": [3]}'

# Query products thuộc Electronics (root) — bao gồm subtree iPhone
curl 'http://localhost:3000/api/v1/categories/electronics/products' | jq
# {
#   "items": [
#     { "slug": "iphone-15", "category_ids": [3], ... }
#   ],
#   "total": 1, "page": 1, "per_page": 20, "total_pages": 1
# }

# Query products thuộc Smartphones (mid level) — vẫn match iPhone
curl 'http://localhost:3000/api/v1/categories/smartphones/products' | jq '.total'
# 1

# Query products thuộc iPhone (leaf) — chỉ iPhone direct
curl 'http://localhost:3000/api/v1/categories/iphone/products' | jq '.total'
# 1

# Query products thuộc Laptops (sibling không match)
curl 'http://localhost:3000/api/v1/categories/laptops/products' | jq '.total'
# 0

Subtree filter cho phép user browse từ root catalog xuống leaf — request 1 endpoint duy nhất, server resolve hierarchy.

8

Query Optimization — Index + EXPLAIN + CTE Pagination

Bridge table product_categories đã có 2 index hữu dụng từ B63:

  • PRIMARY KEY (product_id, category_id) composite — cover query forward "categories của product X" qua WHERE product_id = $1.
  • product_categories_category_idx ON (category_id) — cover query reverse "products của category X" qua WHERE category_id = $1.

Verify JOIN dùng index qua EXPLAIN ANALYZE:

EXPLAIN (ANALYZE, BUFFERS)
SELECT p.id, p.name,
       ARRAY_AGG(pc.category_id) AS cats
FROM products p
LEFT JOIN product_categories pc ON pc.product_id = p.id
WHERE p.deleted_at IS NULL
GROUP BY p.id
ORDER BY p.created_at DESC
LIMIT 20;
-- HashAggregate (actual time=2.1..2.4 rows=20)
--   Group Key: p.id
--   -> Nested Loop Left Join (actual time=0.04..1.8 rows=42)
--      -> Index Scan products_active_idx (rows=20)
--      -> Index Scan product_categories_pkey (rows=2 each)
-- Planning Time: 0.4 ms
-- Execution Time: 2.6 ms

Index product_categories_pkey được dùng cho JOIN lookup theo product_id — O(log n) cho mỗi product. Index products_active_idx partial (B62 lock) cho filter deleted_at IS NULL.

N+1 anti-pattern phải tránh tuyệt đối: code Rust nhìn ngây thơ sẽ viết "list 20 products → loop mỗi product gọi SELECT category_id WHERE product_id = ?" → 21 query tổng cộng. Với pool size 20 (B56 lock), 21 query liên tiếp tốn ~50-100ms latency. Pattern ARRAY_AGG + GROUP BY 1 query duy nhất giảm về <5ms — lock MANDATORY cho mọi list endpoint Shop API.

Pagination khi kết hợp JOIN + aggregate có pitfall: LIMIT 20 OFFSET 100 AFTER GROUP BY đúng semantic (20 group sau khi gom), nhưng Postgres planner đôi khi scan rộng hơn — JOIN bridge có thể duplicate row trước GROUP BY. Pattern lock Shop API cho list endpoint với JOIN aggregate: CTE pagination paginate trên table chính trước, JOIN bridge sau:

-- CTE pagination pattern (lock Shop API G7+ deep)
WITH paginated_products AS (
    SELECT id, name, slug, price, stock, description,
           tags, metadata, created_at, updated_at, deleted_at
    FROM products
    WHERE deleted_at IS NULL
    ORDER BY created_at DESC
    LIMIT 20 OFFSET 0
)
SELECT p.*,
       COALESCE(
           ARRAY_AGG(pc.category_id)
               FILTER (WHERE pc.category_id IS NOT NULL),
           '{}'::BIGINT[]
       ) AS category_ids
FROM paginated_products p
LEFT JOIN product_categories pc ON pc.product_id = p.id
GROUP BY p.id, p.name, p.slug, p.price, p.stock, p.description,
         p.tags, p.metadata, p.created_at, p.updated_at, p.deleted_at;

CTE chạy LIMIT trên table chính trước (Postgres dùng products_active_idx + ORDER BY index), JOIN bridge chỉ aggregate cho 20 product cuối cùng. Tổng work scale với 20 thay vì N row tổng. Pattern lock cho mọi list endpoint với aggregate (preview deep G7 optimization deep B96 cursor pagination + G15 query log slow query).

Cycle prevention preview (B65 deep): mọi mutation parent_id trên categories cần check new parent KHÔNG thuộc subtree của current node qua WITH RECURSIVE EXISTS subquery. Lock pattern reuse cho mọi self-referencing FK tương lai (comment thread, BOM, org chart).

Suggested commit khi mọi bước verify pass: B64: products-categories M:N assign + JOIN aggregate + subtree filter.

9

Tổng Kết

  • M:N relation update semantic: PUT replace full set thay vì PATCH delta — ngoại lệ của lock B61 PATCH-only vì M:N bản chất "set membership" idempotent.
  • assign_categories 6 step transaction pattern: verify product → verify categories tồn tại → get old set → DELETE old → INSERT new qua UNNEST → audit log diff.
  • UNNEST batch insert Postgres pattern: array bind → INSERT SELECT $1, UNNEST($2::BIGINT[]) bulk insert nhanh hơn N statement lẻ 5-10×.
  • ARRAY_AGG + FILTER + COALESCE pattern populate aggregate Vec<T> trong row: ARRAY_AGG aggregate cột, FILTER loại NULL từ LEFT JOIN, COALESCE fallback empty array.
  • GROUP BY MANDATORY khi có aggregate function — gom row JOIN về 1 row mỗi entity.
  • as "category_ids!: Vec<i64>" force not-null override syntax sqlx (B53 lock continued) cho aggregate field.
  • PUT /api/v1/products/{slug}/categories endpoint replace full set 204 NoContent + AssignCategoriesDto validate max 20 (B63 lock continued).
  • GET /api/v1/categories/{slug}/products endpoint với materialized path subtree filter path = $1 OR path LIKE $1 || '.%' (B63 lock continued).
  • 2 JOIN pattern: INNER JOIN filter products thuộc subtree + LEFT JOIN aggregate toàn bộ categories của product (kể cả ngoài subtree).
  • N+1 anti-pattern phải tránh — luôn dùng ARRAY_AGG + GROUP BY 1 query thay nested query.
  • CTE pagination + JOIN aggregate pattern lock cho mọi list endpoint với aggregate — paginate table chính trước, JOIN bridge sau (avoid OFFSET scan inflation).
  • File path lock: extend crates/shop-db/src/products.rs (assign_categories + ProductRowWithCategories + list_products_with_categories), extend crates/shop-db/src/categories.rs (list_products_in_subtree), extend crates/shop-api/src/routes/products.rs (assign handler + wire PUT), extend crates/shop-api/src/routes/categories.rs (list_products_in_category + wire GET).
10

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. M:N relation update — 3 cách (PATCH delta, PUT replace, POST individual) trade-off ra sao? Shop API chọn cách nào và tại sao đây là ngoại lệ của lock B61 PATCH-only?
  2. assign_categories 6 step transaction — bước nào tránh được race condition? UNNEST batch insert lợi gì so với loop N statement?
  3. ARRAY_AGG + COALESCE + FILTER pattern — giải quyết vấn đề gì cụ thể? Output ví dụ cho product không có category nào là gì?
  4. N+1 query anti-pattern — pitfall scenario list 20 products kèm category là gì? Solution Postgres aggregate là gì và tại sao nhanh hơn nhiều lần?
  5. CTE pagination với JOIN aggregate — tại sao tốt hơn OFFSET đơn giản trên cùng câu JOIN? Cho ví dụ EXPLAIN minh họa khác biệt.
Đáp án
  1. 3 cách M:N update + Shop API chọn + lý do ngoại lệ B61: (a) PATCH delta client gửi {"add": [1, 5], "remove": [3]} — pros: tường minh hành động, audit log per-action chi tiết, payload nhỏ khi chỉ đổi 1 item; cons: client phải biết state hiện tại để tính delta (round-trip thêm GET trước), KHÔNG idempotent (gửi lần 2 với remove: [3] mà 3 không còn → fail hoặc no-op gây confusion), conflict khi 2 client gửi delta đan xen. (b) PUT replace full set client gửi {"category_ids": [1, 5, 12]} nghĩa "đúng 3 cái này, xóa phần thiếu, giữ phần khớp" — pros: idempotent (gửi N lần kết quả như 1 lần), client không cần biết state trước, retry an toàn dưới network failure, semantic "set membership" tự nhiên với M:N relation; cons: payload lớn khi set lớn (chấp nhận với cap 20 lock B63), audit log dạng diff old vs new (vẫn đủ thông tin); (c) POST individual mỗi cặp 1 endpoint POST /products/{slug}/categories/{cat_id} + DELETE — pros: simple semantic per-pair, audit log granular; cons: N round-trip cho N item, fragment khó atomic, không thấy diff tổng thể, URL phình cho mọi cặp. Shop API chọn PUT replace full set lock vĩnh viễn cho mọi M:N relation update Shop API tương lai (user roles, order tags, notification topics, product variants snapshot). Tại sao ngoại lệ lock B61 PATCH-only: lock B61 quy định PATCH cho user-facing update vì PATCH semantic "partial merge" — gửi field nào update field đó, field thiếu giữ nguyên (no-op safe). Áp dụng vào M:N relation không khớp semantic: nếu PATCH {"category_ids": [1]}, server không biết user muốn "replace toàn bộ" hay "merge thêm vào set hiện tại" — mơ hồ. PUT semantic "replace toàn bộ representation" rõ ràng hơn cho set membership. Generalize: mọi resource có nested collection dạng set (categories của product, roles của user, items của order) MANDATORY dùng PUT replace cho update toàn collection — KHÔNG PATCH cá nhân từng field thuộc collection. Endpoint path lock: PUT /<resource>/{id}/<related-resource> với body {<ids_field>: [...]}.
  2. assign_categories 6 step + race condition + UNNEST batch insert lợi: 6 step: (1) verify product exists + not deleted; (2) verify mọi categories trong input tồn tại + not deleted; (3) get old set cho audit diff; (4) DELETE all current assignments; (5) INSERT new batch qua UNNEST; (6) audit log diff. Bước tránh race condition: cả 6 bước wrap trong cùng transaction là defense chính, KHÔNG bước riêng lẻ nào đủ. Postgres READ COMMITTED isolation default (B73 deep) đảm bảo mỗi transaction nhìn thấy snapshot consistent của data tại thời điểm BEGIN — DELETE + INSERT bên trong cùng transaction atomic visible cho reader concurrent. Scenario race condition nếu thiếu transaction: Admin A gọi assign với [1, 5] cùng lúc Admin B gọi với [3, 7]; nếu không transaction, sequence có thể đan xen: A DELETE → B DELETE (no-op vì A đã xóa) → A INSERT [1,5] → B INSERT [3,7] → kết quả [1,5,3,7] (cả 2 set thay vì 1) hoặc tệ hơn unique constraint violation race. Có transaction → 2 admin queue tuần tự, kết quả deterministic last-writer-wins. Verify product + categories trong transaction defense thêm: nếu admin C đang soft-delete category 5 ngay khi admin A assign [5], MVCC snapshot tại BEGIN của A có thể thấy 5 còn active, nhưng cuối transaction snapshot có thể stale; cần SERIALIZABLE isolation (B73 deep) hoặc trigger constraint để defense hoàn toàn (B66 deep deep). UNNEST batch insert lợi: thay vì loop N statement INSERT INTO product_categories VALUES ($1, $2) chạy N lần (mỗi statement 1 round-trip TCP + 1 lần parse query plan), UNNEST gom array thành 1 statement bulk: INSERT INTO product_categories (product_id, category_id) SELECT $1, UNNEST($2::BIGINT[]) — 1 round-trip + 1 plan + Postgres scan array element execute như N row insert nội bộ. Benchmark thực tế: N = 20 categories, loop 20 statement ~50-80ms (do TCP round-trip dominate), UNNEST ~5-8ms (1 statement). Factor 10× với set vừa, càng lớn càng nghiêng UNNEST. Alternative VALUES list: INSERT VALUES ($1, $2), ($3, $4), ... cũng 1 statement nhưng bind variable scale với N (Postgres protocol limit 65535 bind parameter — cap thực tế ~30K row). UNNEST scale tốt hơn vì 2 parameter cố định (product_id + array). Lock Shop API B64: UNNEST MANDATORY cho mọi bulk insert relation Shop API tương lai (order items batch B66, user roles batch B131, notification topics batch G24). Pattern fundamental Postgres array operation lock vĩnh viễn.
  3. ARRAY_AGG + COALESCE + FILTER pattern + output empty: Vấn đề giải quyết: query JOIN aggregate cần populate field array Vec<T> trong result row, nhưng JOIN có 3 case khó xử lý cùng lúc: (a) product có nhiều category → cần gom thành array thay vì duplicate row; (b) product không có category nào → LEFT JOIN trả 1 row với cột bridge NULL → ARRAY_AGG sẽ output [NULL] 1 element NULL thay empty array (semantic sai cho Rust Vec<i64>); (c) edge case empty set qua ARRAY_AGG trả NULL thay empty array (Postgres aggregate function quirk). 3 component giải quyết tuần tự: (i) ARRAY_AGG(pc.category_id) aggregate cột bridge thành array — gom mọi row JOIN thuộc cùng GROUP BY thành 1 array element; (ii) FILTER (WHERE pc.category_id IS NOT NULL) loại bỏ row LEFT JOIN không match (cột bridge NULL) trước khi aggregate — nếu không có FILTER, output [NULL] cho product không có category, sqlx decode fail; (iii) COALESCE(..., '{}'::BIGINT[]) fallback khi ARRAY_AGG trả NULL (xảy ra khi cả filter loại hết row — không có row nào còn lại để aggregate) — replace bằng empty Postgres array '{}'::BIGINT[]. Output cho product không có category: query trả row product với cột category_ids = '{}'::BIGINT[] Postgres empty array. sqlx decode thành Vec<i64>::new() Rust empty vec. impl From<ProductRowWithCategories> for ProductResponseDto chuyển tiếp thành category_ids: Vec<CategoryId>::new() empty. JSON wire output: {"category_ids": [], ...} empty array literal — client UI render "Chưa phân category" tự nhiên. So sánh thiếu COALESCE: nếu chỉ ARRAY_AGG(...) FILTER (...) không có COALESCE, product không category → ARRAY_AGG trả NULL (vì set rỗng sau filter), sqlx decode fail vì field category_ids: Vec<i64> không nullable → 500 Internal Server Error. Override as "category_ids!: Vec<i64>" dấu ! chỉ giúp compile time nói sqlx "không nullable", runtime NULL vẫn fail. Lock Shop API B64: 3 component MANDATORY khi populate aggregate Vec<T> trong row qua LEFT JOIN — pattern reuse cho mọi 1:N hoặc M:N aggregate Shop API tương lai (product variants list, order items snapshot, user addresses, notification subscribers).
  4. N+1 anti-pattern + scenario + solution Postgres aggregate: N+1 anti-pattern là pitfall kinh điển ORM/data-access: list N entity (1 query), sau đó loop mỗi entity load related data (N query) → tổng N+1 query thay 1 query JOIN aggregate. Scenario cụ thể list 20 products kèm category_ids ngây thơ: code Rust pseudo: let products = sqlx::query_as!(...) SELECT FROM products LIMIT 20 (1 query, 20 row); sau đó for product in products.iter_mut() { let cats: Vec<i64> = sqlx::query_scalar!("SELECT category_id FROM product_categories WHERE product_id = $1", product.id).fetch_all(&pool).await?; product.category_ids = cats; } (20 query thêm). Tổng 21 query. Performance impact: pool size 20 (B56 lock) — 21 query tuần tự bottleneck nếu network DB latency ~2ms/query thì 21 × 2 = 42ms baseline. Với 100 user concurrent → 100 × 21 = 2100 query/request batch, pool exhaust + queue → 200-500ms p99 latency. Trên cloud production DB managed (RDS, Cloud SQL) network latency có thể 5-10ms/query → 21 × 10 = 210ms 1 request, totally unacceptable cho user-facing endpoint <100ms p99 target. Càng đông user + càng N càng tệ — workload growth không scale với pattern này. Solution Postgres aggregate: 1 query JOIN + ARRAY_AGG + GROUP BY thay toàn bộ N+1: SELECT p.*, ARRAY_AGG(pc.category_id) FILTER (WHERE pc.category_id IS NOT NULL) AS category_ids FROM products p LEFT JOIN product_categories pc ON pc.product_id = p.id WHERE ... GROUP BY p.id LIMIT 20. Tại sao nhanh hơn nhiều: (a) 1 round-trip TCP thay 21 round-trip — round-trip cost dominate trong query latency lúc DB load nhẹ; (b) 1 lần parse + plan query thay 21 lần — Postgres prepared statement cache giúp giảm nhưng vẫn có overhead; (c) Index scan tối ưu: Postgres planner optimize JOIN qua composite PK product_categories(product_id, category_id) Nested Loop hoặc Hash Join tùy size — <5ms với 1000 row total bridge; (d) Memory locality: 1 query Postgres giữ working set trong shared_buffers, 21 query có thể bị evict giữa các query nếu cache nhỏ. Benchmark thực tế: 20 product với avg 3 category/product, N+1 pattern ~80-150ms p99 production, JOIN aggregate ~3-8ms p99 (factor 20-50× faster). Workload Shop API user catalog browse hot endpoint MANDATORY 1 query — lock vĩnh viễn pattern ARRAY_AGG + GROUP BY cho mọi list endpoint có nested collection. Generalize: bất kỳ list endpoint trả entity có field nested array/list luôn dùng aggregate trong cùng query; KHÔNG load nested data trong loop application layer. Code review reject MR có N+1 pattern.
  5. CTE pagination với JOIN aggregate vs OFFSET đơn giản + EXPLAIN ví dụ: Vấn đề OFFSET đơn giản trên JOIN aggregate: câu query SELECT p.*, ARRAY_AGG(pc.category_id) FROM products p LEFT JOIN product_categories pc ON pc.product_id = p.id WHERE ... GROUP BY p.id ORDER BY p.created_at DESC LIMIT 20 OFFSET 100 có 2 vấn đề tiềm ẩn: (a) Postgres planner có thể chọn Hash Join hoặc Merge Join tùy size; với LEFT JOIN bridge có nhiều row/product, Postgres scan toàn bộ JOIN result trước khi GROUP BY + ORDER BY + OFFSET. EXPLAIN ANALYZE ví dụ với 10K product + avg 3 cat/product (30K row bridge): HashAggregate (rows=10000) -> Hash Right Join (rows=30000) -> Seq Scan product_categories (rows=30000) -> Hash -> Seq Scan products (rows=10000) — scan toàn bộ rồi sort + skip 100 + lấy 20. Actual time ~50-80ms cho OFFSET 100, scale linear với OFFSET (OFFSET 10000 → ~5 giây). (b) Aggregate cost: GROUP BY trên 10K product compute ARRAY_AGG cho cả 10K rồi mới LIMIT — wasted compute. CTE pagination giải quyết: WITH paginated AS (SELECT * FROM products WHERE deleted_at IS NULL ORDER BY created_at DESC LIMIT 20 OFFSET 100) SELECT p.*, ARRAY_AGG(pc.category_id) FROM paginated p LEFT JOIN product_categories pc ON pc.product_id = p.id GROUP BY p.id. EXPLAIN ANALYZE với cùng data: HashAggregate (rows=20) -> Hash Left Join (rows=~60) -> CTE Scan paginated (rows=20) [Index Scan products_active_idx + LIMIT 20 + skip OFFSET 100 dùng index] -> Hash -> Index Scan product_categories_pkey lookup 20 product_id. Actual time ~3-5ms bất kể OFFSET (vì index seek). Khác biệt cốt lõi: (i) JOIN scope: CTE pagination chỉ JOIN bridge cho 20 product cuối cùng (~60 row) thay 30K row toàn bộ; (ii) Aggregate scope: GROUP BY chỉ trên 20 group thay 10K; (iii) Index usage: CTE dùng products_active_idx + ORDER BY created_at index cho pagination, OFFSET đơn giản có thể bị Postgres skip index nếu Hash Join chọn full scan. Benchmark thực tế: dataset 100K product, OFFSET 5000 + LIMIT 20: OFFSET đơn giản ~800ms-2s, CTE pagination ~5-10ms (factor 100-400× faster). Lock Shop API G7+: CTE pagination MANDATORY cho mọi list endpoint với JOIN aggregate khi pagination cần stable performance. Bonus deep optimization G7+: cursor pagination B96 lock (B61 lock) thay OFFSET với WHERE created_at < $cursor + LIMIT — O(log n) bất kể vị trí, KHÔNG có OFFSET scan tốn kém. Pattern combo: cursor pagination + CTE + JOIN aggregate cho endpoint hot (export, admin dashboard, user catalog browse). Generalize: pattern "paginate then join" lock cho mọi query với multi-table aggregate Shop API — defense in depth chống performance degradation khi data scale. Code review checklist: list endpoint với LIMIT/OFFSET + JOIN aggregate phải dùng CTE.
11

Bài Tiếp Theo

— chi tiết soft delete pattern cho orders + users + categories (orders cancel state, user deactivate, category move parent với cycle check qua WITH RECURSIVE EXISTS subquery), bulk restore endpoint, audit trail filter qua /audit-logs endpoint admin.