Bài 16: Working với Databases - SQLite3

Mục Tiêu Bài Học

Sau khi hoàn thành bài này, bạn sẽ:

  • ✅ Sử dụng sqlite3 module
  • ✅ Làm việc với database connectors
  • ✅ Viết SQL queries trong Python
  • ✅ Hiểu connection pooling
  • ✅ Tìm hiểu ORM concepts
  • ✅ Áp dụng best practices

Database trong Python

Python hỗ trợ nhiều databases:

  • SQLite - Built-in, file-based, nhẹ
  • PostgreSQL - Production-grade, feature-rich
  • MySQL - Popular, web applications
  • MongoDB - NoSQL, document-based

1. SQLite3 - Built-in Database

SQLite là database có sẵn trong Python, lưu data trong file .db.

Kết Nối và Tạo Database

import sqlite3 # Tạo/kết nối databaseconn = sqlite3.connect("myapp.db") # In-memory database (test)conn = sqlite3.connect(":memory:") # Cursor để execute queriescursor = conn.cursor() # Đóng connectionconn.close()

Tạo Tables

import sqlite3 conn = sqlite3.connect("myapp.db")cursor = conn.cursor() # Tạo users tablecursor.execute("""    CREATE TABLE IF NOT EXISTS users (        id INTEGER PRIMARY KEY AUTOINCREMENT,        username TEXT NOT NULL UNIQUE,        email TEXT NOT NULL,        age INTEGER,        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP    )""") # Tạo posts table với foreign keycursor.execute("""    CREATE TABLE IF NOT EXISTS posts (        id INTEGER PRIMARY KEY AUTOINCREMENT,        title TEXT NOT NULL,        content TEXT,        user_id INTEGER,        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,        FOREIGN KEY (user_id) REFERENCES users(id)    )""") # Commit changesconn.commit()conn.close()

Insert Data

import sqlite3 conn = sqlite3.connect("myapp.db")cursor = conn.cursor() # Insert single rowcursor.execute(    "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",    ("john_doe", "[email protected]", 30)) # Insert multiple rowsusers = [    ("jane_smith", "[email protected]", 25),    ("bob_wilson", "[email protected]", 35),    ("alice_brown", "[email protected]", 28),]cursor.executemany(    "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",    users) # Get last inserted IDlast_id = cursor.lastrowidprint(f"Inserted user with ID: {last_id}") conn.commit()conn.close()

Query Data

import sqlite3 conn = sqlite3.connect("myapp.db")cursor = conn.cursor() # Fetch all rowscursor.execute("SELECT * FROM users")rows = cursor.fetchall()for row in rows:    print(row)  # (1, 'john_doe', '[email protected]', 30, '2024-01-01 10:00:00') # Fetch one rowcursor.execute("SELECT * FROM users WHERE username = ?", ("john_doe",))user = cursor.fetchone()print(user) # Fetch many rowscursor.execute("SELECT * FROM users WHERE age > ?", (25,))users = cursor.fetchmany(2)  # Lấy 2 rows đầuprint(users) # Iterate over resultscursor.execute("SELECT username, email FROM users")for username, email in cursor:    print(f"{username}: {email}") conn.close()

Row Factory - Dictionary Results

import sqlite3 conn = sqlite3.connect("myapp.db") # Set row factory to return dictionariesconn.row_factory = sqlite3.Rowcursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ?", ("john_doe",))user = cursor.fetchone() # Access by column nameprint(user["username"])  # john_doeprint(user["email"])     # [email protected] # Convert to dictuser_dict = dict(user)print(user_dict)  # {'id': 1, 'username': 'john_doe', ...} conn.close()

Update và Delete

import sqlite3 conn = sqlite3.connect("myapp.db")cursor = conn.cursor() # Updatecursor.execute(    "UPDATE users SET age = ? WHERE username = ?",    (31, "john_doe"))print(f"Updated {cursor.rowcount} rows") # Deletecursor.execute(    "DELETE FROM users WHERE age < ?",    (20,))print(f"Deleted {cursor.rowcount} rows") conn.commit()conn.close()

Context Manager

import sqlite3 # Tự động commit/rollback và closewith sqlite3.connect("myapp.db") as conn:    cursor = conn.cursor()        cursor.execute(        "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",        ("new_user", "[email protected]", 25)    )        # Auto commit nếu thành công    # Auto rollback nếu có exception # Connection tự động close

2. Transactions

Basic Transactions

import sqlite3 conn = sqlite3.connect("myapp.db")cursor = conn.cursor() try:    # Begin transaction (implicit)    cursor.execute("UPDATE users SET age = age + 1 WHERE username = ?", ("john_doe",))    cursor.execute("INSERT INTO posts (title, user_id) VALUES (?, ?)", ("New Post", 1))        # Commit transaction    conn.commit()    print("✅ Transaction successful")    except sqlite3.Error as e:    # Rollback on error    conn.rollback()    print(f"❌ Transaction failed: {e}")    finally:    conn.close()

Transaction Isolation Levels

import sqlite3 # DEFERRED (default) - Lock khi writeconn = sqlite3.connect("myapp.db", isolation_level="DEFERRED") # IMMEDIATE - Lock ngay khi beginconn = sqlite3.connect("myapp.db", isolation_level="IMMEDIATE") # EXCLUSIVE - Exclusive lockconn = sqlite3.connect("myapp.db", isolation_level="EXCLUSIVE") # Autocommit modeconn = sqlite3.connect("myapp.db", isolation_level=None)

Manual Transaction Control

import sqlite3 conn = sqlite3.connect("myapp.db")# Disable autocommitconn.isolation_level = None cursor = conn.cursor() # Start transactioncursor.execute("BEGIN TRANSACTION") try:    cursor.execute("UPDATE users SET age = age + 1")    cursor.execute("INSERT INTO posts (title, user_id) VALUES (?, ?)", ("Post", 1))        # Commit    cursor.execute("COMMIT")    except sqlite3.Error:    # Rollback    cursor.execute("ROLLBACK")    raise conn.close()

3. Database Design Patterns

Database Connection Manager

import sqlite3from contextlib import contextmanagerfrom typing import Generator class DatabaseManager:    """Quản lý database connections"""        def __init__(self, db_path: str):        self.db_path = db_path        @contextmanager    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:        """Context manager cho connections"""        conn = sqlite3.connect(self.db_path)        conn.row_factory = sqlite3.Row        try:            yield conn            conn.commit()        except Exception:            conn.rollback()            raise        finally:            conn.close()        def execute(self, query: str, params: tuple = ()) -> list:        """Execute query và return results"""        with self.get_connection() as conn:            cursor = conn.cursor()            cursor.execute(query, params)            return cursor.fetchall()        def execute_many(self, query: str, params_list: list) -> int:        """Execute query với multiple parameters"""        with self.get_connection() as conn:            cursor = conn.cursor()            cursor.executemany(query, params_list)            return cursor.rowcount # Sử dụngdb = DatabaseManager("myapp.db") # Queryusers = db.execute("SELECT * FROM users WHERE age > ?", (25,))for user in users:    print(dict(user)) # Insert manyusers_data = [    ("user1", "[email protected]", 30),    ("user2", "[email protected]", 25),]db.execute_many(    "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",    users_data)

Repository Pattern

import sqlite3from typing import Optional, List, Dict, Anyfrom dataclasses import dataclassfrom datetime import datetime @dataclassclass User:    """User model"""    id: Optional[int] = None    username: str = ""    email: str = ""    age: int = 0    created_at: Optional[datetime] = None        @classmethod    def from_row(cls, row: sqlite3.Row) -> "User":        """Create User từ database row"""        return cls(            id=row["id"],            username=row["username"],            email=row["email"],            age=row["age"],            created_at=datetime.fromisoformat(row["created_at"])        ) class UserRepository:    """Repository pattern cho User"""        def __init__(self, db: DatabaseManager):        self.db = db        def create_table(self):        """Tạo users table"""        query = """            CREATE TABLE IF NOT EXISTS users (                id INTEGER PRIMARY KEY AUTOINCREMENT,                username TEXT NOT NULL UNIQUE,                email TEXT NOT NULL,                age INTEGER,                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP            )        """        self.db.execute(query)        def create(self, user: User) -> User:        """Tạo user mới"""        query = "INSERT INTO users (username, email, age) VALUES (?, ?, ?)"        with self.db.get_connection() as conn:            cursor = conn.cursor()            cursor.execute(query, (user.username, user.email, user.age))            user.id = cursor.lastrowid        return user        def find_by_id(self, user_id: int) -> Optional[User]:        """Tìm user by ID"""        rows = self.db.execute("SELECT * FROM users WHERE id = ?", (user_id,))        return User.from_row(rows[0]) if rows else None        def find_by_username(self, username: str) -> Optional[User]:        """Tìm user by username"""        rows = self.db.execute(            "SELECT * FROM users WHERE username = ?",            (username,)        )        return User.from_row(rows[0]) if rows else None        def find_all(self) -> List[User]:        """Lấy tất cả users"""        rows = self.db.execute("SELECT * FROM users")        return [User.from_row(row) for row in rows]        def update(self, user: User) -> bool:        """Update user"""        query = "UPDATE users SET username = ?, email = ?, age = ? WHERE id = ?"        with self.db.get_connection() as conn:            cursor = conn.cursor()            cursor.execute(query, (user.username, user.email, user.age, user.id))            return cursor.rowcount > 0        def delete(self, user_id: int) -> bool:        """Delete user"""        with self.db.get_connection() as conn:            cursor = conn.cursor()            cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))            return cursor.rowcount > 0 # Sử dụngdb = DatabaseManager("myapp.db")user_repo = UserRepository(db) # Create tableuser_repo.create_table() # Create usernew_user = User(username="john_doe", email="[email protected]", age=30)created_user = user_repo.create(new_user)print(f"Created user with ID: {created_user.id}") # Find useruser = user_repo.find_by_username("john_doe")if user:    print(f"Found: {user.username} ({user.email})") # Update useruser.age = 31user_repo.update(user) # List all usersusers = user_repo.find_all()print(f"Total users: {len(users)}")

4. Query Builder

from typing import List, Any, Optional class QueryBuilder:    """Simple query builder"""        def __init__(self, table: str):        self.table = table        self._select: List[str] = ["*"]        self._where: List[str] = []        self._params: List[Any] = []        self._order_by: Optional[str] = None        self._limit: Optional[int] = None        self._offset: Optional[int] = None        def select(self, *columns: str) -> "QueryBuilder":        """Select columns"""        self._select = list(columns)        return self        def where(self, condition: str, *params: Any) -> "QueryBuilder":        """Add WHERE condition"""        self._where.append(condition)        self._params.extend(params)        return self        def order_by(self, column: str, direction: str = "ASC") -> "QueryBuilder":        """Add ORDER BY"""        self._order_by = f"{column} {direction}"        return self        def limit(self, limit: int) -> "QueryBuilder":        """Add LIMIT"""        self._limit = limit        return self        def offset(self, offset: int) -> "QueryBuilder":        """Add OFFSET"""        self._offset = offset        return self        def build(self) -> tuple[str, List[Any]]:        """Build SQL query"""        # SELECT        query = f"SELECT {', '.join(self._select)} FROM {self.table}"                # WHERE        if self._where:            query += " WHERE " + " AND ".join(self._where)                # ORDER BY        if self._order_by:            query += f" ORDER BY {self._order_by}"                # LIMIT        if self._limit:            query += f" LIMIT {self._limit}"                # OFFSET        if self._offset:            query += f" OFFSET {self._offset}"                return query, self._params        def execute(self, db: DatabaseManager) -> List[sqlite3.Row]:        """Execute query"""        query, params = self.build()        return db.execute(query, tuple(params)) # Sử dụngdb = DatabaseManager("myapp.db") # Simple queryusers = (    QueryBuilder("users")    .select("username", "email")    .where("age > ?", 25)    .order_by("username", "ASC")    .limit(10)    .execute(db)) # Complex queryactive_users = (    QueryBuilder("users")    .where("age > ?", 18)    .where("email LIKE ?", "%@example.com")    .order_by("created_at", "DESC")    .limit(5)    .offset(10)    .execute(db)) for user in active_users:    print(dict(user))

5 Ứng Dụng Thực Tế

1. Blog System

import sqlite3from typing import List, Optionalfrom dataclasses import dataclassfrom datetime import datetime @dataclassclass Post:    id: Optional[int] = None    title: str = ""    content: str = ""    author_id: int = 0    published: bool = False    created_at: Optional[datetime] = None class BlogDatabase:    """Blog database manager"""        def __init__(self, db_path: str):        self.db_path = db_path        self.init_database()        def init_database(self):        """Initialize database schema"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()                        # Users table            cursor.execute("""                CREATE TABLE IF NOT EXISTS users (                    id INTEGER PRIMARY KEY AUTOINCREMENT,                    username TEXT NOT NULL UNIQUE,                    email TEXT NOT NULL,                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP                )            """)                        # Posts table            cursor.execute("""                CREATE TABLE IF NOT EXISTS posts (                    id INTEGER PRIMARY KEY AUTOINCREMENT,                    title TEXT NOT NULL,                    content TEXT NOT NULL,                    author_id INTEGER NOT NULL,                    published BOOLEAN DEFAULT 0,                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,                    FOREIGN KEY (author_id) REFERENCES users(id)                )            """)                        # Comments table            cursor.execute("""                CREATE TABLE IF NOT EXISTS comments (                    id INTEGER PRIMARY KEY AUTOINCREMENT,                    post_id INTEGER NOT NULL,                    user_id INTEGER NOT NULL,                    content TEXT NOT NULL,                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,                    FOREIGN KEY (post_id) REFERENCES posts(id),                    FOREIGN KEY (user_id) REFERENCES users(id)                )            """)                        # Tags table            cursor.execute("""                CREATE TABLE IF NOT EXISTS tags (                    id INTEGER PRIMARY KEY AUTOINCREMENT,                    name TEXT NOT NULL UNIQUE                )            """)                        # Post-Tag junction table            cursor.execute("""                CREATE TABLE IF NOT EXISTS post_tags (                    post_id INTEGER NOT NULL,                    tag_id INTEGER NOT NULL,                    PRIMARY KEY (post_id, tag_id),                    FOREIGN KEY (post_id) REFERENCES posts(id),                    FOREIGN KEY (tag_id) REFERENCES tags(id)                )            """)        def create_post(self, title: str, content: str, author_id: int) -> int:        """Create new post"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute(                "INSERT INTO posts (title, content, author_id) VALUES (?, ?, ?)",                (title, content, author_id)            )            return cursor.lastrowid        def publish_post(self, post_id: int):        """Publish a post"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute(                "UPDATE posts SET published = 1 WHERE id = ?",                (post_id,)            )        def get_published_posts(self) -> List[dict]:        """Get all published posts với author info"""        with sqlite3.connect(self.db_path) as conn:            conn.row_factory = sqlite3.Row            cursor = conn.cursor()            cursor.execute("""                SELECT                     p.id,                    p.title,                    p.content,                    p.created_at,                    u.username as author                FROM posts p                JOIN users u ON p.author_id = u.id                WHERE p.published = 1                ORDER BY p.created_at DESC            """)            return [dict(row) for row in cursor.fetchall()]        def add_comment(self, post_id: int, user_id: int, content: str):        """Add comment to post"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute(                "INSERT INTO comments (post_id, user_id, content) VALUES (?, ?, ?)",                (post_id, user_id, content)            )        def get_post_with_comments(self, post_id: int) -> dict:        """Get post with all comments"""        with sqlite3.connect(self.db_path) as conn:            conn.row_factory = sqlite3.Row            cursor = conn.cursor()                        # Get post            cursor.execute("""                SELECT                     p.*,                    u.username as author                FROM posts p                JOIN users u ON p.author_id = u.id                WHERE p.id = ?            """, (post_id,))            post = dict(cursor.fetchone())                        # Get comments            cursor.execute("""                SELECT                     c.content,                    c.created_at,                    u.username as author                FROM comments c                JOIN users u ON c.user_id = u.id                WHERE c.post_id = ?                ORDER BY c.created_at ASC            """, (post_id,))            post["comments"] = [dict(row) for row in cursor.fetchall()]                        return post # Sử dụngblog = BlogDatabase("blog.db") # Create userwith sqlite3.connect("blog.db") as conn:    cursor = conn.cursor()    cursor.execute(        "INSERT INTO users (username, email) VALUES (?, ?)",        ("john_doe", "[email protected]")    )    author_id = cursor.lastrowid # Create and publish postpost_id = blog.create_post(    "My First Post",    "This is my first blog post!",    author_id)blog.publish_post(post_id) # Add commentsblog.add_comment(post_id, author_id, "Great post!") # Get postsposts = blog.get_published_posts()for post in posts:    print(f"{post['title']} by {post['author']}")

2. User Authentication System

import sqlite3import hashlibimport secretsfrom typing import Optionalfrom datetime import datetime, timedelta class AuthDatabase:    """User authentication với password hashing"""        def __init__(self, db_path: str):        self.db_path = db_path        self.init_database()        def init_database(self):        """Initialize auth tables"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()                        cursor.execute("""                CREATE TABLE IF NOT EXISTS users (                    id INTEGER PRIMARY KEY AUTOINCREMENT,                    username TEXT NOT NULL UNIQUE,                    email TEXT NOT NULL UNIQUE,                    password_hash TEXT NOT NULL,                    salt TEXT NOT NULL,                    is_active BOOLEAN DEFAULT 1,                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP                )            """)                        cursor.execute("""                CREATE TABLE IF NOT EXISTS sessions (                    id INTEGER PRIMARY KEY AUTOINCREMENT,                    user_id INTEGER NOT NULL,                    token TEXT NOT NULL UNIQUE,                    expires_at TIMESTAMP NOT NULL,                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,                    FOREIGN KEY (user_id) REFERENCES users(id)                )            """)        def _hash_password(self, password: str, salt: str) -> str:        """Hash password với salt"""        return hashlib.pbkdf2_hmac(            "sha256",            password.encode(),            salt.encode(),            100000        ).hex()        def register(self, username: str, email: str, password: str) -> Optional[int]:        """Register new user"""        salt = secrets.token_hex(16)        password_hash = self._hash_password(password, salt)                try:            with sqlite3.connect(self.db_path) as conn:                cursor = conn.cursor()                cursor.execute(                    "INSERT INTO users (username, email, password_hash, salt) VALUES (?, ?, ?, ?)",                    (username, email, password_hash, salt)                )                return cursor.lastrowid        except sqlite3.IntegrityError:            return None  # Username or email already exists        def login(self, username: str, password: str) -> Optional[str]:        """Login user và return session token"""        with sqlite3.connect(self.db_path) as conn:            conn.row_factory = sqlite3.Row            cursor = conn.cursor()                        # Get user            cursor.execute(                "SELECT * FROM users WHERE username = ? AND is_active = 1",                (username,)            )            user = cursor.fetchone()                        if not user:                return None                        # Verify password            password_hash = self._hash_password(password, user["salt"])            if password_hash != user["password_hash"]:                return None                        # Create session            token = secrets.token_urlsafe(32)            expires_at = datetime.now() + timedelta(days=7)                        cursor.execute(                "INSERT INTO sessions (user_id, token, expires_at) VALUES (?, ?, ?)",                (user["id"], token, expires_at)            )                        return token        def verify_session(self, token: str) -> Optional[dict]:        """Verify session token"""        with sqlite3.connect(self.db_path) as conn:            conn.row_factory = sqlite3.Row            cursor = conn.cursor()                        cursor.execute("""                SELECT u.id, u.username, u.email                FROM sessions s                JOIN users u ON s.user_id = u.id                WHERE s.token = ? AND s.expires_at > ? AND u.is_active = 1            """, (token, datetime.now()))                        user = cursor.fetchone()            return dict(user) if user else None        def logout(self, token: str):        """Logout - delete session"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute("DELETE FROM sessions WHERE token = ?", (token,)) # Sử dụngauth = AuthDatabase("auth.db") # Registeruser_id = auth.register("john_doe", "[email protected]", "secure_password123")if user_id:    print(f"✅ Registered user ID: {user_id}") # Logintoken = auth.login("john_doe", "secure_password123")if token:    print(f"✅ Login successful, token: {token[:20]}...")        # Verify session    user = auth.verify_session(token)    if user:        print(f"✅ Session valid for: {user['username']}")        # Logout    auth.logout(token)    print("✅ Logged out")

3. Caching Layer

import sqlite3import jsonimport timefrom typing import Any, Optional class DatabaseCache:    """Database-backed cache với TTL"""        def __init__(self, db_path: str):        self.db_path = db_path        self.init_database()        def init_database(self):        """Initialize cache table"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute("""                CREATE TABLE IF NOT EXISTS cache (                    key TEXT PRIMARY KEY,                    value TEXT NOT NULL,                    expires_at REAL NOT NULL                )            """)                        # Index for cleanup            cursor.execute("""                CREATE INDEX IF NOT EXISTS idx_expires_at                 ON cache(expires_at)            """)        def set(self, key: str, value: Any, ttl: int = 3600):        """Set cache với TTL (seconds)"""        expires_at = time.time() + ttl        value_json = json.dumps(value)                with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute(                "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",                (key, value_json, expires_at)            )        def get(self, key: str) -> Optional[Any]:        """Get cache value"""        with sqlite3.connect(self.db_path) as conn:            conn.row_factory = sqlite3.Row            cursor = conn.cursor()                        cursor.execute(                "SELECT value, expires_at FROM cache WHERE key = ?",                (key,)            )            row = cursor.fetchone()                        if not row:                return None                        # Check expiration            if row["expires_at"] < time.time():                self.delete(key)                return None                        return json.loads(row["value"])        def delete(self, key: str):        """Delete cache entry"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute("DELETE FROM cache WHERE key = ?", (key,))        def cleanup(self):        """Remove expired entries"""        with sqlite3.connect(self.db_path) as conn:            cursor = conn.cursor()            cursor.execute(                "DELETE FROM cache WHERE expires_at < ?",                (time.time(),)            )            return cursor.rowcount # Sử dụngcache = DatabaseCache("cache.db") # Set cachecache.set("user:123", {"name": "John", "email": "[email protected]"}, ttl=60) # Get cacheuser = cache.get("user:123")if user:    print(f"Cached user: {user['name']}") # Cleanup expiredremoved = cache.cleanup()print(f"Cleaned up {removed} expired entries")

Bài tiếp theo: Bài 16.2: Working with Databases - PostgresQL