Bài 16: Working với Databases - PostgreSQL

PostgreSQL với psycopg2

PostgreSQL là production-grade database với nhiều features advanced.

Cài Đặt

# Install psycopg2
pip install psycopg2-binary

# Or compile from source (production)
pip install psycopg2

Kết Nối PostgreSQL

import psycopg2
from psycopg2 import sql

# Option 1: connect with keyword arguments.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="mydb",
    user="postgres",
    password="password",
)
# Close this connection before rebinding `conn` below; otherwise the first
# connection would stay open with no name left to close it through.
conn.close()

# Option 2: connect with a connection string.
conn = psycopg2.connect(
    "postgresql://postgres:password@localhost:5432/mydb"
)

try:
    cursor = conn.cursor()

    # Execute a query and read the single result row.
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"PostgreSQL version: {version[0]}")
finally:
    # Always release the connection, even if the query raised.
    conn.close()

CRUD Operations với PostgreSQL

import psycopg2
from psycopg2.extras import RealDictCursor

conn = psycopg2.connect(
    "postgresql://postgres:password@localhost:5432/mydb"
)

# RealDictCursor returns each row as a dict keyed by column name.
cursor = conn.cursor(cursor_factory=RealDictCursor)

# Create the table if it does not exist yet.
create_table_sql = """
    CREATE TABLE IF NOT EXISTS products (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        price DECIMAL(10, 2) NOT NULL,
        stock INTEGER DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
"""
cursor.execute(create_table_sql)

# Insert one row and read back the generated primary key.
insert_returning_sql = (
    "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s) RETURNING id"
)
cursor.execute(insert_returning_sql, ("Laptop", 999.99, 10))
inserted_row = cursor.fetchone()
product_id = inserted_row["id"]
print(f"Inserted product ID: {product_id}")

# Insert several rows with one executemany() call.
products = [
    ("Mouse", 29.99, 50),
    ("Keyboard", 79.99, 30),
    ("Monitor", 299.99, 15),
]
insert_sql = "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s)"
cursor.executemany(insert_sql, products)

# Query with a bound parameter.
cursor.execute("SELECT * FROM products WHERE price > %s", (50,))
matching_rows = cursor.fetchall()
for row in matching_rows:
    print(f"{row['name']}: ${row['price']}")

# Update: take one unit out of stock for the product inserted above.
update_sql = "UPDATE products SET stock = stock - %s WHERE id = %s"
cursor.execute(update_sql, (1, product_id))

# Delete products that are out of stock.
cursor.execute("DELETE FROM products WHERE stock = 0")

# Persist everything done on this connection, then release it.
conn.commit()
conn.close()

Safe Query Construction

from psycopg2 import sql

# NOTE: `cursor` is not created in this snippet; it is assumed to be an open
# psycopg2 cursor such as the one from the previous example.

# ❌ Unsafe - an identifier interpolated with an f-string is an injection risk
table = "products"
query = f"SELECT * FROM {table}"  # Dangerous!

# ✅ Safe - let sql.Identifier quote the table name
select_all = sql.SQL("SELECT * FROM {}").format(sql.Identifier("products"))
cursor.execute(select_all)

# ✅ Safe - dynamic column list
columns = ["name", "price", "stock"]
column_list = sql.SQL(", ").join([sql.Identifier(col) for col in columns])
cursor.execute(sql.SQL("SELECT {} FROM products").format(column_list))

# ✅ Safe - dynamic WHERE conditions: column names are quoted as identifiers,
# values travel separately as bound parameters.
conditions = {"price": 100, "stock": 10}
predicates = [
    sql.SQL("{} > %s").format(sql.Identifier(col))
    for col in conditions
]
where_clause = sql.SQL(" AND ").join(predicates)
bound_values = list(conditions.values())
cursor.execute(
    sql.SQL("SELECT * FROM products WHERE {}").format(where_clause),
    bound_values,
)

JSON Support

import psycopg2
from psycopg2.extras import Json, RealDictCursor
import json

conn = psycopg2.connect("postgresql://postgres:password@localhost:5432/mydb")

try:
    cursor = conn.cursor(cursor_factory=RealDictCursor)

    # Create a table with a JSONB column.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) NOT NULL,
            metadata JSONB
        )
    """)

    # Insert JSON data; Json() adapts the Python dict to a JSON parameter.
    metadata = {
        "preferences": {"theme": "dark", "language": "en"},
        "settings": {"notifications": True},
    }
    cursor.execute(
        "INSERT INTO users (username, metadata) VALUES (%s, %s)",
        ("john_doe", Json(metadata)),
    )

    # Query a nested JSON field.
    # `->` keeps the intermediate value as jsonb so it can be traversed
    # further; only the final hop uses `->>` to extract text for comparison.
    # (Chaining `->>` twice fails because the second operator would be
    # applied to text.)
    cursor.execute("""
        SELECT * FROM users
        WHERE metadata->'preferences'->>'theme' = %s
    """, ("dark",))

    # Update a nested JSON field in place.
    cursor.execute("""
        UPDATE users
        SET metadata = jsonb_set(metadata, '{preferences,theme}', %s)
        WHERE username = %s
    """, (Json("light"), "john_doe"))

    conn.commit()
finally:
    # Closing without a commit discards the open transaction, so a failure
    # above leaves no partial changes behind.
    conn.close()

Connection Pooling

Connection pooling tái sử dụng connections thay vì tạo mới mỗi lần.

Simple Connection Pool

from psycopg2 import pool
import psycopg2.extras


class DatabasePool:
    """Small wrapper around ``psycopg2.pool.SimpleConnectionPool``."""

    def __init__(
        self,
        minconn: int = 1,
        maxconn: int = 10,
        **kwargs
    ):
        """Create the pool.

        Args:
            minconn: First positional argument passed to SimpleConnectionPool.
            maxconn: Second positional argument passed to SimpleConnectionPool.
            **kwargs: Connection parameters forwarded to the pool
                (host, database, user, password, ...).
        """
        self.pool = pool.SimpleConnectionPool(
            minconn,
            maxconn,
            **kwargs
        )

    def get_connection(self):
        """Check a connection out of the pool."""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return a previously checked-out connection to the pool."""
        self.pool.putconn(conn)

    def close_all(self):
        """Close every connection managed by the pool."""
        self.pool.closeall()


# Usage
db_pool = DatabasePool(
    minconn=2,
    maxconn=10,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
)

try:
    conn = db_pool.get_connection()
    try:
        # The cursor is created inside the try block so that a failure here
        # still returns the connection to the pool.
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        try:
            cursor.execute("SELECT * FROM products")
            products = cursor.fetchall()
            conn.commit()
        finally:
            cursor.close()
    finally:
        db_pool.put_connection(conn)
finally:
    # Cleanup: runs even when the query above raised.
    db_pool.close_all()

Threaded Connection Pool

from psycopg2 import pool
from contextlib import contextmanager
from typing import Optional
import psycopg2.extras


class ThreadedDatabasePool:
    """Connection pool built on ``psycopg2.pool.ThreadedConnectionPool``."""

    def __init__(self, minconn: int = 2, maxconn: int = 20, **kwargs):
        """Create the pool; ``**kwargs`` are the connection parameters."""
        self.pool = pool.ThreadedConnectionPool(
            minconn,
            maxconn,
            **kwargs
        )

    @contextmanager
    def get_cursor(self, cursor_factory=None):
        """Yield a cursor on a pooled connection.

        Commits when the ``with`` body finishes normally, rolls back and
        re-raises when it raises, and always returns the connection to the
        pool.

        Args:
            cursor_factory: Optional cursor class passed to ``conn.cursor``.
        """
        conn = self.pool.getconn()
        try:
            # The cursor is opened inside the try block so a failure while
            # creating it cannot strand the checked-out connection.
            with conn.cursor(cursor_factory=cursor_factory) as cursor:
                yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self.pool.putconn(conn)

    def execute(self, query: str, params: Optional[tuple] = None):
        """Run one statement and return its rows.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Values for the placeholders; ``None`` means no values.

        Returns:
            A list of dict rows for statements that produce a result set,
            or ``None`` for statements that do not (INSERT/UPDATE/DELETE
            without RETURNING).
        """
        with self.get_cursor(
            cursor_factory=psycopg2.extras.RealDictCursor
        ) as cursor:
            cursor.execute(query, params or ())
            # `description` is None when the statement returned no result
            # set, so there is nothing to fetch.
            if cursor.description is None:
                return None
            return cursor.fetchall()

    def close(self):
        """Close every connection in the pool."""
        self.pool.closeall()


# Usage
db = ThreadedDatabasePool(
    minconn=2,
    maxconn=20,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
)

try:
    # Simple query
    products = db.execute("SELECT * FROM products WHERE price > %s", (100,))
    for product in products:
        print(product["name"])

    # With an explicit cursor; RETURNING hands back the generated key from
    # the INSERT itself, the same way the other examples do.
    with db.get_cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(
            "INSERT INTO products (name, price) VALUES (%s, %s) RETURNING id",
            ("Item", 50),
        )
        product_id = cursor.fetchone()["id"]
        print(f"Inserted ID: {product_id}")
finally:
    db.close()

Advanced Pool với Retry Logic

import psycopg2
import psycopg2.extras  # needed for RealDictCursor in execute_with_retry
from psycopg2 import pool
from contextlib import contextmanager
from typing import Optional
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Errors treated as "the connection is unusable" rather than "the SQL is wrong".
_CONNECTION_ERRORS = (psycopg2.OperationalError, psycopg2.InterfaceError)


class ResilientDatabasePool:
    """Connection pool with retry on connection errors and a health probe."""

    def __init__(
        self,
        minconn: int = 2,
        maxconn: int = 20,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        **conn_params
    ):
        """Create the pool.

        Args:
            minconn: First positional argument for ThreadedConnectionPool.
            maxconn: Second positional argument for ThreadedConnectionPool.
            max_retries: Number of attempts for connection-level failures;
                values below 1 are treated as 1.
            retry_delay: Base delay in seconds; attempt N sleeps N * delay.
            **conn_params: Connection parameters forwarded to the pool.
        """
        self.conn_params = conn_params
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        self.pool = pool.ThreadedConnectionPool(
            minconn,
            maxconn,
            **conn_params
        )

    def _attempts(self) -> int:
        """Return the number of attempts to make (never fewer than one)."""
        return max(1, self.max_retries)

    def _discard(self, conn) -> None:
        """Hand a broken connection back to the pool and have it closed."""
        try:
            self.pool.putconn(conn, close=True)
        except psycopg2.Error as exc:
            # Best effort: the connection is already considered lost.
            logger.warning(f"Could not discard broken connection: {exc}")

    def _acquire(self):
        """Check out a connection that answers ``SELECT 1``.

        Retries with a linearly growing delay on connection-level errors and
        re-raises the last error once the attempts are used up.
        """
        attempts = self._attempts()
        for attempt in range(attempts):
            conn = None
            try:
                conn = self.pool.getconn()

                # Health check
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                # End the transaction the probe opened so the caller starts
                # from a clean state.
                conn.rollback()
                return conn

            except _CONNECTION_ERRORS as e:
                logger.warning(f"Connection error (attempt {attempt + 1}): {e}")
                if conn is not None:
                    self._discard(conn)
                if attempt >= attempts - 1:
                    raise
                time.sleep(self.retry_delay * (attempt + 1))

    @contextmanager
    def get_connection(self):
        """Yield a health-checked connection.

        Only acquiring the connection is retried. The generator yields
        exactly once, as ``@contextmanager`` requires, so an error raised by
        the ``with`` body propagates to the caller.

        Commits on success. On a connection-level error the connection is
        closed instead of being reused; on any other error the transaction
        is rolled back. The connection is returned to the pool exactly once.
        """
        conn = self._acquire()
        broken = False
        try:
            yield conn
            conn.commit()
        except _CONNECTION_ERRORS:
            broken = True
            raise
        except Exception:
            conn.rollback()
            raise
        finally:
            self.pool.putconn(conn, close=broken)

    def execute_with_retry(self, query: str, params: Optional[tuple] = None):
        """Run one statement, retrying when the connection fails.

        NOTE(review): a statement can be re-sent after a failure whose
        outcome on the server is unknown (for example an error during
        commit), so callers should pass statements that are safe to repeat.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Values for the placeholders; ``None`` means no values.

        Returns:
            A list of dict rows, or ``None`` when the statement produced no
            result set.

        Raises:
            psycopg2.OperationalError, psycopg2.InterfaceError: when every
                attempt failed on a connection-level error.
        """
        attempts = self._attempts()
        for attempt in range(attempts):
            # Acquisition has its own retry loop; if it gives up, so do we.
            conn = self._acquire()
            try:
                with conn.cursor(
                    cursor_factory=psycopg2.extras.RealDictCursor
                ) as cursor:
                    cursor.execute(query, params or ())
                    # No result set (INSERT/UPDATE/DELETE without RETURNING).
                    rows = None if cursor.description is None else cursor.fetchall()
                conn.commit()
            except _CONNECTION_ERRORS as e:
                self._discard(conn)
                if attempt >= attempts - 1:
                    raise
                logger.warning(f"Query failed (attempt {attempt + 1}): {e}")
                time.sleep(self.retry_delay * (attempt + 1))
                continue
            except Exception:
                # SQL-level failure: undo, keep the healthy connection.
                try:
                    conn.rollback()
                finally:
                    self.pool.putconn(conn)
                raise
            self.pool.putconn(conn)
            return rows

    def health_check(self) -> bool:
        """Return True if a connection can be acquired and probed."""
        try:
            # Acquiring a connection already runs the SELECT 1 probe.
            with self.get_connection():
                pass
            return True
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return False


# Usage
db = ResilientDatabasePool(
    minconn=2,
    maxconn=20,
    max_retries=3,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
)

# Health check
if db.health_check():
    logger.info("✅ Database is healthy")

# Execute with automatic retry
try:
    products = db.execute_with_retry(
        "SELECT * FROM products WHERE price > %s",
        (100,)
    )
except Exception as e:
    logger.error(f"Failed after retries: {e}")

ORM Concepts với SQLAlchemy

ORM (Object-Relational Mapping) maps Python classes to database tables.

Cài Đặt SQLAlchemy

pip install sqlalchemy
pip install psycopg2-binary  # PostgreSQL driver

Basic SQLAlchemy Setup

from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

# Database URL
DATABASE_URL = "postgresql://postgres:password@localhost:5432/mydb"

# Create engine; echo=True logs the SQL that SQLAlchemy emits.
engine = create_engine(DATABASE_URL, echo=True)

# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class for models
Base = declarative_base()


# Define models
class User(Base):
    """A user account; owns zero or more posts."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship: the other side is Post.author.
    posts = relationship("Post", back_populates="author")


class Post(Base):
    """A post written by a user."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(String)
    author_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship: the other side is User.posts.
    author = relationship("User", back_populates="posts")


# Create tables.
# NOTE(review): the earlier JSON example creates a different "users" table in
# the same "mydb" database; create_all() will not alter an existing table, so
# use a fresh database (or drop that table) before running this example.
Base.metadata.create_all(bind=engine)

# Use a session
db = SessionLocal()

try:
    # Create user. The address is a real placeholder: email is unique, so
    # each example user needs its own distinct value.
    user = User(username="john_doe", email="john@example.com")
    db.add(user)
    db.commit()
    db.refresh(user)

    # Create post
    post = Post(title="My First Post", content="Hello World!", author_id=user.id)
    db.add(post)
    db.commit()

    # Query; distinct loop names keep `user` and `post` above intact.
    users = db.query(User).filter(User.username == "john_doe").all()
    for found_user in users:
        print(f"User: {found_user.username}")
        for user_post in found_user.posts:
            print(f"  Post: {user_post.title}")
except Exception:
    # Leave the session in a clean state before propagating the error.
    db.rollback()
    raise
finally:
    db.close()

SQLAlchemy Repository Pattern

from sqlalchemy.orm import Session
from typing import List, Optional, TypeVar, Generic
from sqlalchemy.ext.declarative import DeclarativeMeta

T = TypeVar("T")


class BaseRepository(Generic[T]):
    """Generic repository: CRUD helpers for one mapped model class."""

    def __init__(self, model: DeclarativeMeta, db: Session):
        """Bind the repository to a model class and a session."""
        self.model = model
        self.db = db

    def _commit(self) -> None:
        """Commit the session, rolling back first if the commit fails.

        Without the rollback a failed flush leaves the session unusable for
        every later call on this repository.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def create(self, **kwargs) -> T:
        """Create, persist and return a new record built from ``kwargs``."""
        instance = self.model(**kwargs)
        self.db.add(instance)
        self._commit()
        self.db.refresh(instance)
        return instance

    def get(self, id: int) -> Optional[T]:
        """Return the record with the given primary key, or None."""
        return self.db.query(self.model).filter(self.model.id == id).first()

    def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
        """Return up to ``limit`` records after skipping ``skip`` of them."""
        return self.db.query(self.model).offset(skip).limit(limit).all()

    def update(self, id: int, **kwargs) -> Optional[T]:
        """Update fields of the record with the given primary key.

        Returns:
            The refreshed record, or None when no record has that id.

        Raises:
            AttributeError: if a keyword does not name an attribute of the
                model. Setting such a name would only create a plain Python
                attribute that is never written to the database.
        """
        instance = self.get(id)
        if instance is None:
            return None

        # Validate every name before touching the instance so a bad call
        # leaves it unmodified.
        unknown = [key for key in kwargs if not hasattr(self.model, key)]
        if unknown:
            raise AttributeError(
                f"{self.model.__name__} has no attribute(s): {', '.join(unknown)}"
            )

        for key, value in kwargs.items():
            setattr(instance, key, value)
        self._commit()
        self.db.refresh(instance)
        return instance

    def delete(self, id: int) -> bool:
        """Delete the record with the given primary key.

        Returns:
            True if a record was deleted, False if none had that id.
        """
        instance = self.get(id)
        if instance is None:
            return False
        self.db.delete(instance)
        self._commit()
        return True


class UserRepository(BaseRepository[User]):
    """User-specific repository."""

    def __init__(self, db: Session):
        super().__init__(User, db)

    def get_by_username(self, username: str) -> Optional[User]:
        """Return the user with this username, or None."""
        return self.db.query(User).filter(User.username == username).first()

    def get_by_email(self, email: str) -> Optional[User]:
        """Return the user with this email address, or None."""
        return self.db.query(User).filter(User.email == email).first()


# Usage (User and SessionLocal come from the basic setup example)
db = SessionLocal()

try:
    user_repo = UserRepository(db)

    # Create. Email is unique, so this address must differ from the one used
    # for any other example user.
    user = user_repo.create(username="jane_doe", email="jane@example.com")
    print(f"Created user: {user.id}")

    # Get by username
    user = user_repo.get_by_username("jane_doe")
    if user:
        print(f"Found: {user.email}")

        # Update and delete only when the lookup actually found the user.
        user_repo.update(user.id, email="jane.doe@example.com")
        user_repo.delete(user.id)
finally:
    db.close()

5 Ứng Dụng Thực Tế

1. E-commerce Database với Connection Pool

import psycopg2
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
from typing import List, Optional, Dict
from decimal import Decimal


class EcommerceDatabase:
    """E-commerce database backed by a pooled PostgreSQL connection."""

    def __init__(self, **conn_params):
        """Create the pool from ``conn_params`` and ensure the schema exists."""
        self.pool = pool.ThreadedConnectionPool(
            minconn=2,
            maxconn=20,
            **conn_params
        )
        self.init_schema()

    @contextmanager
    def get_cursor(self):
        """Yield a dict-row cursor on a pooled connection.

        Commits when the ``with`` body finishes normally, rolls back and
        re-raises when it raises, and always returns the connection to the
        pool.
        """
        conn = self.pool.getconn()
        try:
            # Opened inside the try block so a failure while creating the
            # cursor cannot strand the checked-out connection.
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self.pool.putconn(conn)

    def init_schema(self):
        """Create the products, orders and order_items tables if missing."""
        with self.get_cursor() as cursor:
            # Products table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(200) NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    stock INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Orders table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id SERIAL PRIMARY KEY,
                    customer_name VARCHAR(100) NOT NULL,
                    customer_email VARCHAR(100) NOT NULL,
                    total_amount DECIMAL(10, 2) NOT NULL,
                    status VARCHAR(20) DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Order items table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    id SERIAL PRIMARY KEY,
                    order_id INTEGER REFERENCES orders(id),
                    product_id INTEGER REFERENCES products(id),
                    quantity INTEGER NOT NULL,
                    price DECIMAL(10, 2) NOT NULL
                )
            """)

    def add_product(self, name: str, price: Decimal, stock: int) -> int:
        """Insert a product and return its generated id."""
        with self.get_cursor() as cursor:
            cursor.execute(
                "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s) RETURNING id",
                (name, price, stock)
            )
            return cursor.fetchone()["id"]

    @staticmethod
    def _merge_items(items: List[Dict]) -> Dict[int, int]:
        """Validate order lines and sum the quantities per product id."""
        quantities: Dict[int, int] = {}
        for item in items:
            product_id = item["product_id"]
            quantity = item["quantity"]
            if quantity <= 0:
                raise ValueError(
                    f"Invalid quantity {quantity} for product {product_id}"
                )
            quantities[product_id] = quantities.get(product_id, 0) + quantity
        return quantities

    def create_order(
        self,
        customer_name: str,
        customer_email: str,
        items: List[Dict]  # [{"product_id": 1, "quantity": 2}, ...]
    ) -> Optional[int]:
        """Create an order and decrement stock in a single transaction.

        Lines naming the same product are merged before the stock check, so
        the check covers the total requested quantity and stock cannot go
        negative. Each merged product yields one ``order_items`` row.

        Args:
            customer_name: Stored on the order.
            customer_email: Stored on the order.
            items: Order lines, each a dict with ``product_id`` and
                ``quantity``.

        Returns:
            The id of the new order.

        Raises:
            ValueError: for a non-positive quantity, an unknown product, or
                insufficient stock. Nothing is written in that case because
                the transaction is rolled back.
        """
        quantities = self._merge_items(items)

        with self.get_cursor() as cursor:
            try:
                total_amount = Decimal("0.00")
                prices: Dict[int, Decimal] = {}

                # Lock rows in a fixed (sorted) order so two concurrent
                # orders for the same products cannot lock them in opposite
                # order and deadlock.
                for product_id in sorted(quantities):
                    cursor.execute(
                        "SELECT price, stock FROM products WHERE id = %s FOR UPDATE",
                        (product_id,)
                    )
                    product = cursor.fetchone()

                    if not product:
                        raise ValueError(f"Product {product_id} not found")
                    if product["stock"] < quantities[product_id]:
                        raise ValueError(f"Insufficient stock for product {product_id}")

                    # Keep the price read under the row lock; it is reused
                    # for the order line instead of querying again.
                    prices[product_id] = product["price"]
                    total_amount += product["price"] * quantities[product_id]

                # Create order
                cursor.execute(
                    "INSERT INTO orders (customer_name, customer_email, total_amount) VALUES (%s, %s, %s) RETURNING id",
                    (customer_name, customer_email, total_amount)
                )
                order_id = cursor.fetchone()["id"]

                # Create order items and update stock
                for product_id, quantity in quantities.items():
                    cursor.execute(
                        "INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (%s, %s, %s, %s)",
                        (order_id, product_id, quantity, prices[product_id])
                    )

                    cursor.execute(
                        "UPDATE products SET stock = stock - %s WHERE id = %s",
                        (quantity, product_id)
                    )

                return order_id

            except Exception as e:
                print(f"Order creation failed: {e}")
                raise

    def get_order_details(self, order_id: int) -> Optional[Dict]:
        """Return the order row plus its items, or None if it does not exist.

        The result is a dict of the order columns with an extra ``items``
        key holding rows of ``quantity``, ``price`` and ``product_name``.
        """
        with self.get_cursor() as cursor:
            # Get order
            cursor.execute(
                "SELECT * FROM orders WHERE id = %s",
                (order_id,)
            )
            order = cursor.fetchone()

            if not order:
                return None

            # Get order items
            cursor.execute("""
                SELECT
                    oi.quantity,
                    oi.price,
                    p.name as product_name
                FROM order_items oi
                JOIN products p ON oi.product_id = p.id
                WHERE oi.order_id = %s
            """, (order_id,))

            order["items"] = cursor.fetchall()
            return dict(order)


# Usage
ecommerce = EcommerceDatabase(
    host="localhost",
    database="ecommerce",
    user="postgres",
    password="password",
)

# Add products
laptop_id = ecommerce.add_product("Laptop", Decimal("999.99"), 10)
mouse_id = ecommerce.add_product("Mouse", Decimal("29.99"), 50)

# Create order
order_id = ecommerce.create_order(
    "John Doe",
    "[email protected]",
    [
        {"product_id": laptop_id, "quantity": 1},
        {"product_id": mouse_id, "quantity": 2},
    ],
)

if order_id:
    print(f"✅ Order created: {order_id}")

    # Get order details
    order = ecommerce.get_order_details(order_id)
    print(f"Total: ${order['total_amount']}")
    print("Items:")
    for item in order["items"]:
        print(f"  {item['product_name']}: {item['quantity']} x ${item['price']}")

2. Analytics Database với Materialized Views

import psycopg2
from psycopg2.extras import Json, RealDictCursor
from contextlib import contextmanager
from typing import List, Dict, Optional
from datetime import datetime, timedelta


class AnalyticsDatabase:
    """Analytics database with an events table and a materialized view."""

    def __init__(self, conn_string: str):
        """Store the connection string and ensure the schema exists."""
        self.conn_string = conn_string
        self.init_schema()

    def get_connection(self):
        """Open and return a new connection; the caller must close it."""
        return psycopg2.connect(self.conn_string)

    @contextmanager
    def _transaction(self, cursor_factory=None):
        """Yield a cursor inside one transaction on a short-lived connection.

        ``with conn`` only commits or rolls back the transaction; it does not
        close the connection. The explicit ``close()`` in ``finally`` is what
        stops every call from leaking a connection.
        """
        conn = self.get_connection()
        try:
            with conn:
                with conn.cursor(cursor_factory=cursor_factory) as cursor:
                    yield cursor
        finally:
            conn.close()

    def init_schema(self):
        """Create the events table, its indexes and the daily_stats view."""
        with self._transaction() as cursor:
            # Events table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    event_type VARCHAR(50) NOT NULL,
                    event_data JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Indexes for the created_at and user_id filters used below
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_created_at
                ON events(created_at)
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_user_id
                ON events(user_id)
            """)

            # Materialized view for daily stats
            cursor.execute("""
                CREATE MATERIALIZED VIEW IF NOT EXISTS daily_stats AS
                SELECT
                    DATE(created_at) as date,
                    event_type,
                    COUNT(*) as event_count,
                    COUNT(DISTINCT user_id) as unique_users
                FROM events
                GROUP BY DATE(created_at), event_type
            """)

            # Unique index on the materialized view
            cursor.execute("""
                CREATE UNIQUE INDEX IF NOT EXISTS idx_daily_stats_date_type
                ON daily_stats(date, event_type)
            """)

    def track_event(self, user_id: int, event_type: str, event_data: Optional[Dict] = None):
        """Insert one event row.

        Args:
            user_id: Id of the user the event belongs to.
            event_type: Event name stored in ``event_type``.
            event_data: Optional payload stored as JSONB; ``None`` is stored
                as an empty object.
        """
        with self._transaction() as cursor:
            cursor.execute(
                "INSERT INTO events (user_id, event_type, event_data) VALUES (%s, %s, %s)",
                (user_id, event_type, Json(event_data or {}))
            )

    def refresh_stats(self):
        """Refresh the daily_stats materialized view."""
        with self._transaction() as cursor:
            cursor.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY daily_stats")
        # Printed only after the transaction has committed.
        print("✅ Stats refreshed")

    def get_daily_stats(self, days: int = 7) -> List[Dict]:
        """Return daily_stats rows from the last ``days`` days, newest first."""
        start_date = datetime.now() - timedelta(days=days)
        with self._transaction(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(
                "SELECT * FROM daily_stats WHERE date >= %s ORDER BY date DESC",
                (start_date.date(),)
            )
            return cursor.fetchall()

    def get_user_activity(self, user_id: int, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` of the user's events, newest first."""
        with self._transaction(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(
                "SELECT * FROM events WHERE user_id = %s ORDER BY created_at DESC LIMIT %s",
                (user_id, limit)
            )
            return cursor.fetchall()


# Usage
analytics = AnalyticsDatabase("postgresql://postgres:password@localhost:5432/analytics")

# Track events
analytics.track_event(1, "page_view", {"page": "/home"})
analytics.track_event(1, "button_click", {"button": "signup"})
analytics.track_event(2, "page_view", {"page": "/products"})

# Refresh stats (run periodically, e.g., via cron)
analytics.refresh_stats()

# Get stats
stats = analytics.get_daily_stats(7)
for stat in stats:
    print(f"{stat['date']}: {stat['event_type']} - {stat['event_count']} events, {stat['unique_users']} users")

3. Multi-Tenant Database

import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
from typing import Optional, List, Dict

# tenants.schema_name is declared VARCHAR(63), so longer names cannot be
# registered.
_MAX_SCHEMA_NAME_LENGTH = 63


class MultiTenantDatabase:
    """Multi-tenant database that gives each tenant its own schema."""

    def __init__(self, conn_string: str):
        """Store the connection string and ensure the tenants table exists."""
        self.conn_string = conn_string
        self.init_public_schema()

    @contextmanager
    def get_connection(self, schema: str = "public"):
        """Yield a dict-row cursor whose search_path is set to ``schema``.

        Opens a new connection per call. Commits when the ``with`` body
        finishes normally, rolls back and re-raises when it raises, and
        always closes the connection.

        Args:
            schema: Name of a single schema. It is quoted as an identifier,
                never pasted into the SQL text, so it cannot inject SQL.
        """
        conn = psycopg2.connect(self.conn_string)
        try:
            # Opened inside the try block so the connection is closed even
            # if creating the cursor fails.
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                # Set schema
                cursor.execute(
                    sql.SQL("SET search_path TO {}").format(sql.Identifier(schema))
                )
                yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def init_public_schema(self):
        """Create the tenants registry table in the public schema if missing."""
        with self.get_connection() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS tenants (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    schema_name VARCHAR(63) NOT NULL UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def create_tenant(self, name: str) -> str:
        """Create a tenant with its own schema and return the schema name.

        The schema name is ``tenant_`` plus the lower-cased name with spaces
        replaced by underscores.

        Raises:
            ValueError: if the name is blank or the resulting schema name
                does not fit the 63-character ``schema_name`` column.
        """
        if not name or not name.strip():
            raise ValueError("Tenant name must not be empty")

        schema_name = f"tenant_{name.lower().replace(' ', '_')}"
        if len(schema_name.encode("utf-8")) > _MAX_SCHEMA_NAME_LENGTH:
            raise ValueError(
                f"Schema name for tenant {name!r} exceeds "
                f"{_MAX_SCHEMA_NAME_LENGTH} characters"
            )

        with self.get_connection() as cursor:
            # Create schema. The name comes from caller input, so it is
            # quoted with sql.Identifier rather than formatted into the text.
            cursor.execute(
                sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(
                    sql.Identifier(schema_name)
                )
            )

            # Register tenant
            cursor.execute(
                "INSERT INTO tenants (name, schema_name) VALUES (%s, %s) RETURNING id",
                (name, schema_name)
            )
            tenant_id = cursor.fetchone()["id"]

            print(f"✅ Created tenant: {name} (ID: {tenant_id}, Schema: {schema_name})")

        # Initialize tenant schema
        self.init_tenant_schema(schema_name)

        return schema_name

    def init_tenant_schema(self, schema: str):
        """Create the per-tenant users and data tables inside ``schema``."""
        with self.get_connection(schema) as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) NOT NULL UNIQUE,
                    email VARCHAR(100) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS data (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER REFERENCES users(id),
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def add_user(self, schema: str, username: str, email: str) -> int:
        """Insert a user into the tenant's schema and return the new id."""
        with self.get_connection(schema) as cursor:
            cursor.execute(
                "INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
                (username, email)
            )
            return cursor.fetchone()["id"]

    def get_tenant_users(self, schema: str) -> List[Dict]:
        """Return every user row in the tenant's schema."""
        with self.get_connection(schema) as cursor:
            cursor.execute("SELECT * FROM users")
            return cursor.fetchall()


# Usage
multi_tenant = MultiTenantDatabase("postgresql://postgres:password@localhost:5432/saas")

# Create tenants
acme_schema = multi_tenant.create_tenant("Acme Corp")
globex_schema = multi_tenant.create_tenant("Globex Inc")

# Add users to different tenants
acme_user = multi_tenant.add_user(acme_schema, "john_acme", "[email protected]")
globex_user = multi_tenant.add_user(globex_schema, "jane_globex", "[email protected]")

# Get tenant users (isolated)
acme_users = multi_tenant.get_tenant_users(acme_schema)
print(f"Acme users: {len(acme_users)}")  # 1 user

globex_users = multi_tenant.get_tenant_users(globex_schema)
print(f"Globex users: {len(globex_users)}")  # 1 user

4. Database Migration System

import psycopg2
from typing import List, Callable
from datetime import datetime
import hashlib


def _code_fingerprint(code) -> str:
    """Return a text fingerprint of a code object.

    The fingerprint combines the bytecode with the constants, because the
    SQL text of a migration is stored in the constants, not in the bytecode.
    Nested code objects are fingerprinted recursively instead of through
    their repr, which embeds a memory address.

    NOTE: bytecode differs between Python versions, so fingerprints are only
    comparable when produced by the same interpreter version.
    """
    parts = [code.co_code.hex()]
    for const in code.co_consts:
        if hasattr(const, "co_code"):
            parts.append(_code_fingerprint(const))
        else:
            parts.append(repr(const))
    return "|".join(parts)


class Migration:
    """Single migration: a version, a name and its up/down callables."""

    def __init__(self, version: str, name: str, up: Callable, down: Callable):
        self.version = version
        self.name = name
        self.up = up
        self.down = down
        self.checksum = self._calculate_checksum()

    def _calculate_checksum(self) -> str:
        """Return an MD5 hex digest of the version, name and `up` body.

        Callables without a ``__code__`` attribute contribute only their
        version and name.
        """
        code = getattr(self.up, "__code__", None)
        body = _code_fingerprint(code) if code is not None else ""
        content = f"{self.version}{self.name}{body}"
        return hashlib.md5(content.encode()).hexdigest()


class MigrationManager:
    """Database migration manager.

    Migrations are registered with the :meth:`register` decorator and
    applied in version order by :meth:`migrate`. Applied versions are
    recorded in the ``schema_migrations`` table.
    """

    def __init__(self, conn_string: str):
        self.conn_string = conn_string
        self.migrations: List[Migration] = []
        self.init_migrations_table()

    def get_connection(self):
        """Open and return a new database connection (caller must close it)."""
        return psycopg2.connect(self.conn_string)

    def init_migrations_table(self):
        """Create the migrations tracking table if it does not exist."""
        conn = self.get_connection()
        try:
            # `with conn` commits on success / rolls back on error, but it
            # does not close the connection, hence the outer try/finally.
            with conn:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS schema_migrations (
                        version VARCHAR(20) PRIMARY KEY,
                        name VARCHAR(200) NOT NULL,
                        checksum VARCHAR(32) NOT NULL,
                        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
        finally:
            conn.close()

    def register(self, version: str, name: str):
        """Return a decorator that registers the decorated function as a migration.

        The decorated function receives a cursor and is returned unchanged.
        """
        def decorator(func):
            def down(cursor):
                pass  # Rollback logic is not implemented yet

            # Register `func` itself as `up` so that the checksum reflects
            # the migration body instead of a wrapper shared by all of them.
            migration = Migration(version, name, func, down)
            self.migrations.append(migration)
            return func

        return decorator

    def get_applied_migrations(self) -> List[str]:
        """Return the versions recorded in ``schema_migrations``, sorted."""
        conn = self.get_connection()
        try:
            with conn:
                cursor = conn.cursor()
                cursor.execute("SELECT version FROM schema_migrations ORDER BY version")
                return [row[0] for row in cursor.fetchall()]
        finally:
            conn.close()

    def migrate(self):
        """Apply pending migrations in version order.

        Each migration runs in its own connection and transaction. The
        migration and its tracking row are committed together. On failure
        the transaction is rolled back and the exception is re-raised, so
        later migrations are not attempted.
        """
        applied = set(self.get_applied_migrations())
        pending = [m for m in self.migrations if m.version not in applied]

        if not pending:
            print("✅ No pending migrations")
            return

        print(f"Applying {len(pending)} migrations...")

        for migration in sorted(pending, key=lambda m: m.version):
            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                print(f"  Applying {migration.version}: {migration.name}")

                # Run migration
                migration.up(cursor)

                # Record migration
                cursor.execute(
                    "INSERT INTO schema_migrations (version, name, checksum) VALUES (%s, %s, %s)",
                    (migration.version, migration.name, migration.checksum)
                )

                conn.commit()
                print(f"  ✅ Applied {migration.version}")
            except Exception as e:
                conn.rollback()
                print(f"  ❌ Failed {migration.version}: {e}")
                raise
            finally:
                conn.close()


# Usage
migrator = MigrationManager("postgresql://postgres:password@localhost:5432/mydb")


# Define migrations
@migrator.register("001", "create_users_table")
def migration_001(cursor):
    cursor.execute("""
        CREATE TABLE users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) NOT NULL UNIQUE,
            email VARCHAR(100) NOT NULL
        )
    """)


@migrator.register("002", "add_users_created_at")
def migration_002(cursor):
    cursor.execute("""
        ALTER TABLE users
        ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    """)


@migrator.register("003", "create_posts_table")
def migration_003(cursor):
    cursor.execute("""
        CREATE TABLE posts (
            id SERIAL PRIMARY KEY,
            user_id INTEGER REFERENCES users(id),
            title VARCHAR(200) NOT NULL,
            content TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)


# Run migrations
migrator.migrate()

5. Database Backup and Restore

import psycopg2
import subprocess
from datetime import datetime
from pathlib import Path
import gzip
import shutil
import os
import tempfile


class DatabaseBackup:
    """Database backup and restore utility built on ``pg_dump`` and ``psql``."""

    def __init__(
        self,
        host: str,
        database: str,
        user: str,
        password: str,
        backup_dir: str = "./backups"
    ):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.backup_dir = Path(backup_dir)
        # parents=True so a nested backup_dir such as "var/backups/db" works
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def _pg_env(self) -> dict:
        """Return the current environment plus ``PGPASSWORD``.

        Passing only ``{"PGPASSWORD": ...}`` as ``env`` would replace the
        whole environment of the child process, dropping ``PATH`` and other
        variables the PostgreSQL client tools may need.
        """
        return {**os.environ, "PGPASSWORD": self.password}

    def backup(self, compress: bool = True) -> str:
        """Dump the database to a timestamped file in ``backup_dir``.

        Args:
            compress: When true, gzip the dump and remove the plain file.

        Returns:
            Path of the created backup file as a string.

        Raises:
            subprocess.CalledProcessError: If ``pg_dump`` exits non-zero.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{self.database}_{timestamp}.sql"
        filepath = self.backup_dir / filename

        # pg_dump command
        cmd = [
            "pg_dump",
            "-h", self.host,
            "-U", self.user,
            "-d", self.database,
            "-f", str(filepath),
            "--no-owner",
            "--no-acl"
        ]

        print(f"Creating backup: {filename}")
        subprocess.run(cmd, env=self._pg_env(), check=True)

        # Compress
        if compress:
            compressed_path = f"{filepath}.gz"
            with open(filepath, "rb") as f_in:
                with gzip.open(compressed_path, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)

            filepath.unlink()  # Remove uncompressed
            filepath = Path(compressed_path)
            print(f"✅ Compressed backup: {compressed_path}")

        print(f"✅ Backup created: {filepath}")
        return str(filepath)

    def restore(self, backup_file: str):
        """Restore the database from a plain or gzip-compressed backup.

        A ``.gz`` backup is decompressed into a temporary file that is
        removed afterwards, so no stray ``.sql`` file is left behind to be
        picked up by ``list_backups``.

        Raises:
            FileNotFoundError: If ``backup_file`` does not exist.
            subprocess.CalledProcessError: If ``psql`` exits non-zero.
        """
        backup_path = Path(backup_file)

        if not backup_path.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_file}")

        temp_path = None
        try:
            # Decompress if needed
            if backup_path.suffix == ".gz":
                fd, temp_name = tempfile.mkstemp(suffix=".sql")
                temp_path = Path(temp_name)
                # Wrap the descriptor first so it is closed even if gzip fails
                with os.fdopen(fd, "wb") as f_out, gzip.open(backup_path, "rb") as f_in:
                    shutil.copyfileobj(f_in, f_out)
                backup_path = temp_path

            # psql command
            cmd = [
                "psql",
                "-h", self.host,
                "-U", self.user,
                "-d", self.database,
                "-f", str(backup_path)
            ]

            print(f"Restoring from: {backup_path}")
            subprocess.run(cmd, env=self._pg_env(), check=True)
            print("✅ Database restored")
        finally:
            if temp_path is not None and temp_path.exists():
                temp_path.unlink()

    def list_backups(self) -> list:
        """Return backup paths for this database, sorted by name, newest first."""
        backups = sorted(
            self.backup_dir.glob(f"{self.database}_*.sql*"),
            reverse=True
        )
        return [str(b) for b in backups]

    def cleanup_old_backups(self, keep_count: int = 5):
        """Delete all but the ``keep_count`` most recent backups.

        Raises:
            ValueError: If ``keep_count`` is negative.
        """
        if keep_count < 0:
            # A negative count would slice from the end and delete the wrong files
            raise ValueError("keep_count must be >= 0")

        for backup in self.list_backups()[keep_count:]:
            Path(backup).unlink()
            print(f"Removed old backup: {backup}")


# Usage
backup_manager = DatabaseBackup(
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
    backup_dir="./backups"
)

# Create backup
backup_file = backup_manager.backup(compress=True)

# List backups
backups = backup_manager.list_backups()
print(f"Available backups: {len(backups)}")

# Restore (if needed)
# backup_manager.restore(backup_file)

# Cleanup old backups
backup_manager.cleanup_old_backups(keep_count=5)

Best Practices

1. Connection Management

# ✅ Use the connection as a context manager for transactions.
# `with conn` commits on success and rolls back on error, but psycopg2 does
# NOT close the connection when the block exits, so close it explicitly.
conn = psycopg2.connect(conn_string)
try:
    with conn:
        cursor = conn.cursor()
        # Auto commit/rollback
finally:
    conn.close()

# ✅ Use connection pooling for concurrent requests
# The pool lives in a submodule that `import psycopg2` alone does not load.
import psycopg2.pool

pool = psycopg2.pool.ThreadedConnectionPool(2, 20, conn_string)

# ❌ Don't create a connection per request
conn = psycopg2.connect(conn_string)  # Expensive!

2. SQL Injection Prevention

# ✅ Use parameterized queries: pass the values separately and let psycopg2
# do the quoting.
safe_sql = "SELECT * FROM users WHERE username = %s"
cursor.execute(safe_sql, (username,))

# ❌ Never build SQL with string formatting: the value becomes part of the
# statement text.
unsafe_sql = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(unsafe_sql)  # Dangerous!

3. Transaction Management

# ✅ Use a transaction for multiple related operations
with conn:
    cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    # Both updates are committed together, or rolled back together on error

# ✅ Explicit transaction control
# psycopg2 opens a transaction implicitly on the first statement, so there is
# no BEGIN to send. End the transaction through the connection's
# commit()/rollback() instead of raw COMMIT/ROLLBACK statements.
try:
    # operations
    conn.commit()
except BaseException:
    # Roll back on any failure (including interrupts), then re-raise
    conn.rollback()
    raise

4. Error Handling

import psycopg2
from psycopg2 import errorcodes

# Handlers go from most specific to most general: IntegrityError and
# OperationalError are both subclasses of DatabaseError.
try:
    cursor.execute(query)
except psycopg2.IntegrityError as e:
    if e.pgcode == errorcodes.UNIQUE_VIOLATION:
        print("Duplicate entry")
    else:
        # Other integrity failures must not vanish silently; report them
        # the same way as any other database error.
        print(f"Database error: {e}")
except psycopg2.OperationalError:
    print("Connection error")
except psycopg2.DatabaseError as e:
    print(f"Database error: {e}")

Bài Tập Thực Hành

Bài 1: Connection Pool

Implement connection pool với health checks và automatic reconnection.

Bài 2: Repository Pattern

Tạo generic repository pattern cho User và Post models với PostgreSQL.

Bài 3: Migration System

Implement database migration system với version tracking và rollback.

Bài 4: Multi-Tenant App

Tạo multi-tenant application với schema isolation.

Bài 5: Backup Automation

Implement automated backup system với scheduled backups và retention policy.

Tóm Tắt

Trong Part 2 chúng ta đã học:

  1. PostgreSQL - psycopg2, CRUD operations, JSON support
  2. Connection Pooling - SimpleConnectionPool, ThreadedConnectionPool
  3. ORM Concepts - SQLAlchemy basics, repository pattern
  4. Advanced Patterns - E-commerce, analytics, multi-tenant, migrations, backups
  5. Best Practices - Connection management, SQL injection prevention, transactions

Database operations là core của applications - master các concepts này để build scalable, secure applications!


Bài tiếp theo: Bài 17: Logging - Học cách implement comprehensive logging system! 📝