Bài 16: Working với Databases - PostgreSQL
PostgreSQL với psycopg2
PostgreSQL là production-grade database với nhiều advanced features.
Cài Đặt
# Install psycopg2 as a pre-built wheel (quickest for development)
pip install psycopg2-binary

# Or compile from source (production). Install only ONE of the two packages:
# both provide the same `psycopg2` module and conflict with each other.
# pip install psycopg2
Kết Nối PostgreSQL
import psycopg2
from psycopg2 import sql

# Connect to the database with keyword arguments
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="mydb",
    user="postgres",
    password="password",
)
# Close it before demonstrating the alternative below; reassigning `conn`
# without closing would leak this connection.
conn.close()

# Or use a connection string (URI)
conn = psycopg2.connect(
    "postgresql://postgres:password@localhost:5432/mydb"
)

try:
    # A psycopg2 cursor used in `with` is closed when the block ends
    with conn.cursor() as cursor:
        # Execute query
        cursor.execute("SELECT version()")
        version = cursor.fetchone()
        print(f"PostgreSQL version: {version[0]}")
finally:
    # Always release the connection, even if the query failed
    conn.close()
CRUD Operations với PostgreSQL
from decimal import Decimal

import psycopg2
from psycopg2.extras import RealDictCursor

conn = psycopg2.connect(
    "postgresql://postgres:password@localhost:5432/mydb"
)

try:
    # RealDictCursor returns each row as a dict keyed by column name;
    # the `with` block closes the cursor when it ends.
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # Create table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                price DECIMAL(10, 2) NOT NULL,
                stock INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Insert one row; RETURNING hands back the generated id.
        # Decimal keeps money exact (a float such as 999.99 is approximate).
        cursor.execute(
            "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s) RETURNING id",
            ("Laptop", Decimal("999.99"), 10),
        )
        product_id = cursor.fetchone()["id"]
        print(f"Inserted product ID: {product_id}")

        # Insert many rows with the same statement template
        products = [
            ("Mouse", Decimal("29.99"), 50),
            ("Keyboard", Decimal("79.99"), 30),
            ("Monitor", Decimal("299.99"), 15),
        ]
        cursor.executemany(
            "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s)",
            products,
        )

        # Query
        cursor.execute("SELECT * FROM products WHERE price > %s", (50,))
        for product in cursor.fetchall():
            print(f"{product['name']}: ${product['price']}")

        # Update
        cursor.execute(
            "UPDATE products SET stock = stock - %s WHERE id = %s",
            (1, product_id),
        )

        # Delete
        cursor.execute("DELETE FROM products WHERE stock = 0")

    # Every statement above belongs to one transaction; nothing is saved until commit
    conn.commit()
except Exception:
    # Discard the partial transaction if any statement failed
    conn.rollback()
    raise
finally:
    conn.close()
Safe Query Construction
from psycopg2 import sql

# ❌ Unsafe: the name is pasted straight into the SQL text.
# If `table` ever came from user input it could smuggle in arbitrary SQL.
table = "products"
query = f"SELECT * FROM {table}"  # Dangerous!

# ✅ Safe: sql.Identifier quotes the table name as an identifier
select_all = sql.SQL("SELECT * FROM {}").format(sql.Identifier("products"))
cursor.execute(select_all)

# ✅ Safe: a dynamic column list, each column quoted as an identifier
columns = ["name", "price", "stock"]
column_list = sql.SQL(", ").join([sql.Identifier(col) for col in columns])
cursor.execute(sql.SQL("SELECT {} FROM products").format(column_list))

# ✅ Safe: dynamic WHERE conditions. Column names are quoted identifiers,
# while the values stay %s placeholders filled in from `params`.
conditions = {"price": 100, "stock": 10}
where_clause = sql.SQL(" AND ").join(
    [sql.SQL("{} > %s").format(sql.Identifier(field)) for field in conditions]
)
params = [conditions[field] for field in conditions]
cursor.execute(
    sql.SQL("SELECT * FROM products WHERE {}").format(where_clause),
    params,
)
JSON Support
import json

import psycopg2
from psycopg2.extras import Json, RealDictCursor

conn = psycopg2.connect("postgresql://postgres:password@localhost:5432/mydb")

try:
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # Create a table with a JSONB column
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(50) NOT NULL,
                metadata JSONB
            )
        """)

        # Insert JSON data; Json() adapts a Python object to a JSON value
        metadata = {
            "preferences": {"theme": "dark", "language": "en"},
            "settings": {"notifications": True},
        }
        cursor.execute(
            "INSERT INTO users (username, metadata) VALUES (%s, %s)",
            ("john_doe", Json(metadata)),
        )

        # Query nested JSON fields.
        # `->` returns jsonb, so it can be chained; `->>` returns text, so it
        # must be the LAST step of the path (text ->> 'x' is an error).
        cursor.execute("""
            SELECT * FROM users
            WHERE metadata->'preferences'->>'theme' = %s
        """, ("dark",))
        for row in cursor.fetchall():
            print(row["username"], json.dumps(row["metadata"]))

        # Update a nested JSON field in place
        cursor.execute("""
            UPDATE users
            SET metadata = jsonb_set(metadata, '{preferences,theme}', %s)
            WHERE username = %s
        """, (Json("light"), "john_doe"))

    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
Connection Pooling
Connection pooling tái sử dụng connections thay vì tạo mới mỗi lần.
Simple Connection Pool
from psycopg2 import pool
import psycopg2.extras


class DatabasePool:
    """Connection pool manager built on psycopg2's SimpleConnectionPool.

    SimpleConnectionPool must not be shared between threads; use
    ThreadedConnectionPool for multi-threaded code.
    """

    def __init__(
        self,
        minconn: int = 1,
        maxconn: int = 10,
        **kwargs
    ):
        """Create the pool; ``kwargs`` are forwarded to ``psycopg2.connect``."""
        self.pool = pool.SimpleConnectionPool(
            minconn, maxconn, **kwargs
        )

    def get_connection(self):
        """Borrow a connection from the pool (give it back with put_connection)."""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return a borrowed connection to the pool."""
        self.pool.putconn(conn)

    def close_all(self):
        """Close every connection managed by the pool."""
        self.pool.closeall()


# Usage
db_pool = DatabasePool(
    minconn=2,
    maxconn=10,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
)

try:
    # Get connection
    conn = db_pool.get_connection()
    try:
        # The cursor is created inside `try` so the connection is returned to
        # the pool even if creating the cursor fails.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute("SELECT * FROM products")
            products = cursor.fetchall()
        conn.commit()
    finally:
        db_pool.put_connection(conn)
finally:
    # Cleanup runs even if the query raised
    db_pool.close_all()
Threaded Connection Pool
from psycopg2 import pool
from contextlib import contextmanager
import psycopg2.extras


class ThreadedDatabasePool:
    """Thread-safe connection pool (psycopg2 ThreadedConnectionPool)."""

    def __init__(self, minconn: int = 2, maxconn: int = 20, **kwargs):
        """Create the pool; ``kwargs`` are forwarded to ``psycopg2.connect``."""
        self.pool = pool.ThreadedConnectionPool(
            minconn, maxconn, **kwargs
        )

    @contextmanager
    def get_cursor(self, cursor_factory=None):
        """Yield a cursor on a pooled connection, inside one transaction.

        Commits if the block exits normally and rolls back on any exception.
        The cursor is always closed and the connection always goes back to
        the pool.
        """
        conn = self.pool.getconn()
        try:
            # The cursor is created inside `try`, so a failure here still
            # returns the connection to the pool.
            with conn.cursor(cursor_factory=cursor_factory) as cursor:
                yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self.pool.putconn(conn)

    def execute(self, query: str, params: tuple = None):
        """Execute ``query`` and return its rows as dicts.

        Returns None for statements that produce no result set
        (INSERT/UPDATE/DELETE without RETURNING).
        """
        with self.get_cursor(
            cursor_factory=psycopg2.extras.RealDictCursor
        ) as cursor:
            cursor.execute(query, params or ())
            # description is None when the statement returned no rows at all
            if cursor.description is None:
                return None
            return cursor.fetchall()

    def close(self):
        """Close pool"""
        self.pool.closeall()


# Usage
db = ThreadedDatabasePool(
    minconn=2,
    maxconn=20,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
)

try:
    # Simple query
    products = db.execute("SELECT * FROM products WHERE price > %s", (100,))
    for product in products or []:
        print(product["name"])

    # With cursor: RETURNING gives the new id directly (more robust than lastval())
    with db.get_cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(
            "INSERT INTO products (name, price) VALUES (%s, %s) RETURNING id",
            ("Item", 50),
        )
        product_id = cursor.fetchone()["id"]
        print(f"Inserted ID: {product_id}")
finally:
    db.close()
Advanced Pool với Retry Logic
import logging
import time
from contextlib import contextmanager

import psycopg2
import psycopg2.extras  # RealDictCursor lives here; `import psycopg2` alone does not load it
from psycopg2 import pool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Errors meaning "this connection is unusable": worth retrying with a new one
_CONNECTION_ERRORS = (psycopg2.OperationalError, psycopg2.InterfaceError)


class ResilientDatabasePool:
    """Connection pool with retrying connection acquisition and health checks."""

    def __init__(
        self,
        minconn: int = 2,
        maxconn: int = 20,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        **conn_params
    ):
        """Create the pool; ``conn_params`` are forwarded to ``psycopg2.connect``.

        The pool opens ``minconn`` connections right away, so this constructor
        itself is not retried.
        """
        self.conn_params = conn_params
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.pool = pool.ThreadedConnectionPool(
            minconn, maxconn, **conn_params
        )

    def _acquire_healthy_connection(self):
        """Borrow a connection that answers ``SELECT 1``, retrying on failure.

        A broken connection is closed and discarded from the pool. The wait
        between attempts grows linearly (retry_delay * attempt). The last
        connection error is re-raised once all attempts are used up.
        """
        attempts = max(1, self.max_retries)  # always try at least once
        for attempt in range(1, attempts + 1):
            conn = None
            try:
                conn = self.pool.getconn()
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                return conn
            except _CONNECTION_ERRORS as e:
                logger.warning("Connection error (attempt %d): %s", attempt, e)
                if conn is not None:
                    try:
                        # close=True drops the broken connection instead of reusing it
                        self.pool.putconn(conn, close=True)
                    except pool.PoolError:
                        logger.debug("Could not discard broken connection", exc_info=True)
                if attempt >= attempts:
                    raise
                time.sleep(self.retry_delay * attempt)

    @contextmanager
    def get_connection(self):
        """Yield a healthy pooled connection inside one transaction.

        Only acquiring the connection is retried. The caller's block runs
        exactly once, because a generator-based context manager must yield
        exactly once. Commits on success and rolls back on error. The
        connection is always returned to the pool (the pool discards it if
        it is closed).
        """
        conn = self._acquire_healthy_connection()
        try:
            yield conn
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except _CONNECTION_ERRORS:
                # The connection died mid-block; keep the original error
                logger.warning("Rollback failed on a broken connection", exc_info=True)
            raise
        finally:
            self.pool.putconn(conn)

    def execute_with_retry(self, query: str, params: tuple = None):
        """Execute ``query`` on a healthy connection and return dict rows.

        The retry logic covers obtaining a working connection; the statement
        itself is executed once. Returns None for statements without a
        result set.
        """
        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(query, params or ())
                if cursor.description is None:
                    return None
                return cursor.fetchall()

    def health_check(self) -> bool:
        """Return True if a healthy connection can be obtained, else False."""
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
            return True
        except Exception as e:
            logger.error("Health check failed: %s", e)
            return False

    def close(self):
        """Close every connection in the pool."""
        self.pool.closeall()


# Usage
db = ResilientDatabasePool(
    minconn=2,
    maxconn=20,
    max_retries=3,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
)

# Health check
if db.health_check():
    logger.info("✅ Database is healthy")

# Execute with auto-retry (of connection acquisition)
try:
    products = db.execute_with_retry(
        "SELECT * FROM products WHERE price > %s",
        (100,),
    )
except Exception as e:
    logger.error("Failed after retries: %s", e)
ORM Concepts với SQLAlchemy
ORM (Object-Relational Mapping) maps Python classes to database tables.
Cài Đặt SQLAlchemy
pip install sqlalchemypip install psycopg2-binary # PostgreSQL driver
Basic SQLAlchemy Setup
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, create_engine
# declarative_base lives in sqlalchemy.orm since 1.4; the old
# sqlalchemy.ext.declarative import is deprecated.
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# Database URL
DATABASE_URL = "postgresql://postgres:password@localhost:5432/mydb"

# Create engine; echo=True logs every SQL statement SQLAlchemy emits
engine = create_engine(DATABASE_URL, echo=True)

# Session factory: changes are only saved when commit() is called explicitly
SessionLocal = sessionmaker(autoflush=False, bind=engine)

# Base class for models
Base = declarative_base()


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Same value that datetime.utcnow() produced (the DateTime columns below
    are timezone-naive), without the call deprecated in Python 3.12.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# Define models
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=_utcnow)

    # One user -> many posts
    posts = relationship("Post", back_populates="author")


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(String)
    author_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime, default=_utcnow)

    # Many posts -> one author
    author = relationship("User", back_populates="posts")


# Create tables. create_all() skips tables that already exist: if "users" was
# already created by the JSON example (different columns), use a fresh
# database or drop that table first.
Base.metadata.create_all(bind=engine)

# Use a session
db = SessionLocal()

try:
    # Create user
    user = User(username="john_doe", email="john@example.com")
    db.add(user)
    db.commit()
    db.refresh(user)  # reload server-generated values such as id

    # Create post
    post = Post(title="My First Post", content="Hello World!", author_id=user.id)
    db.add(post)
    db.commit()

    # Query
    users = db.query(User).filter(User.username == "john_doe").all()
    for found_user in users:
        print(f"User: {found_user.username}")
        for user_post in found_user.posts:
            print(f" Post: {user_post.title}")
except Exception:
    # A failed flush/commit leaves the session unusable until rolled back
    db.rollback()
    raise
finally:
    db.close()
SQLAlchemy Repository Pattern
from typing import Generic, List, Optional, Type, TypeVar

from sqlalchemy.orm import Session

T = TypeVar("T")


class BaseRepository(Generic[T]):
    """Generic CRUD repository for one SQLAlchemy model class.

    Assumes the model has an ``id`` primary-key column (used by get/update/delete).
    """

    def __init__(self, model: Type[T], db: Session):
        self.model = model
        self.db = db

    def _commit(self) -> None:
        """Commit the session; roll back on failure so the session stays usable."""
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def create(self, **kwargs) -> T:
        """Create, persist and return a new record built from ``kwargs``."""
        instance = self.model(**kwargs)
        self.db.add(instance)
        self._commit()
        self.db.refresh(instance)
        return instance

    def get(self, id: int) -> Optional[T]:
        """Return the record with primary key ``id``, or None."""
        return self.db.query(self.model).filter(self.model.id == id).first()

    def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
        """Return up to ``limit`` records, skipping the first ``skip``."""
        return self.db.query(self.model).offset(skip).limit(limit).all()

    def update(self, id: int, **kwargs) -> Optional[T]:
        """Set the given attributes on record ``id``; return it, or None if missing.

        Raises AttributeError for names the model does not define, instead of
        silently setting a plain Python attribute that is never saved.
        """
        instance = self.get(id)
        if instance is None:
            return None
        for key, value in kwargs.items():
            if not hasattr(self.model, key):
                raise AttributeError(f"{self.model.__name__} has no attribute {key!r}")
            setattr(instance, key, value)
        self._commit()
        self.db.refresh(instance)
        return instance

    def delete(self, id: int) -> bool:
        """Delete record ``id``; return True if it existed."""
        instance = self.get(id)
        if instance is None:
            return False
        self.db.delete(instance)
        self._commit()
        return True


class UserRepository(BaseRepository[User]):
    """User-specific repository"""

    def __init__(self, db: Session):
        super().__init__(User, db)

    def get_by_username(self, username: str) -> Optional[User]:
        """Get user by username"""
        return self.db.query(User).filter(User.username == username).first()

    def get_by_email(self, email: str) -> Optional[User]:
        """Get user by email"""
        return self.db.query(User).filter(User.email == email).first()


# Usage
db = SessionLocal()
try:
    user_repo = UserRepository(db)

    # Create
    user = user_repo.create(username="jane_doe", email="jane@example.com")
    print(f"Created user: {user.id}")

    # Get by username
    user = user_repo.get_by_username("jane_doe")
    if user:
        print(f"Found: {user.email}")

        # Update
        user_repo.update(user.id, email="jane.doe@example.com")

        # Delete
        user_repo.delete(user.id)
finally:
    db.close()
5 Ứng Dụng Thực Tế
1. E-commerce Database với Connection Pool
import psycopg2
from contextlib import contextmanager
from decimal import Decimal
from typing import Dict, List, Optional

from psycopg2 import pool
from psycopg2.extras import RealDictCursor


class EcommerceDatabase:
    """E-commerce database access with connection pooling."""

    def __init__(self, **conn_params):
        """Create the pool (``conn_params`` go to psycopg2.connect) and the schema."""
        self.pool = pool.ThreadedConnectionPool(
            minconn=2,
            maxconn=20,
            **conn_params
        )
        self.init_schema()

    @contextmanager
    def get_cursor(self):
        """Yield a dict cursor inside one transaction.

        Commits on success and rolls back on error. The connection is always
        returned to the pool.
        """
        conn = self.pool.getconn()
        try:
            # The cursor is created inside `try`, so a failure still returns the connection
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self.pool.putconn(conn)

    def init_schema(self):
        """Initialize database schema"""
        with self.get_cursor() as cursor:
            # Products table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(200) NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    stock INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Orders table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id SERIAL PRIMARY KEY,
                    customer_name VARCHAR(100) NOT NULL,
                    customer_email VARCHAR(100) NOT NULL,
                    total_amount DECIMAL(10, 2) NOT NULL,
                    status VARCHAR(20) DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Order items table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    id SERIAL PRIMARY KEY,
                    order_id INTEGER REFERENCES orders(id),
                    product_id INTEGER REFERENCES products(id),
                    quantity INTEGER NOT NULL,
                    price DECIMAL(10, 2) NOT NULL
                )
            """)

    def add_product(self, name: str, price: Decimal, stock: int) -> int:
        """Add a new product and return its id."""
        with self.get_cursor() as cursor:
            cursor.execute(
                "INSERT INTO products (name, price, stock) VALUES (%s, %s, %s) RETURNING id",
                (name, price, stock)
            )
            return cursor.fetchone()["id"]

    def create_order(
        self,
        customer_name: str,
        customer_email: str,
        items: List[Dict]  # [{"product_id": 1, "quantity": 2}, ...]
    ) -> Optional[int]:
        """Create an order and decrement stock in a single transaction.

        Returns the new order id. Raises ValueError, and writes nothing, if
        ``items`` is empty, a quantity is not positive, a product does not
        exist, or stock is insufficient.
        """
        # Merge repeated product ids so stock is checked against the TOTAL
        # quantity requested (otherwise two lines could oversell one product).
        quantities: Dict[int, int] = {}
        for item in items:
            quantity = item["quantity"]
            if quantity <= 0:
                raise ValueError(
                    f"Invalid quantity {quantity} for product {item['product_id']}"
                )
            quantities[item["product_id"]] = quantities.get(item["product_id"], 0) + quantity
        if not quantities:
            raise ValueError("Order must contain at least one item")

        with self.get_cursor() as cursor:
            # Lock product rows in a fixed (id) order so concurrent orders
            # cannot deadlock; remember each price for the order items.
            prices: Dict[int, Decimal] = {}
            total_amount = Decimal("0.00")
            for product_id in sorted(quantities):
                cursor.execute(
                    "SELECT price, stock FROM products WHERE id = %s FOR UPDATE",
                    (product_id,)
                )
                product = cursor.fetchone()
                if not product or product["stock"] < quantities[product_id]:
                    raise ValueError(f"Insufficient stock for product {product_id}")
                prices[product_id] = product["price"]
                total_amount += product["price"] * quantities[product_id]

            # Create order
            cursor.execute(
                "INSERT INTO orders (customer_name, customer_email, total_amount) VALUES (%s, %s, %s) RETURNING id",
                (customer_name, customer_email, total_amount)
            )
            order_id = cursor.fetchone()["id"]

            # Create order items and update stock (rows are already locked)
            for product_id, quantity in quantities.items():
                cursor.execute(
                    "INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (%s, %s, %s, %s)",
                    (order_id, product_id, quantity, prices[product_id])
                )
                cursor.execute(
                    "UPDATE products SET stock = stock - %s WHERE id = %s",
                    (quantity, product_id)
                )

            return order_id

    def get_order_details(self, order_id: int) -> Optional[Dict]:
        """Return the order row plus an ``items`` list, or None if not found."""
        with self.get_cursor() as cursor:
            # Get order
            cursor.execute(
                "SELECT * FROM orders WHERE id = %s",
                (order_id,)
            )
            order = cursor.fetchone()

            if not order:
                return None

            details = dict(order)

            # Get order items
            cursor.execute("""
                SELECT
                    oi.quantity,
                    oi.price,
                    p.name as product_name
                FROM order_items oi
                JOIN products p ON oi.product_id = p.id
                WHERE oi.order_id = %s
            """, (order_id,))

            details["items"] = cursor.fetchall()
            return details


# Usage
ecommerce = EcommerceDatabase(
    host="localhost",
    database="ecommerce",
    user="postgres",
    password="password"
)

# Add products
laptop_id = ecommerce.add_product("Laptop", Decimal("999.99"), 10)
mouse_id = ecommerce.add_product("Mouse", Decimal("29.99"), 50)

# Create order
order_id = ecommerce.create_order(
    "John Doe",
    "john@example.com",
    [
        {"product_id": laptop_id, "quantity": 1},
        {"product_id": mouse_id, "quantity": 2},
    ]
)

if order_id:
    print(f"✅ Order created: {order_id}")

    # Get order details
    order = ecommerce.get_order_details(order_id)
    print(f"Total: ${order['total_amount']}")
    print("Items:")
    for item in order["items"]:
        print(f" {item['product_name']}: {item['quantity']} x ${item['price']}")
2. Analytics Database với Materialized Views
import psycopg2
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from psycopg2.extras import Json, RealDictCursor


class AnalyticsDatabase:
    """Event-tracking database with a materialized view of daily stats."""

    def __init__(self, conn_string: str):
        self.conn_string = conn_string
        self.init_schema()

    def get_connection(self):
        """Open and return a new connection (the caller must close it)."""
        return psycopg2.connect(self.conn_string)

    @contextmanager
    def _transaction(self):
        """Yield a fresh connection inside one transaction, then close it.

        ``with conn:`` in psycopg2 only commits or rolls back; it does NOT close
        the connection, so using it alone leaks a connection per call.
        """
        conn = self.get_connection()
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    def init_schema(self):
        """Create the events table, its indexes and the daily_stats view."""
        with self._transaction() as conn:
            with conn.cursor() as cursor:
                # Events table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS events (
                        id SERIAL PRIMARY KEY,
                        user_id INTEGER NOT NULL,
                        event_type VARCHAR(50) NOT NULL,
                        event_data JSONB,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)

                # Indexes for faster queries
                cursor.execute("""
                    CREATE INDEX IF NOT EXISTS idx_events_created_at
                    ON events(created_at)
                """)
                cursor.execute("""
                    CREATE INDEX IF NOT EXISTS idx_events_user_id
                    ON events(user_id)
                """)

                # Materialized view for daily stats
                cursor.execute("""
                    CREATE MATERIALIZED VIEW IF NOT EXISTS daily_stats AS
                    SELECT
                        DATE(created_at) as date,
                        event_type,
                        COUNT(*) as event_count,
                        COUNT(DISTINCT user_id) as unique_users
                    FROM events
                    GROUP BY DATE(created_at), event_type
                """)

                # A unique index is required for REFRESH ... CONCURRENTLY
                cursor.execute("""
                    CREATE UNIQUE INDEX IF NOT EXISTS idx_daily_stats_date_type
                    ON daily_stats(date, event_type)
                """)

    def track_event(self, user_id: int, event_type: str, event_data: Optional[Dict] = None):
        """Insert one event row; ``event_data`` is stored as JSONB ({} if None)."""
        with self._transaction() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "INSERT INTO events (user_id, event_type, event_data) VALUES (%s, %s, %s)",
                    (user_id, event_type, Json(event_data or {}))
                )

    def refresh_stats(self):
        """Recompute daily_stats without blocking concurrent readers."""
        with self._transaction() as conn:
            with conn.cursor() as cursor:
                cursor.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY daily_stats")
        print("✅ Stats refreshed")

    def get_daily_stats(self, days: int = 7) -> List[Dict]:
        """Return daily_stats rows for the last ``days`` days, newest first."""
        start_date = datetime.now() - timedelta(days=days)
        with self._transaction() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(
                    "SELECT * FROM daily_stats WHERE date >= %s ORDER BY date DESC",
                    (start_date.date(),)
                )
                return cursor.fetchall()

    def get_user_activity(self, user_id: int, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` of the user's events, newest first."""
        with self._transaction() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(
                    "SELECT * FROM events WHERE user_id = %s ORDER BY created_at DESC LIMIT %s",
                    (user_id, limit)
                )
                return cursor.fetchall()


# Usage
analytics = AnalyticsDatabase("postgresql://postgres:password@localhost:5432/analytics")

# Track events
analytics.track_event(1, "page_view", {"page": "/home"})
analytics.track_event(1, "button_click", {"button": "signup"})
analytics.track_event(2, "page_view", {"page": "/products"})

# Refresh stats (run periodically, e.g., via cron)
analytics.refresh_stats()

# Get stats
stats = analytics.get_daily_stats(7)
for stat in stats:
    print(f"{stat['date']}: {stat['event_type']} - {stat['event_count']} events, {stat['unique_users']} users")
3. Multi-Tenant Database
import psycopg2
from contextlib import contextmanager
from typing import Dict, List, Optional

from psycopg2 import sql
from psycopg2.extras import RealDictCursor


class MultiTenantDatabase:
    """Multi-tenant database with one PostgreSQL schema per tenant."""

    def __init__(self, conn_string: str):
        self.conn_string = conn_string
        self.init_public_schema()

    @contextmanager
    def get_connection(self, schema: str = "public"):
        """Yield a dict cursor whose search_path is ``schema``, in one transaction.

        The schema name is quoted with sql.Identifier and never pasted into
        the SQL text, so a crafted tenant/schema name cannot inject SQL.
        Commits on success, rolls back on error, and always closes the
        connection.
        """
        conn = psycopg2.connect(self.conn_string)
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(
                    sql.SQL("SET search_path TO {}").format(sql.Identifier(schema))
                )
                yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def init_public_schema(self):
        """Create the shared ``tenants`` registry table in the public schema."""
        with self.get_connection() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS tenants (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    schema_name VARCHAR(63) NOT NULL UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def create_tenant(self, name: str) -> str:
        """Register tenant ``name`` and create its schema and tables atomically.

        The schema is named ``tenant_`` + ``name`` lower-cased with spaces
        replaced by ``_``. Everything runs in one transaction, so a failure
        leaves neither a half-built schema nor a dangling ``tenants`` row.
        Returns the schema name.
        """
        schema_name = f"tenant_{name.lower().replace(' ', '_')}"
        schema_ident = sql.Identifier(schema_name)

        with self.get_connection() as cursor:
            # Create schema
            cursor.execute(
                sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(schema_ident)
            )

            # Register tenant (tenants lives in public, the current search_path)
            cursor.execute(
                "INSERT INTO tenants (name, schema_name) VALUES (%s, %s) RETURNING id",
                (name, schema_name)
            )
            tenant_id = cursor.fetchone()["id"]

            # Build the tenant's tables inside the new schema, same transaction
            cursor.execute(sql.SQL("SET search_path TO {}").format(schema_ident))
            self._create_tenant_tables(cursor)

        print(f"✅ Created tenant: {name} (ID: {tenant_id}, Schema: {schema_name})")
        return schema_name

    def init_tenant_schema(self, schema: str):
        """Create the tenant-specific tables in an existing ``schema``."""
        with self.get_connection(schema) as cursor:
            self._create_tenant_tables(cursor)

    @staticmethod
    def _create_tenant_tables(cursor):
        """Create the per-tenant tables in the cursor's current search_path."""
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(50) NOT NULL UNIQUE,
                email VARCHAR(100) NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS data (
                id SERIAL PRIMARY KEY,
                user_id INTEGER REFERENCES users(id),
                content TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

    def add_user(self, schema: str, username: str, email: str) -> int:
        """Add a user to the tenant in ``schema`` and return its id."""
        with self.get_connection(schema) as cursor:
            cursor.execute(
                "INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
                (username, email)
            )
            return cursor.fetchone()["id"]

    def get_tenant_users(self, schema: str) -> List[Dict]:
        """Return all users of the tenant in ``schema``."""
        with self.get_connection(schema) as cursor:
            cursor.execute("SELECT * FROM users")
            return cursor.fetchall()


# Usage
multi_tenant = MultiTenantDatabase("postgresql://postgres:password@localhost:5432/saas")

# Create tenants
acme_schema = multi_tenant.create_tenant("Acme Corp")
globex_schema = multi_tenant.create_tenant("Globex Inc")

# Add users to different tenants
acme_user = multi_tenant.add_user(acme_schema, "john_acme", "john@acme.com")
globex_user = multi_tenant.add_user(globex_schema, "jane_globex", "jane@globex.com")

# Get tenant users (isolated)
acme_users = multi_tenant.get_tenant_users(acme_schema)
print(f"Acme users: {len(acme_users)}")  # 1 user

globex_users = multi_tenant.get_tenant_users(globex_schema)
print(f"Globex users: {len(globex_users)}")  # 1 user
4. Database Migration System
import hashlib
from contextlib import contextmanager
from datetime import datetime
from types import CodeType
from typing import Callable, List, Optional

import psycopg2


def _code_fingerprint(code: CodeType) -> str:
    """Return deterministic text describing a code object's bytecode and constants.

    Constants must be included because SQL string literals live in
    ``co_consts``, not ``co_code``. Nested code objects are expanded
    recursively because their repr contains a memory address. Frozenset
    members are sorted because their iteration order depends on hash
    randomization.
    """
    parts = [code.co_code.hex()]
    for const in code.co_consts:
        if isinstance(const, CodeType):
            parts.append(_code_fingerprint(const))
        elif isinstance(const, frozenset):
            parts.append(repr(sorted(repr(item) for item in const)))
        else:
            parts.append(repr(const))
    return "|".join(parts)


class Migration:
    """Single migration"""

    def __init__(self, version: str, name: str, up: Callable, down: Callable):
        self.version = version
        self.name = name
        self.up = up
        self.down = down
        self.checksum = self._calculate_checksum()

    def _calculate_checksum(self) -> str:
        """MD5 of version, name and the ``up`` function's code.

        Used as a change fingerprint, not for security. It depends on the
        Python version's bytecode.
        """
        content = f"{self.version}{self.name}{_code_fingerprint(self.up.__code__)}"
        return hashlib.md5(content.encode()).hexdigest()


class MigrationManager:
    """Database migration manager.

    Versions are compared as strings, so zero-pad them ("001", "002", ...)
    to keep them in order.
    """

    def __init__(self, conn_string: str):
        self.conn_string = conn_string
        self.migrations: List[Migration] = []
        self.init_migrations_table()

    def get_connection(self):
        """Open and return a new connection (the caller must close it)."""
        return psycopg2.connect(self.conn_string)

    @contextmanager
    def _transaction(self):
        """Yield a connection inside one transaction, then always close it.

        psycopg2's ``with conn:`` commits/rolls back but does not close.
        """
        conn = self.get_connection()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def init_migrations_table(self):
        """Initialize migrations tracking table"""
        with self._transaction() as conn:
            with conn.cursor() as cursor:
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS schema_migrations (
                        version VARCHAR(20) PRIMARY KEY,
                        name VARCHAR(200) NOT NULL,
                        checksum VARCHAR(32) NOT NULL,
                        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)

    def register(self, version: str, name: str, down: Optional[Callable] = None):
        """Decorator that registers ``func(cursor)`` as migration ``version``.

        ``down`` is an optional ``down(cursor)`` that reverses the migration
        (a no-op by default). Raises ValueError if ``version`` is already
        registered.
        """
        def decorator(func):
            if any(m.version == version for m in self.migrations):
                raise ValueError(f"Migration version {version} is already registered")
            # Register func itself (not a wrapper) so the checksum reflects its code
            migration = Migration(version, name, func, down or (lambda cursor: None))
            self.migrations.append(migration)
            return func
        return decorator

    def get_applied_migrations(self) -> List[str]:
        """Get list of applied migrations"""
        with self._transaction() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT version FROM schema_migrations ORDER BY version")
                return [row[0] for row in cursor.fetchall()]

    def migrate(self):
        """Apply pending migrations in version order, one transaction each."""
        applied = set(self.get_applied_migrations())
        pending = sorted(
            (m for m in self.migrations if m.version not in applied),
            key=lambda m: m.version,
        )

        if not pending:
            print("✅ No pending migrations")
            return

        print(f"Applying {len(pending)} migrations...")

        for migration in pending:
            conn = self.get_connection()
            try:
                with conn.cursor() as cursor:
                    print(f" Applying {migration.version}: {migration.name}")

                    # Run migration
                    migration.up(cursor)

                    # Record it in the same transaction as the schema change
                    cursor.execute(
                        "INSERT INTO schema_migrations (version, name, checksum) VALUES (%s, %s, %s)",
                        (migration.version, migration.name, migration.checksum)
                    )

                conn.commit()
                print(f" ✅ Applied {migration.version}")

            except Exception as e:
                conn.rollback()
                print(f" ❌ Failed {migration.version}: {e}")
                raise
            finally:
                conn.close()


# Usage
migrator = MigrationManager("postgresql://postgres:password@localhost:5432/mydb")


# Define migrations
@migrator.register("001", "create_users_table")
def migration_001(cursor):
    cursor.execute("""
        CREATE TABLE users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) NOT NULL UNIQUE,
            email VARCHAR(100) NOT NULL
        )
    """)


@migrator.register("002", "add_users_created_at")
def migration_002(cursor):
    cursor.execute("""
        ALTER TABLE users
        ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    """)


@migrator.register("003", "create_posts_table")
def migration_003(cursor):
    cursor.execute("""
        CREATE TABLE posts (
            id SERIAL PRIMARY KEY,
            user_id INTEGER REFERENCES users(id),
            title VARCHAR(200) NOT NULL,
            content TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)


# Run migrations
migrator.migrate()
5. Database Backup and Restore
import gzip
import os
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path

import psycopg2


class DatabaseBackup:
    """Backup and restore utility built on the pg_dump / psql command-line tools."""

    def __init__(
        self,
        host: str,
        database: str,
        user: str,
        password: str,
        backup_dir: str = "./backups"
    ):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def _env(self) -> dict:
        """Environment for pg_dump/psql: the current environment plus PGPASSWORD.

        Passing only {"PGPASSWORD": ...} would drop PATH (so the tools may not
        be found) and every other PG*/locale setting.
        """
        return {**os.environ, "PGPASSWORD": self.password}

    def backup(self, compress: bool = True) -> str:
        """Dump the database to a timestamped .sql (or .sql.gz) file; return its path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{self.database}_{timestamp}.sql"
        filepath = self.backup_dir / filename

        # pg_dump command (plain SQL format)
        cmd = [
            "pg_dump",
            "-h", self.host,
            "-U", self.user,
            "-d", self.database,
            "-f", str(filepath),
            "--no-owner",
            "--no-acl",
        ]

        print(f"Creating backup: {filename}")
        try:
            subprocess.run(cmd, env=self._env(), check=True)
        except subprocess.CalledProcessError:
            # Don't leave a truncated dump that list_backups() would report
            if filepath.exists():
                filepath.unlink()
            raise

        # Compress
        if compress:
            compressed_path = filepath.with_name(filepath.name + ".gz")
            with open(filepath, "rb") as f_in, gzip.open(compressed_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            filepath.unlink()  # Remove uncompressed
            filepath = compressed_path
            print(f"✅ Compressed backup: {compressed_path}")

        print(f"✅ Backup created: {filepath}")
        return str(filepath)

    def restore(self, backup_file: str):
        """Restore a plain-SQL backup (optionally .gz) with psql.

        psql stops at the first SQL error (ON_ERROR_STOP) and the call raises
        CalledProcessError. Restore into an empty database, because the dump
        re-creates all objects.
        """
        backup_path = Path(backup_file)
        if not backup_path.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_file}")

        temp_sql = None
        try:
            # Decompress to a private temp file so nothing is left next to the backups
            if backup_path.suffix == ".gz":
                fd, tmp_name = tempfile.mkstemp(suffix=".sql")
                temp_sql = Path(tmp_name)
                # fdopen first: if gzip.open fails, the fd is still closed
                with os.fdopen(fd, "wb") as f_out, gzip.open(backup_path, "rb") as f_in:
                    shutil.copyfileobj(f_in, f_out)
                sql_path = temp_sql
            else:
                sql_path = backup_path

            # psql command; without ON_ERROR_STOP psql exits 0 even if statements fail
            cmd = [
                "psql",
                "-h", self.host,
                "-U", self.user,
                "-d", self.database,
                "-v", "ON_ERROR_STOP=1",
                "-f", str(sql_path),
            ]

            print(f"Restoring from: {backup_path}")
            subprocess.run(cmd, env=self._env(), check=True)
        finally:
            if temp_sql is not None and temp_sql.exists():
                temp_sql.unlink()

        print("✅ Database restored")

    def list_backups(self) -> list:
        """List all backups, newest first (timestamps sort lexicographically)."""
        backups = sorted(
            self.backup_dir.glob(f"{self.database}_*.sql*"),
            reverse=True
        )
        return [str(b) for b in backups]

    def cleanup_old_backups(self, keep_count: int = 5):
        """Delete all but the ``keep_count`` newest backups."""
        backups = self.list_backups()

        if len(backups) > keep_count:
            for backup in backups[keep_count:]:
                Path(backup).unlink()
                print(f"Removed old backup: {backup}")


# Usage
backup_manager = DatabaseBackup(
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
    backup_dir="./backups"
)

# Create backup
backup_file = backup_manager.backup(compress=True)

# List backups
backups = backup_manager.list_backups()
print(f"Available backups: {len(backups)}")

# Restore (if needed)
# backup_manager.restore(backup_file)

# Cleanup old backups
backup_manager.cleanup_old_backups(keep_count=5)
Best Practices
1. Connection Management
from contextlib import closing

import psycopg2
import psycopg2.pool  # import the pool submodule explicitly before using psycopg2.pool

# ✅ Always use context managers, and know what each one does:
#    `with conn:` commits on success / rolls back on error but does NOT close
#    the connection; `closing(...)` closes it. Combine them:
with closing(psycopg2.connect(conn_string)) as conn:
    with conn:
        with conn.cursor() as cursor:
            ...  # Auto commit/rollback, then the connection is closed

# ✅ Use connection pooling for concurrent requests
pool = psycopg2.pool.ThreadedConnectionPool(2, 20, conn_string)

# ❌ Don't create a connection per request
conn = psycopg2.connect(conn_string)  # Expensive!
2. SQL Injection Prevention
# ✅ Use parameterized queries: psycopg2 quotes and escapes the value itself
params = (username,)
cursor.execute("SELECT * FROM users WHERE username = %s", params)

# ❌ Never build SQL with string formatting: a crafted username can rewrite the query
unsafe_sql = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(unsafe_sql)  # Dangerous!
3. Transaction Management
# ✅ Use transactions for multiple operations.
# psycopg2 opens a transaction implicitly on the first statement; `with conn:`
# commits both updates on success or rolls both back on error.
with conn:
    with conn.cursor() as cursor:
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Auto commit both or rollback both

# ✅ Explicit transaction control: no manual "BEGIN". psycopg2 has already
# started the transaction, and a second BEGIN only triggers a warning.
try:
    # operations
    conn.commit()
except Exception:
    conn.rollback()
    raise
4. Error Handling
import psycopg2
from psycopg2 import errorcodes

try:
    cursor.execute(query)
except psycopg2.IntegrityError as e:
    # A failed statement aborts the whole transaction: roll back before reusing the connection
    cursor.connection.rollback()
    if e.pgcode == errorcodes.UNIQUE_VIOLATION:
        print("Duplicate entry")
    else:
        raise  # other constraint violations (FK, NOT NULL, CHECK) must not be swallowed
except psycopg2.OperationalError:
    # The connection itself may be gone; reconnect rather than roll back
    print("Connection error")
except psycopg2.DatabaseError as e:
    cursor.connection.rollback()
    print(f"Database error: {e}")
Bài Tập Thực Hành
Bài 1: Connection Pool
Implement connection pool với health checks và automatic reconnection.
Bài 2: Repository Pattern
Tạo generic repository pattern cho User và Post models với PostgreSQL.
Bài 3: Migration System
Implement database migration system với version tracking và rollback.
Bài 4: Multi-Tenant App
Tạo multi-tenant application với schema isolation.
Bài 5: Backup Automation
Implement automated backup system với scheduled backups và retention policy.
Tóm Tắt
Trong Part 2 chúng ta đã học:
- ✅ PostgreSQL - psycopg2, CRUD operations, JSON support
- ✅ Connection Pooling - SimpleConnectionPool, ThreadedConnectionPool
- ✅ ORM Concepts - SQLAlchemy basics, repository pattern
- ✅ Advanced Patterns - E-commerce, analytics, multi-tenant, migrations, backups
- ✅ Best Practices - Connection management, SQL injection prevention, transactions
Database operations là core của applications - master các concepts này để build scalable, secure applications!
Bài tiếp theo: Bài 17: Logging - Học cách implement comprehensive logging system! 📝