Danh sách bài viết

Bài 54: sqlx Transaction + Savepoint — ATOMIC Order Creation

Bài 54 của series Rust RESTful API — đi sâu transaction sqlx áp dụng cho ATOMIC order creation Shop API: ACID 4 properties (Atomicity tất cả hoặc không có gì, Consistency DB constraint giữ nguyên, Isolation concurrent transaction không lẫn lộn, Durability commit xong persist trên đĩa); pool.begin() tạo Transaction<'_, Postgres> + tx.commit() / tx.rollback() explicit pattern; cú pháp &mut *tx deref về &mut PgConnection cho mọi execute; async drop pitfall — Drop trait đồng bộ, rollback async cần .await → pattern explicit match Ok => commit / Err => rollback MANDATORY tránh dropped silently; SAVEPOINT nested transaction qua tx.begin() trong tx parent (Postgres SAVEPOINT/RELEASE/ROLLBACK TO) cho side-effect optional partial rollback; 3 isolation level Postgres — READ COMMITTED default 95% query Shop API, REPEATABLE READ snapshot consistent cho report, SERIALIZABLE highest detect conflict + retry cho financial; tạo 2 migration mới — orders + order_items M:N bridge table + payments với payment_type TEXT discriminator + payment_payload JSONB (B43 lock continued), FK strategy CASCADE order→items + RESTRICT items→products + UNIQUE payments.order_id (1 order = 1 payment); implement create_order_atomic service 5 step (lock products FOR UPDATE + insert order + insert items + decrement stock + insert payment) all-or-nothing semantic; OrderError thiserror enum 3 variant (Sqlx #[from], ProductNotFound, InsufficientStock) domain error tách biệt DB error vs business error; verify rollback test insufficient stock → toàn bộ rollback giữ DB nguyên trạng; HTTP endpoint POST /api/v1/orders handler defer G7 B66 — B54 chỉ implement service layer; foundation cho B55 sqlx error mapping sâu (SQLSTATE 23505/23503/23514).

15/06/2026
14 phút đọc
2 lượt xem
1

Mục Tiêu Bài Học

Sau bài học, bạn sẽ:

  • Hiểu ACID properties và tại sao transaction quan trọng.
  • Dùng pool.begin() tạo transaction + COMMIT/ROLLBACK pattern.
  • Hiểu async drop pitfallTransaction drop KHÔNG auto rollback async, cần explicit.
  • Implement SAVEPOINT nested transaction (partial rollback).
  • Phân biệt isolation levelREAD COMMITTED (default), REPEATABLE READ, SERIALIZABLE.
  • Tạo migration orders + payments table (core e-commerce schema).
  • Implement create_order service ATOMIC: insert order + decrement stock + insert payment trong 1 transaction.
  • Verify rollback khi 1 step fail (stock insufficient → toàn bộ rollback).
2

ACID Properties + Tại Sao Cần Transaction

ACID (Atomicity, Consistency, Isolation, Durability) là 4 thuộc tính nền tảng mọi relational database tuân thủ, lần đầu được Andreas Reuter và Theo Härder hệ thống hóa năm 1983.

  • Atomicity (tính nguyên tử) — tất cả statement trong transaction commit toàn bộ hoặc không có gì. Không có trạng thái "nửa chừng".
  • Consistency (tính nhất quán) — sau khi transaction xong, DB vẫn tuân thủ mọi constraint (CHECK, UNIQUE, FK, NOT NULL).
  • Isolation (tính cô lập) — concurrent transaction không thấy data tạm thời chưa commit của transaction khác (mức cô lập tùy isolation level — Bước 6).
  • Durability (tính bền vững) — sau COMMIT trả thành công, data persist trên đĩa, không mất kể cả server crash ngay sau đó (Postgres ghi WAL trước khi báo commit thành công).

Scenario Shop API thực tế: tạo order với 2 sản phẩm, cần 5 bước phối hợp:

create_order(user, [product_a x2, product_b x1]):
  1. INSERT orders (user_id, total) → order_id
  2. INSERT order_items (order_id, product_id, qty) x 2
  3. UPDATE products SET stock = stock - qty (check stock >= qty)
  4. INSERT payments (order_id, type, payload)
  5. Trả về order_id

Khi KHÔNG dùng transaction, mọi statement chạy độc lập (auto-commit). Vấn đề:

  • Step 3 fail (stock insufficient): order đã insert (step 1), order_items đã insert (step 2), nhưng stock không update được. DB bẩn — có order treo không hoàn thiện, user thấy order trong lịch sử nhưng kho không trừ.
  • Step 4 fail (DB connection drop): order đã insert + items + stock decrement, nhưng payment record chưa tạo. Nếu sau đó nhận webhook xác nhận thanh toán → không có row payment để match → tiền đã nhận nhưng order trạng thái pending vĩnh viễn.

Solution: bao toàn bộ 5 step trong 1 transaction. Bất kỳ step fail → ROLLBACK toàn bộ. Hoặc tất cả thành công → COMMIT atomic. Không có trạng thái nửa chừng nào lộ ra ngoài.

3

pool.begin() + Transaction Lifecycle

Method PgPool::begin() mở 1 transaction mới. sqlx send BEGIN tới Postgres, trả về Transaction<'_, Postgres> giữ connection riêng cho đến khi commit hoặc rollback.

use sqlx::PgPool;

async fn demo(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;  // BEGIN

    sqlx::query!("INSERT INTO orders (user_id, total) VALUES ($1, $2)", 1_i64, 100_i64)
        .execute(&mut *tx)
        .await?;

    sqlx::query!("UPDATE products SET stock = stock - 1 WHERE id = $1", 42_i64)
        .execute(&mut *tx)
        .await?;

    tx.commit().await?;  // COMMIT explicit
    Ok(())
}

Cú pháp &mut *tx nhìn lạ lùng nhưng cần thiết. Transaction là wrapper deref qua trait DerefMut về &mut PgConnection. Phía sqlx, mọi execute/fetch_one nhận impl Executor — connection mới impl trait này, không phải transaction wrapper. Phép *tx deref về PgConnection, &mut mượn mutable. Viết &mut tx không deref → compile error.

Sau khi gọi tx.commit().await?, sqlx send COMMIT + giải phóng connection về pool. Sau đó tx đã consume (method commit nhận self by-value), không gọi tiếp được.

Hai method chính trên Transaction:

  • tx.commit().await? — send COMMIT, persist mọi thay đổi.
  • tx.rollback().await? — send ROLLBACK, undo toàn bộ.

Pattern Rust idiomatic dùng ? operator để bubble error sớm. Nếu bất kỳ step fail, ? return error ngay, tx chưa kịp .commit() — sqlx cố gắng rollback ở Drop (nhưng có pitfall async — Bước 4).

async fn create_order_atomic(pool: &PgPool) -> Result<i64, AppError> {
    let mut tx = pool.begin().await?;

    let order_id = insert_order(&mut tx).await?;
    insert_items(&mut tx, order_id).await?;
    decrement_stock(&mut tx).await?;
    insert_payment(&mut tx, order_id).await?;

    tx.commit().await?;  // chỉ commit khi tất cả OK
    Ok(order_id)
}

Lifetime của Transaction<'_, Postgres> gắn với pool — borrow checker đảm bảo transaction không sống lâu hơn pool. Mỗi transaction giữ 1 connection riêng, không share connection giữa nhiều transaction; max concurrent transaction = max_connections pool config (lock B51: 20).

4

Async Drop Pitfall + Best Practice

Rust trait Drop là đồng bộ — method fn drop(&mut self) không thể .await. Trong khi đó ROLLBACK qua sqlx là async — cần gửi câu lệnh qua TCP tới Postgres, chờ phản hồi.

sqlx workaround bằng cách spawn task background trong Drop impl nếu transaction chưa được commit/rollback explicit. Task này cố gắng gửi ROLLBACK qua connection trước khi trả về pool. Vấn đề:

  • Nếu runtime đang shutdown, task spawn không kịp chạy → connection trả về pool ở trạng thái "đang trong transaction" → request kế tiếp dùng connection này bị conflict.
  • Nếu connection đã drop (network glitch), rollback không gửi được → Postgres tự rollback khi timeout (mặc định idle_in_transaction_session_timeout) nhưng có độ trễ.
  • Log warning: "transaction was not committed or rolled back before drop, executing rollback in background" — gây spam log nếu xảy ra thường xuyên.

Pattern an toàn — explicit match Ok/Err:

async fn create_order_safe(pool: &PgPool) -> Result<i64, OrderError> {
    let mut tx = pool.begin().await?;

    match do_work(&mut tx).await {
        Ok(order_id) => {
            tx.commit().await?;
            Ok(order_id)
        }
        Err(e) => {
            // Explicit rollback — đảm bảo gửi ROLLBACK qua connection
            // trước khi trả về pool, không phụ thuộc Drop
            tx.rollback().await?;
            Err(e)
        }
    }
}

Alternative — helper wrapper function tổng quát:

use sqlx::{PgPool, Postgres, Transaction};
use futures::future::BoxFuture;

pub async fn with_tx<F, R, E>(pool: &PgPool, f: F) -> Result<R, E>
where
    F: for<'a> FnOnce(&'a mut Transaction<'_, Postgres>) -> BoxFuture<'a, Result<R, E>>,
    E: From<sqlx::Error>,
{
    let mut tx = pool.begin().await.map_err(E::from)?;
    match f(&mut tx).await {
        Ok(result) => {
            tx.commit().await.map_err(E::from)?;
            Ok(result)
        }
        Err(e) => {
            let _ = tx.rollback().await;  // best effort, ignore error
            Err(e)
        }
    }
}

Quyết định lock Shop API: dùng pattern 1 (explicit match) cho mọi service tạo transaction — đơn giản, đọc rõ flow, không phụ thuộc generic-heavy helper. Helper with_tx để dành cho trường hợp cần factor lại nhiều service cùng pattern (xét lại G14 khi có 5+ service dùng transaction).

Quy tắc vĩnh viễn: KHÔNG bao giờ dựa vào Drop để rollback. Mọi đường thoát hàm phải gọi tx.commit() hoặc tx.rollback() tường minh.

5

SAVEPOINT — Nested Transaction (Partial Rollback)

Postgres KHÔNG hỗ trợ true nested transaction (không có BEGIN bên trong BEGIN). Thay vào đó, Postgres cung cấp SAVEPOINT — đánh dấu 1 điểm trong transaction hiện tại để có thể rollback ngược về điểm đó mà không hủy toàn bộ transaction outer.

sqlx ánh xạ savepoint qua method tx.begin() gọi trên 1 transaction đã mở. Method trả về 1 Transaction lồng — commit thì release savepoint, rollback thì rollback tới savepoint.

async fn create_order_with_optional_notify(
    pool: &PgPool,
) -> Result<i64, OrderError> {
    let mut tx = pool.begin().await?;

    // Required step
    let order_id = insert_order(&mut tx).await?;

    // Optional step: gửi notification — fail không ảnh hưởng order
    {
        let mut sp = tx.begin().await?;  // SAVEPOINT
        match send_notification(&mut sp, order_id).await {
            Ok(_) => sp.commit().await?,    // RELEASE SAVEPOINT
            Err(_) => sp.rollback().await?, // ROLLBACK TO SAVEPOINT
        }
    }

    // Outer transaction vẫn tiếp tục
    decrement_stock(&mut tx).await?;
    tx.commit().await?;
    Ok(order_id)
}

SQL underlying mà Postgres thực thi:

BEGIN;
  INSERT INTO orders ...;
  SAVEPOINT sp_1;
    INSERT INTO notifications ...;
  RELEASE SAVEPOINT sp_1;   -- hoặc ROLLBACK TO sp_1
  UPDATE products SET stock = ...;
COMMIT;

Use case Shop API điển hình cho savepoint:

  • Audit log — ghi audit trail là side-effect quan trọng nhưng không critical; nếu audit insert fail (vd JSON serialize lỗi) không nên block order.
  • Notification — gửi event tới notification service (B71 chuẩn bị); fail không ảnh hưởng order chính.
  • Analytics tracking — log event analytics (count product view, conversion); fail không ảnh hưởng UX.

Quyết định lock Shop API: dùng SAVEPOINT cho side-effect optional. Required step (insert order, decrement stock, insert payment) KHÔNG bọc savepoint — fail là rollback toàn bộ.

6

Isolation Level — READ COMMITTED, REPEATABLE READ, SERIALIZABLE

SQL standard định nghĩa 4 isolation level. Postgres support 3 level thực sự (READ UNCOMMITTED bị nâng lên READ COMMITTED vì Postgres MVCC không cho phép đọc data uncommitted).

  • READ COMMITTED (mặc định Postgres) — mỗi statement thấy snapshot data đã commit tại thời điểm statement bắt đầu. 2 lần SELECT trong cùng transaction có thể thấy giá trị khác nhau nếu transaction khác commit giữa chừng (phantom read, non-repeatable read). Dùng cho 95% query Shop API.
  • REPEATABLE READ — snapshot ở thời điểm BEGIN. Mọi SELECT trong transaction thấy data y hệt như lúc BEGIN. Tránh được non-repeatable read và phantom read. Phù hợp cho report query lớn cần snapshot nhất quán (vd báo cáo doanh thu tháng — không muốn nửa chừng giá trị thay đổi).
  • SERIALIZABLE — như thể transaction chạy sequential. Postgres detect conflict + abort 1 trong 2 transaction → application phải retry. Chậm nhất, isolation cao nhất. Dùng cho financial critical (chuyển tiền giữa 2 tài khoản phải đảm bảo không bị race).

Set isolation level trong sqlx qua statement SET TRANSACTION sau khi mở transaction:

let mut tx = pool.begin().await?;

sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    .execute(&mut *tx)
    .await?;

// ... query trong snapshot
let total = sqlx::query!("SELECT SUM(total) AS total FROM orders WHERE created_at >= $1", since)
    .fetch_one(&mut *tx)
    .await?;

tx.commit().await?;

Quyết định lock Shop API:

  • READ COMMITTED mặc định cho mọi transaction CRUD (lock B54).
  • REPEATABLE READ chỉ cho report endpoint admin dashboard (G14 chuẩn bị) — đảm bảo số liệu snapshot consistent.
  • SERIALIZABLE chỉ khi domain thực sự critical (preview B66 stock decrement) — nhưng thường SELECT ... FOR UPDATE pessimistic lock đủ, không cần nâng isolation.

Alternative: dùng SELECT ... FOR UPDATE row lock pessimistic — Postgres giữ row lock tới khi commit, transaction khác chờ. Cách này predictable hơn SERIALIZABLE (không bị abort + retry ngẫu nhiên), thường được prefer cho scenario stock decrement.

7

Migration orders + payments Table

Tạo migration thứ 3 cho table orders + order_items (M:N bridge):

sqlx migrate add --source crates/shop-db/migrations create_orders

Filename: 20260615140000_create_orders.sql. Edit nội dung:

-- File: crates/shop-db/migrations/20260615140000_create_orders.sql
CREATE TABLE orders (
    id BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    total NUMERIC(15, 2) NOT NULL CHECK (total > 0),
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled')),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX orders_user_idx ON orders(user_id);
CREATE INDEX orders_status_idx ON orders(status);
CREATE INDEX orders_created_at_idx ON orders(created_at DESC);

CREATE TABLE order_items (
    id BIGSERIAL PRIMARY KEY,
    order_id BIGINT NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    product_id BIGINT NOT NULL REFERENCES products(id) ON DELETE RESTRICT,
    quantity INT NOT NULL CHECK (quantity > 0),
    unit_price NUMERIC(15, 2) NOT NULL CHECK (unit_price > 0),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX order_items_order_idx ON order_items(order_id);
CREATE INDEX order_items_product_idx ON order_items(product_id);

Migration thứ 4 cho table payments:

sqlx migrate add --source crates/shop-db/migrations create_payments
-- File: crates/shop-db/migrations/20260615140100_create_payments.sql
CREATE TABLE payments (
    id BIGSERIAL PRIMARY KEY,
    order_id BIGINT NOT NULL UNIQUE REFERENCES orders(id) ON DELETE CASCADE,
    payment_type TEXT NOT NULL
        CHECK (payment_type IN ('stripe', 'bank_transfer', 'cod')),
    payment_payload JSONB NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'success', 'failed')),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX payments_order_idx ON payments(order_id);
CREATE INDEX payments_status_idx ON payments(status);

Apply 2 migration:

sqlx migrate run --source crates/shop-db/migrations
# Applied 20260615140000/migrate create_orders
# Applied 20260615140100/migrate create_payments

Quyết định schema lock B54:

  • order_items bridge table — quan hệ M:N giữa ordersproducts qua bảng trung gian. Mỗi row 1 line item với quantity + unit_price snapshot (giữ giá tại thời điểm mua, không reference giá hiện tại).
  • FK CASCADE order_items.order_id → orders — xóa order tự xóa items kèm. Logic: items không tồn tại độc lập.
  • FK RESTRICT order_items.product_id → products — cấm xóa product nếu còn order item tham chiếu. Lý do: lịch sử order phải tra được product nào đã bán.
  • payments.order_id UNIQUE — 1 order chỉ có 1 payment record (lock B43). Refund tạo row riêng ở bảng refunds (G21 chuẩn bị) thay vì update payment.
  • payment_payload JSONB — payload riêng cho từng payment_type (Stripe: charge_id, BankTransfer: bank_code + account_number, Cod: phone + address). Lock B43 continued — TEXT discriminator + JSONB pattern.

Sau apply migration, generate offline cache cho mọi macro mới (lock B53):

cargo sqlx prepare --workspace --database-url $DATABASE_URL
8

Implement create_order ATOMIC

Tạo module mới crates/shop-db/src/orders.rs chứa service layer atomic:

// File: crates/shop-db/src/orders.rs
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde_json::Value;
use sqlx::PgPool;

#[derive(Debug, Clone)]
pub struct CreateOrderItem {
    pub product_id: i64,
    pub quantity: i32,
}

#[derive(Debug)]
pub struct OrderRow {
    pub id: i64,
    pub user_id: i64,
    pub total: Decimal,
    pub status: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

#[derive(Debug, thiserror::Error)]
pub enum OrderError {
    #[error(transparent)]
    Sqlx(#[from] sqlx::Error),

    #[error("product {0} not found")]
    ProductNotFound(i64),

    #[error("insufficient stock for product {product_id}: requested {requested}, available {available}")]
    InsufficientStock {
        product_id: i64,
        requested: i32,
        available: i32,
    },
}

Function chính create_order_atomic — 5 step trong 1 transaction:

pub async fn create_order_atomic(
    pool: &PgPool,
    user_id: i64,
    items: Vec<CreateOrderItem>,
    payment_type: &str,
    payment_payload: Value,
) -> Result<OrderRow, OrderError> {
    let mut tx = pool.begin().await?;

    // Step 1: Lock products FOR UPDATE + check stock + tính total
    let mut total = Decimal::ZERO;
    let mut item_details: Vec<(i64, i32, Decimal)> = Vec::new();

    for item in &items {
        let product = sqlx::query!(
            r#"SELECT id, price, stock FROM products WHERE id = $1 FOR UPDATE"#,
            item.product_id
        )
        .fetch_optional(&mut *tx)
        .await?
        .ok_or(OrderError::ProductNotFound(item.product_id))?;

        if product.stock < item.quantity {
            tx.rollback().await?;
            return Err(OrderError::InsufficientStock {
                product_id: item.product_id,
                requested: item.quantity,
                available: product.stock,
            });
        }

        total += product.price * Decimal::from(item.quantity);
        item_details.push((item.product_id, item.quantity, product.price));
    }

    // Step 2: Insert order
    let order = sqlx::query_as!(
        OrderRow,
        r#"
        INSERT INTO orders (user_id, total)
        VALUES ($1, $2)
        RETURNING id, user_id, total, status, created_at, updated_at
        "#,
        user_id,
        total
    )
    .fetch_one(&mut *tx)
    .await?;

    // Step 3: Insert order_items
    for (product_id, quantity, unit_price) in &item_details {
        sqlx::query!(
            r#"
            INSERT INTO order_items (order_id, product_id, quantity, unit_price)
            VALUES ($1, $2, $3, $4)
            "#,
            order.id,
            product_id,
            quantity,
            unit_price
        )
        .execute(&mut *tx)
        .await?;
    }

    // Step 4: Decrement stock
    for (product_id, quantity, _) in &item_details {
        sqlx::query!(
            r#"UPDATE products SET stock = stock - $2 WHERE id = $1"#,
            product_id,
            quantity
        )
        .execute(&mut *tx)
        .await?;
    }

    // Step 5: Insert payment
    sqlx::query!(
        r#"
        INSERT INTO payments (order_id, payment_type, payment_payload)
        VALUES ($1, $2, $3)
        "#,
        order.id,
        payment_type,
        payment_payload
    )
    .execute(&mut *tx)
    .await?;

    // Tất cả OK → COMMIT
    tx.commit().await?;
    Ok(order)
}

Update crates/shop-db/src/lib.rs re-export module:

// File: crates/shop-db/src/lib.rs
pub mod orders;
pub mod payments;  // skeleton placeholder cho B71+
pub mod pool;
pub mod products;

pub use pool::create_pool;

Module payments.rs để trống placeholder, sẽ implement chi tiết khi tích hợp Stripe ở B71+ (handler webhook, lookup payment by order_id).

Note quan trọng về FOR UPDATE row lock ở Step 1: Postgres giữ row lock trên products row cho đến khi transaction commit/rollback. Transaction khác cố gắng SELECT ... FOR UPDATE hoặc UPDATE cùng row sẽ chờ. Pattern này prevent race condition kiểu:

  • T1: SELECT stock → đọc 5
  • T2: SELECT stock → đọc 5 (cùng lúc với T1)
  • T1: kiểm 5 >= 3 OK → UPDATE stock = 2
  • T2: kiểm 5 >= 4 OK → UPDATE stock = 1
  • Kết quả: cả 2 order pass check nhưng tổng bán 7 sản phẩm trong khi stock thực 5 — overselling.

Với FOR UPDATE, T2 chờ T1 commit rồi đọc lại stock = 2 → check 2 >= 4 fail → InsufficientStock error. Quyết định lock Shop API: dùng pessimistic lock FOR UPDATE cho stock decrement; isolation READ COMMITTED đủ — không cần nâng SERIALIZABLE.

9

Verify ATOMIC + Rollback Test

Apply migration + start server:

sqlx migrate run --source crates/shop-db/migrations
AUTO_MIGRATE=true cargo run -p shop-api

Test 1 — setup product test data qua HTTP endpoint:

curl -X POST http://localhost:3000/api/v1/products \
  -H 'Content-Type: application/json' \
  -d '{"name":"iPhone 15","slug":"iphone-15","price":"25000000.00","stock":10}'

HTTP endpoint POST /api/v1/orders chưa expose ở B54 — defer G7 (B66) khi viết HTTP handler chuyển State<AppState> + ValidatedJson<CreateOrderDto> → gọi shop_db::orders::create_order_atomic. B54 chỉ implement service layer; verify qua integration test hoặc psql trực tiếp.

Verify state trước test rollback qua psql:

docker compose exec postgres psql -U shop -d shop_dev -c \
  "SELECT id, slug, stock FROM products WHERE slug = 'iphone-15';"
# id | slug      | stock
# ---+-----------+-------
#  1 | iphone-15 |    10

Test 2 — gọi service trực tiếp với quantity = 100 (vượt stock = 10) để trigger rollback. Đoạn pseudo-test trong tests/orders_atomic.rs:

#[sqlx::test(migrations = "crates/shop-db/migrations")]
async fn create_order_rollback_on_insufficient_stock(pool: PgPool) {
    // Seed: 1 product với stock = 10
    sqlx::query!(
        r#"INSERT INTO products (name, slug, price, stock) VALUES ($1, $2, $3, $4)"#,
        "iPhone 15", "iphone-15", Decimal::new(2500000000, 2), 10_i32
    )
    .execute(&pool)
    .await
    .unwrap();

    // Act: tạo order với quantity vượt stock
    let result = create_order_atomic(
        &pool,
        1,
        vec![CreateOrderItem { product_id: 1, quantity: 100 }],
        "cod",
        serde_json::json!({"phone": "+84912345678"}),
    )
    .await;

    // Assert error
    assert!(matches!(result, Err(OrderError::InsufficientStock { .. })));

    // Assert DB nguyên trạng — rollback toàn bộ
    let order_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM orders")
        .fetch_one(&pool).await.unwrap();
    assert_eq!(order_count.0, 0);

    let item_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM order_items")
        .fetch_one(&pool).await.unwrap();
    assert_eq!(item_count.0, 0);

    let stock: (i32,) = sqlx::query_as("SELECT stock FROM products WHERE id = 1")
        .fetch_one(&pool).await.unwrap();
    assert_eq!(stock.0, 10);  // stock vẫn = 10, không bị decrement
}

Verify thủ công qua psql sau test:

-- Trước test
SELECT COUNT(*) FROM orders;        -- 0
SELECT stock FROM products WHERE id = 1;  -- 10

-- Sau test fail (InsufficientStock error)
SELECT COUNT(*) FROM orders;        -- vẫn 0 (rollback OK)
SELECT COUNT(*) FROM order_items;   -- vẫn 0
SELECT COUNT(*) FROM payments;      -- vẫn 0
SELECT stock FROM products WHERE id = 1;  -- vẫn 10

Mọi side-effect DB đã rollback. Atomicity hoạt động đúng.

Suggested commit: B54: ATOMIC create_order với transaction + savepoint + 2 migration orders+payments.

10

Tổng Kết

  • ACID properties — transaction đảm bảo Atomicity + Isolation chính, Consistency + Durability là tính chất nền của Postgres.
  • pool.begin() tạo Transaction<'_, Postgres>; tx.commit() / tx.rollback() explicit.
  • &mut *tx syntax cho execute query trên transaction (deref về &mut PgConnection).
  • Async drop pitfall — Drop trait đồng bộ, rollback async cần await; pattern explicit match Ok => commit / Err => rollback MANDATORY.
  • SAVEPOINT nested transaction qua tx.begin() trong tx parent — partial rollback cho side-effect optional (audit log, notification, analytics).
  • Isolation level 3 mức Postgres: READ COMMITTED default 95% query, REPEATABLE READ report snapshot, SERIALIZABLE financial critical.
  • Postgres mặc định READ COMMITTED đủ cho hầu hết Shop API; không cần nâng lên trừ scenario đặc biệt.
  • SELECT ... FOR UPDATE pessimistic row lock — alternative SERIALIZABLE, predictable hơn cho stock decrement.
  • 2 migration mớiorders + order_items (M:N bridge) + payments (TEXT discriminator + JSONB payload).
  • FK strategy — CASCADE order→items, RESTRICT order_items→products, UNIQUE payments.order_id.
  • create_order_atomic 5 step — lock products + insert order + insert items + decrement stock + insert payment.
  • OrderError enum — Sqlx, ProductNotFound, InsufficientStock domain error rõ ràng tách biệt DB và business.
  • HTTP endpoint POST /api/v1/orders defer G7 (B66) — B54 chỉ implement service layer.
  • File path lock: crates/shop-db/src/orders.rs, 2 migration mới 20260615140000_create_orders.sql + 20260615140100_create_payments.sql.
11

Bài Tập Củng Cố

Tự trả lời, đáp án ở cuối:

  1. 4 ACID properties là gì? Transaction giải quyết property nào trong scenario create_order Shop API?
  2. Async drop pitfall của sqlx Transaction là gì? Best practice xử lý ra sao? Cho ví dụ code.
  3. SAVEPOINT khác nested transaction thật ra sao? Postgres có support nested transaction không?
  4. 3 isolation level Postgres khác nhau ra sao? Khi nào dùng REPEATABLE READ?
  5. SELECT ... FOR UPDATE row lock so với SERIALIZABLE isolation — pros/cons mỗi cách?
Đáp án
  1. 4 ACID + transaction trong create_order: ACID viết tắt Atomicity (tất cả statement commit toàn bộ hoặc không có gì — không trạng thái nửa chừng), Consistency (sau commit DB vẫn tuân thủ mọi constraint CHECK + UNIQUE + FK + NOT NULL), Isolation (concurrent transaction không thấy data tạm thời chưa commit của transaction khác — mức cô lập tùy isolation level), Durability (sau COMMIT trả thành công, data persist trên đĩa qua WAL, không mất kể cả server crash). Trong scenario create_order Shop API (5 step: insert order → insert items → check + decrement stock → insert payment), transaction giải quyết chủ yếu Atomicity — nếu step 3 fail (stock insufficient) hoặc step 4 fail (connection drop), rollback toàn bộ → không có order treo + không có items orphan + stock không decrement nửa chừng. Isolation phụ trợ — concurrent 2 user mua cùng product không đè nhau qua FOR UPDATE row lock (lock B54). Consistency đảm bảo qua DB constraint (CHECK total > 0, CHECK stock >= 0 nếu thêm vào products, FK CASCADE/RESTRICT, UNIQUE payments.order_id) — DB tự reject statement vi phạm constraint, transaction rollback. Durability đảm bảo qua Postgres WAL ghi disk trước khi báo COMMIT thành công — nếu Postgres crash ngay sau COMMIT, recover từ WAL khi restart không mất data.
  2. Async drop pitfall + best practice: Rust trait Drop đồng bộ (method fn drop(&mut self) không thể .await), trong khi ROLLBACK qua sqlx là async (gửi câu lệnh TCP tới Postgres chờ phản hồi). sqlx workaround bằng spawn task background trong Drop impl nếu transaction chưa commit/rollback explicit. Vấn đề: (a) runtime shutdown task spawn không kịp → connection trả pool ở trạng thái "đang trong transaction" → request kế tiếp dùng connection conflict; (b) connection drop network glitch → rollback không gửi được → Postgres tự rollback khi timeout có độ trễ; (c) log warning "transaction was not committed or rolled back before drop" spam log. Best practice: pattern explicit match Ok/Err. Code: let mut tx = pool.begin().await?; match do_work(&mut tx).await { Ok(result) => { tx.commit().await?; Ok(result) } Err(e) => { tx.rollback().await?; Err(e) } }. Quy tắc vĩnh viễn Shop API: KHÔNG bao giờ dựa vào Drop để rollback, mọi đường thoát hàm phải gọi tx.commit() hoặc tx.rollback() tường minh. Alternative pattern helper with_tx<F, R, E>(pool, f) wrap closure callback — gọn nhưng generic-heavy, để dành cho trường hợp 5+ service cùng pattern (xét lại G14).
  3. SAVEPOINT vs nested transaction + Postgres support: Postgres KHÔNG support true nested transaction (không có BEGIN trong BEGIN). Thay vào đó cung cấp SAVEPOINT — đánh dấu 1 điểm trong transaction hiện tại để có thể rollback ngược về điểm đó mà không hủy toàn bộ outer transaction. SQL: BEGIN; INSERT ...; SAVEPOINT sp1; INSERT ...; ROLLBACK TO sp1; UPDATE ...; COMMIT; — outer transaction vẫn alive, chỉ phần giữa SAVEPOINT + ROLLBACK TO bị undo. sqlx ánh xạ qua method tx.begin() gọi trên 1 transaction đã mở — trả về Transaction lồng, commit thì release savepoint, rollback thì rollback tới savepoint. Khác biệt với true nested: nested thật (vd MS SQL Server với BEGIN TRAN nested) inner transaction có thể commit độc lập với outer — outer rollback inner vẫn giữ. SAVEPOINT không tách biệt như vậy — outer rollback xóa luôn savepoint inner đã release. Use case Shop API dùng SAVEPOINT cho side-effect optional (audit log, notification, analytics tracking) — required step (insert order, decrement stock, insert payment) KHÔNG bọc savepoint vì fail là rollback toàn bộ.
  4. 3 isolation level Postgres + khi nào REPEATABLE READ: READ COMMITTED (mặc định) — mỗi statement thấy snapshot data đã commit tại thời điểm statement bắt đầu. 2 lần SELECT trong cùng transaction có thể thấy giá trị khác nhau nếu transaction khác commit giữa chừng (phantom read, non-repeatable read). Phù hợp 95% query Shop API CRUD đơn giản. REPEATABLE READ — snapshot ở thời điểm BEGIN. Mọi SELECT trong transaction thấy data y hệt như lúc BEGIN. Tránh được non-repeatable read và phantom read. SERIALIZABLE — như thể transaction chạy sequential. Postgres detect conflict bằng cách track read/write dependency, abort 1 trong 2 transaction → application phải retry. Chậm nhất, isolation cao nhất. Khi nào REPEATABLE READ: dùng cho report query lớn cần snapshot nhất quán — vd báo cáo doanh thu tháng query nhiều bảng (orders, payments, users) trong cùng transaction; không muốn nửa chừng có transaction khác commit làm số liệu lệch. Cũng dùng cho export dữ liệu lớn — đảm bảo dataset xuất ra nhất quán cùng 1 thời điểm. Lock Shop API: REPEATABLE READ chỉ cho report endpoint admin dashboard (G14 chuẩn bị), CRUD đơn giản giữ READ COMMITTED default. Set qua SET TRANSACTION ISOLATION LEVEL REPEATABLE READ sau khi gọi pool.begin().
  5. SELECT ... FOR UPDATE vs SERIALIZABLE pros/cons: FOR UPDATE pessimistic row lock — Postgres giữ row lock trên row được SELECT FOR UPDATE cho đến khi transaction commit/rollback. Transaction khác cố gắng SELECT FOR UPDATE hoặc UPDATE cùng row sẽ chờ. Pros: (a) predictable — chỉ những transaction touch cùng row mới block, không random abort; (b) đơn giản hiểu — semantic giống mutex; (c) hiệu năng tốt cho hot row decrement scenario (stock decrement) — Postgres optimize lock conflict; (d) không cần retry logic — chờ lock thay vì abort + retry. Cons: (a) deadlock nếu 2 transaction lock 2 row theo thứ tự khác nhau — Postgres detect + abort 1, application phải retry; (b) lock granular cấp row, không protect được logic spanning nhiều bảng dạng "tổng số order trong tháng phải < X"; (c) phải nhớ thêm FOR UPDATE mọi nơi cần protect — dễ quên. SERIALIZABLE — Postgres detect conflict bằng cách track read/write dependency tự động, abort khi có cycle. Pros: (a) protect logic phức tạp (vd check tổng trước insert) không cần FOR UPDATE thủ công; (b) declarative — không phải nhớ thêm lock từng nơi. Cons: (a) abort + retry overhead cao khi contention; (b) random — khó debug khi nào transaction bị abort; (c) yêu cầu application implement retry logic với exponential backoff; (d) hiệu năng kém hơn row lock cho scenario hot row. Quyết định lock Shop API: dùng FOR UPDATE cho stock decrement (predictable, đơn giản, hiệu năng tốt). SERIALIZABLE chỉ dùng khi domain critical (financial transfer money giữa 2 account) — preview B66 deep.
12

Bài Tiếp Theo

— chi tiết error sqlx::Error variants: Database (PgDatabaseError với code SQLSTATE), RowNotFound, Tls, Configuration, Pool timeout; map sang AppError với error code SQLSTATE phổ biến (23505 unique violation, 23503 FK violation, 23514 check violation); custom AppError variant cho domain error (InsufficientStock, etc).