Bài 16: Working với Databases - SQLite3
Mục Tiêu Bài Học
Sau khi hoàn thành bài này, bạn sẽ:
- ✅ Sử dụng sqlite3 module
- ✅ Làm việc với database connectors
- ✅ Viết SQL queries trong Python
- ✅ Hiểu connection pooling
- ✅ Tìm hiểu ORM concepts
- ✅ Áp dụng best practices
Database trong Python
Python hỗ trợ nhiều databases:
- SQLite - Built-in, file-based, nhẹ
- PostgreSQL - Production-grade, feature-rich
- MySQL - Popular, web applications
- MongoDB - NoSQL, document-based
1. SQLite3 - Built-in Database
SQLite là database có sẵn trong Python, lưu data trong file .db.
Kết Nối và Tạo Database
import sqlite3

# Open the database file, creating it if it does not exist yet.
conn = sqlite3.connect("myapp.db")
# Close it before rebinding `conn` below; otherwise this first connection
# would be leaked.
conn.close()

# In-memory database (useful for tests); its contents vanish on close.
conn = sqlite3.connect(":memory:")

# A cursor is the object used to execute queries.
cursor = conn.cursor()

# Close the connection when done.
conn.close()
Tạo Tables
import sqlite3

# Schema for the users table. IF NOT EXISTS makes the script safe to re-run.
users_ddl = """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL UNIQUE,
        email TEXT NOT NULL,
        age INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
"""

# Schema for the posts table; user_id is declared as a foreign key to users.
posts_ddl = """
    CREATE TABLE IF NOT EXISTS posts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        content TEXT,
        user_id INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users(id)
    )
"""

conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()

# users must be created first because posts references it.
for ddl in (users_ddl, posts_ddl):
    cursor.execute(ddl)

# Persist the changes, then release the connection.
conn.commit()
conn.close()
Insert Data
import sqlite3

conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()

# Values are bound through ? placeholders and never formatted into the SQL.
insert_sql = "INSERT INTO users (username, email, age) VALUES (?, ?, ?)"

# Insert a single row.
cursor.execute(insert_sql, ("john_doe", "[email protected]", 30))

# Read the generated ID right after the single-row insert: lastrowid is only
# updated by a successful INSERT/REPLACE issued through execute().
# executemany() leaves it unchanged, so reading it later would still report
# this row while looking as if it described the bulk insert.
last_id = cursor.lastrowid
print(f"Inserted user with ID: {last_id}")

# Insert multiple rows in one call.
users = [
    ("jane_smith", "[email protected]", 25),
    ("bob_wilson", "[email protected]", 35),
    ("alice_brown", "[email protected]", 28),
]
cursor.executemany(insert_sql, users)

conn.commit()
conn.close()
Query Data
import sqlite3

conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()

# fetchall(): materialize every row of the result as a list of tuples.
# execute() returns the cursor itself, so the calls can be chained.
rows = cursor.execute("SELECT * FROM users").fetchall()
for row in rows:
    print(row)  # (1, 'john_doe', '[email protected]', 30, '2024-01-01 10:00:00')

# fetchone(): the next row, or None when there is no match.
user = cursor.execute(
    "SELECT * FROM users WHERE username = ?",
    ("john_doe",),
).fetchone()
print(user)

# fetchmany(n): at most n rows (here the first 2).
users = cursor.execute(
    "SELECT * FROM users WHERE age > ?",
    (25,),
).fetchmany(2)
print(users)

# The cursor is itself an iterator over the result rows.
cursor.execute("SELECT username, email FROM users")
for username, email in cursor:
    print(f"{username}: {email}")

conn.close()
Row Factory - Dictionary Results
import sqlite3

conn = sqlite3.connect("myapp.db")

# With sqlite3.Row as the row factory, rows can be indexed by column name.
# A Row is mapping-like, not a real dict; dict(row) converts it.
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

user = cursor.execute(
    "SELECT * FROM users WHERE username = ?",
    ("john_doe",),
).fetchone()

# Access by column name.
print(user["username"])  # john_doe
print(user["email"])  # [email protected]

# Convert to a plain dict.
user_dict = dict(user)
print(user_dict)  # {'id': 1, 'username': 'john_doe', ...}

conn.close()
Update và Delete
import sqlite3

conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()

# UPDATE: after a data-modifying statement, cursor.rowcount holds the number
# of affected rows.
update_sql = "UPDATE users SET age = ? WHERE username = ?"
cursor.execute(update_sql, (31, "john_doe"))
print(f"Updated {cursor.rowcount} rows")

# DELETE: remove every user younger than 20.
delete_sql = "DELETE FROM users WHERE age < ?"
cursor.execute(delete_sql, (20,))
print(f"Deleted {cursor.rowcount} rows")

# Nothing is persisted until commit().
conn.commit()
conn.close()
Context Manager
import sqlite3
from contextlib import closing

# `with conn:` manages only the transaction: it commits when the block
# succeeds and rolls back when it raises. It does NOT close the connection,
# so the connection is additionally wrapped in closing() to guarantee
# conn.close() runs.
with closing(sqlite3.connect("myapp.db")) as conn:
    with conn:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
            ("new_user", "[email protected]", 25),
        )
    # Leaving the inner block: committed on success, rolled back on exception.
# Leaving the outer block: the connection is closed.
2. Transactions
Basic Transactions
import sqlite3

conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()

# Both statements belong to a single transaction: either both are applied or
# neither is.
statements = [
    ("UPDATE users SET age = age + 1 WHERE username = ?", ("john_doe",)),
    ("INSERT INTO posts (title, user_id) VALUES (?, ?)", ("New Post", 1)),
]

try:
    # The sqlite3 module opens the transaction implicitly before the first
    # data-modifying statement.
    for sql, params in statements:
        cursor.execute(sql, params)

    # Make the changes permanent.
    conn.commit()
    print("✅ Transaction successful")
except sqlite3.Error as e:
    # Undo everything done since the transaction started.
    conn.rollback()
    print(f"❌ Transaction failed: {e}")
finally:
    conn.close()
Transaction Isolation Levels
import sqlite3

# Each example closes its connection before the next one rebinds `conn`,
# so no connection is leaked.

# DEFERRED (default): the lock is taken when the first write happens.
conn = sqlite3.connect("myapp.db", isolation_level="DEFERRED")
conn.close()

# IMMEDIATE: the write lock is taken as soon as the transaction begins.
conn = sqlite3.connect("myapp.db", isolation_level="IMMEDIATE")
conn.close()

# EXCLUSIVE: an exclusive lock is taken when the transaction begins.
conn = sqlite3.connect("myapp.db", isolation_level="EXCLUSIVE")
conn.close()

# None: autocommit mode, the module never opens a transaction implicitly.
conn = sqlite3.connect("myapp.db", isolation_level=None)
conn.close()
Manual Transaction Control
import sqlite3

conn = sqlite3.connect("myapp.db")
# isolation_level = None switches the connection to autocommit mode: the
# sqlite3 module stops opening transactions implicitly, so the explicit
# BEGIN / COMMIT / ROLLBACK statements below are fully under our control.
conn.isolation_level = None

cursor = conn.cursor()

try:
    # Start the transaction explicitly.
    cursor.execute("BEGIN TRANSACTION")

    try:
        cursor.execute("UPDATE users SET age = age + 1")
        cursor.execute(
            "INSERT INTO posts (title, user_id) VALUES (?, ?)",
            ("Post", 1),
        )

        # Make both changes permanent.
        cursor.execute("COMMIT")
    except sqlite3.Error:
        # Undo the partial work, then let the caller see the error.
        cursor.execute("ROLLBACK")
        raise
finally:
    # Runs on both success and re-raised errors, so the connection is
    # never left open.
    conn.close()
3. Database Design Patterns
Database Connection Manager
import sqlite3
from contextlib import contextmanager
from typing import Generator, List


class DatabaseManager:
    """Open short-lived SQLite connections and run queries on them.

    Every call opens a fresh connection to ``db_path``, so no state is shared
    between calls.
    """

    def __init__(self, db_path: str):
        """Remember the database path; no connection is opened here."""
        self.db_path = db_path

    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """Yield a connection whose rows are ``sqlite3.Row`` objects.

        The transaction is committed when the ``with`` body finishes normally
        and rolled back (with the exception re-raised) when it raises. The
        connection is closed in both cases.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows become addressable by column name
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def execute(self, query: str, params: tuple = ()) -> List[sqlite3.Row]:
        """Run one statement and return all rows it produced.

        Args:
            query: SQL text using ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            The fetched rows; an empty list for statements producing none.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchall()

    def execute_many(self, query: str, params_list: list) -> int:
        """Run one statement once per parameter tuple.

        Args:
            query: SQL text using ``?`` placeholders.
            params_list: One parameter tuple per execution.

        Returns:
            ``cursor.rowcount`` after the batch.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.executemany(query, params_list)
            return cursor.rowcount


# Usage. Guarded so that importing this code has no side effects; the demo
# inserts fixed usernames and would hit the UNIQUE constraint if re-run
# against the same database.
if __name__ == "__main__":
    db = DatabaseManager("myapp.db")

    # Query
    users = db.execute("SELECT * FROM users WHERE age > ?", (25,))
    for user in users:
        print(dict(user))

    # Insert many
    users_data = [
        ("user1", "[email protected]", 30),
        ("user2", "[email protected]", 25),
    ]
    db.execute_many(
        "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
        users_data,
    )
Repository Pattern
import sqlite3
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from datetime import datetime


@dataclass
class User:
    """In-memory representation of one row of the ``users`` table."""

    id: Optional[int] = None
    username: str = ""
    email: str = ""
    age: int = 0
    created_at: Optional[datetime] = None

    @classmethod
    def from_row(cls, row: sqlite3.Row) -> "User":
        """Build a User from a row addressable by column name.

        ``created_at`` is parsed with ``datetime.fromisoformat``. A NULL
        value is mapped to ``None`` instead of raising ``TypeError``.
        """
        created_raw = row["created_at"]
        created_at = (
            datetime.fromisoformat(created_raw) if created_raw is not None else None
        )
        return cls(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            age=row["age"],
            created_at=created_at,
        )


class UserRepository:
    """Repository pattern for User: all SQL for the users table lives here."""

    # The annotation is quoted because DatabaseManager is defined in the
    # previous snippet, not in this one.
    def __init__(self, db: "DatabaseManager"):
        self.db = db

    def create_table(self):
        """Create the users table if it does not exist yet."""
        query = """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE,
                email TEXT NOT NULL,
                age INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """
        self.db.execute(query)

    def create(self, user: User) -> User:
        """Insert ``user`` and return it with ``id`` set to the new row ID."""
        query = "INSERT INTO users (username, email, age) VALUES (?, ?, ?)"

        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (user.username, user.email, user.age))
            user.id = cursor.lastrowid

        return user

    def find_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this ID, or None when there is none."""
        rows = self.db.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        return User.from_row(rows[0]) if rows else None

    def find_by_username(self, username: str) -> Optional[User]:
        """Return the user with this username, or None when there is none."""
        rows = self.db.execute(
            "SELECT * FROM users WHERE username = ?",
            (username,),
        )
        return User.from_row(rows[0]) if rows else None

    def find_all(self) -> List[User]:
        """Return every user in the table."""
        rows = self.db.execute("SELECT * FROM users")
        return [User.from_row(row) for row in rows]

    def update(self, user: User) -> bool:
        """Write username, email and age of ``user`` to the row with its ID.

        Returns:
            True when a row was updated, False when no row has that ID.
        """
        query = "UPDATE users SET username = ?, email = ?, age = ? WHERE id = ?"

        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (user.username, user.email, user.age, user.id))
            return cursor.rowcount > 0

    def delete(self, user_id: int) -> bool:
        """Delete the row with this ID; return True when a row was removed."""
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
            return cursor.rowcount > 0


# Usage. Requires DatabaseManager from the previous snippet. Guarded so that
# importing this code has no side effects.
if __name__ == "__main__":
    db = DatabaseManager("myapp.db")
    user_repo = UserRepository(db)

    # Create table
    user_repo.create_table()

    # Create user
    new_user = User(username="john_doe", email="[email protected]", age=30)
    created_user = user_repo.create(new_user)
    print(f"Created user with ID: {created_user.id}")

    # Find user
    user = user_repo.find_by_username("john_doe")
    if user:
        print(f"Found: {user.username} ({user.email})")

        # Update user (only when the lookup succeeded)
        user.age = 31
        user_repo.update(user)

    # List all users
    users = user_repo.find_all()
    print(f"Total users: {len(users)}")
4. Query Builder
import sqlite3
from typing import List, Any, Optional


class QueryBuilder:
    """Fluent builder for simple single-table SELECT statements.

    Values passed to ``where()`` are bound as parameters. Table names, column
    names, conditions and ORDER BY text are interpolated into the SQL as
    given, so they must come from trusted code, not from user input.
    """

    def __init__(self, table: str):
        self.table = table
        self._select: List[str] = ["*"]
        self._where: List[str] = []
        self._params: List[Any] = []
        self._order_by: Optional[str] = None
        self._limit: Optional[int] = None
        self._offset: Optional[int] = None

    def select(self, *columns: str) -> "QueryBuilder":
        """Choose the columns to return; with no arguments, select ``*``."""
        # An empty column list would build the invalid "SELECT  FROM ...".
        self._select = list(columns) or ["*"]
        return self

    def where(self, condition: str, *params: Any) -> "QueryBuilder":
        """Add a condition; multiple conditions are joined with AND.

        Args:
            condition: SQL fragment using ``?`` placeholders.
            *params: Values for the placeholders, in order.
        """
        self._where.append(condition)
        self._params.extend(params)
        return self

    def order_by(self, column: str, direction: str = "ASC") -> "QueryBuilder":
        """Set the ORDER BY clause (replaces any previous one)."""
        self._order_by = f"{column} {direction}"
        return self

    def limit(self, limit: int) -> "QueryBuilder":
        """Set LIMIT; ``limit(0)`` is honoured and yields no rows."""
        self._limit = limit
        return self

    def offset(self, offset: int) -> "QueryBuilder":
        """Set OFFSET (number of leading rows to skip)."""
        self._offset = offset
        return self

    def build(self) -> tuple[str, List[Any]]:
        """Assemble the statement.

        Returns:
            ``(sql, params)`` where ``params`` is a copy of the bound values
            in placeholder order.
        """
        query = f"SELECT {', '.join(self._select)} FROM {self.table}"

        if self._where:
            query += " WHERE " + " AND ".join(self._where)

        if self._order_by:
            query += f" ORDER BY {self._order_by}"

        # Compare against None: a truthiness test would silently drop LIMIT 0
        # and return every row.
        if self._limit is not None:
            query += f" LIMIT {self._limit}"
        elif self._offset:
            # SQLite rejects OFFSET without LIMIT; a negative LIMIT means
            # "no upper bound".
            query += " LIMIT -1"

        if self._offset:
            query += f" OFFSET {self._offset}"

        # Return a copy so callers cannot mutate the builder's state.
        return query, list(self._params)

    # The DatabaseManager annotation is quoted because the class is defined
    # in an earlier snippet, not in this one.
    def execute(self, db: "DatabaseManager") -> List[sqlite3.Row]:
        """Build the statement and run it through ``db.execute``."""
        query, params = self.build()
        return db.execute(query, tuple(params))


# Usage. Requires DatabaseManager from an earlier snippet. Guarded so that
# importing this code has no side effects.
if __name__ == "__main__":
    db = DatabaseManager("myapp.db")

    # Simple query
    users = (
        QueryBuilder("users")
        .select("username", "email")
        .where("age > ?", 25)
        .order_by("username", "ASC")
        .limit(10)
        .execute(db)
    )

    # Complex query
    active_users = (
        QueryBuilder("users")
        .where("age > ?", 18)
        .where("email LIKE ?", "%@example.com")
        .order_by("created_at", "DESC")
        .limit(5)
        .offset(10)
        .execute(db)
    )

    for user in active_users:
        print(dict(user))
5 Ứng Dụng Thực Tế
1. Blog System
import sqlite3
from contextlib import closing, contextmanager
from typing import Iterator, List, Optional
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Post:
    """Plain data holder mirroring the columns of the posts table."""

    id: Optional[int] = None
    title: str = ""
    content: str = ""
    author_id: int = 0
    published: bool = False
    created_at: Optional[datetime] = None


class BlogDatabase:
    """SQLite-backed storage for a small blog: users, posts, comments, tags."""

    def __init__(self, db_path: str):
        """Remember the database path and create any missing tables."""
        self.db_path = db_path
        self.init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed/rolled back AND closed.

        ``with sqlite3.connect(...) as conn`` only ends the transaction and
        leaves the connection open, so the close is done explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows addressable by column name
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    def init_database(self):
        """Create the schema if it does not exist yet.

        NOTE: SQLite enforces FOREIGN KEY clauses only on connections that
        run ``PRAGMA foreign_keys = ON``; this class does not enable it.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Users table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL UNIQUE,
                    email TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Posts table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS posts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL,
                    author_id INTEGER NOT NULL,
                    published BOOLEAN DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (author_id) REFERENCES users(id)
                )
            """)

            # Comments table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS comments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    post_id INTEGER NOT NULL,
                    user_id INTEGER NOT NULL,
                    content TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (post_id) REFERENCES posts(id),
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )
            """)

            # Tags table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE
                )
            """)

            # Post-Tag junction table (many-to-many)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS post_tags (
                    post_id INTEGER NOT NULL,
                    tag_id INTEGER NOT NULL,
                    PRIMARY KEY (post_id, tag_id),
                    FOREIGN KEY (post_id) REFERENCES posts(id),
                    FOREIGN KEY (tag_id) REFERENCES tags(id)
                )
            """)

    def create_post(self, title: str, content: str, author_id: int) -> int:
        """Insert an unpublished post and return its new row ID."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO posts (title, content, author_id) VALUES (?, ?, ?)",
                (title, content, author_id),
            )
            return cursor.lastrowid

    def publish_post(self, post_id: int):
        """Mark the post as published (no effect if the ID does not exist)."""
        with self._connect() as conn:
            conn.execute(
                "UPDATE posts SET published = 1 WHERE id = ?",
                (post_id,),
            )

    def get_published_posts(self) -> List[dict]:
        """Return all published posts with the author's username, newest first."""
        with self._connect() as conn:
            cursor = conn.execute("""
                SELECT p.id, p.title, p.content, p.created_at,
                       u.username as author
                FROM posts p
                JOIN users u ON p.author_id = u.id
                WHERE p.published = 1
                ORDER BY p.created_at DESC
            """)
            return [dict(row) for row in cursor.fetchall()]

    def add_comment(self, post_id: int, user_id: int, content: str):
        """Attach a comment written by ``user_id`` to ``post_id``."""
        with self._connect() as conn:
            conn.execute(
                "INSERT INTO comments (post_id, user_id, content) VALUES (?, ?, ?)",
                (post_id, user_id, content),
            )

    def get_post_with_comments(self, post_id: int) -> Optional[dict]:
        """Return the post plus its comments, oldest comment first.

        Returns:
            A dict of the post columns, ``author`` and a ``comments`` list,
            or None when no post (with an existing author) has this ID.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Get post
            cursor.execute("""
                SELECT p.*, u.username as author
                FROM posts p
                JOIN users u ON p.author_id = u.id
                WHERE p.id = ?
            """, (post_id,))
            post_row = cursor.fetchone()
            if post_row is None:
                # dict(None) would raise TypeError; report "not found" the
                # same way the other lookups in this lesson do.
                return None
            post = dict(post_row)

            # Get comments
            cursor.execute("""
                SELECT c.content, c.created_at, u.username as author
                FROM comments c
                JOIN users u ON c.user_id = u.id
                WHERE c.post_id = ?
                ORDER BY c.created_at ASC
            """, (post_id,))
            post["comments"] = [dict(row) for row in cursor.fetchall()]

            return post


# Usage. Guarded so that importing this code has no side effects; the demo
# inserts a fixed username and would hit the UNIQUE constraint if re-run
# against the same database.
if __name__ == "__main__":
    blog = BlogDatabase("blog.db")

    # Create user (closing() closes the connection; `conn` commits it)
    with closing(sqlite3.connect("blog.db")) as conn, conn:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO users (username, email) VALUES (?, ?)",
            ("john_doe", "[email protected]"),
        )
        author_id = cursor.lastrowid

    # Create and publish post
    post_id = blog.create_post(
        "My First Post",
        "This is my first blog post!",
        author_id,
    )
    blog.publish_post(post_id)

    # Add comments
    blog.add_comment(post_id, author_id, "Great post!")

    # Get posts
    posts = blog.get_published_posts()
    for post in posts:
        print(f"{post['title']} by {post['author']}")
2. User Authentication System
import sqlite3
import hashlib
import secrets
from contextlib import contextmanager
from typing import Iterator, Optional
from datetime import datetime, timedelta


class AuthDatabase:
    """User registration, login and session handling on top of SQLite.

    Passwords are stored as salted PBKDF2-HMAC-SHA256 hashes. Session expiry
    timestamps are naive local times, as produced by ``datetime.now()``.
    """

    def __init__(self, db_path: str):
        """Remember the database path and create any missing tables."""
        self.db_path = db_path
        self.init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed/rolled back AND closed.

        ``with sqlite3.connect(...) as conn`` only ends the transaction and
        leaves the connection open, so the close is done explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows addressable by column name
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _timestamp(moment: datetime) -> str:
        """Format a datetime as stored in the TIMESTAMP columns.

        This is the text the implicit sqlite3 datetime adapter used to
        produce. That adapter is deprecated since Python 3.12, so the
        conversion is done explicitly and existing rows stay comparable.
        """
        return moment.isoformat(" ")

    def init_database(self):
        """Create the users and sessions tables if they do not exist yet."""
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL UNIQUE,
                    email TEXT NOT NULL UNIQUE,
                    password_hash TEXT NOT NULL,
                    salt TEXT NOT NULL,
                    is_active BOOLEAN DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    token TEXT NOT NULL UNIQUE,
                    expires_at TIMESTAMP NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )
            """)

    def _hash_password(self, password: str, salt: str) -> str:
        """Return the hex PBKDF2-HMAC-SHA256 hash (100000 rounds) of password."""
        return hashlib.pbkdf2_hmac(
            "sha256",
            password.encode(),
            salt.encode(),
            100000,
        ).hex()

    def register(self, username: str, email: str, password: str) -> Optional[int]:
        """Create a user with a fresh random salt.

        Returns:
            The new user ID, or None when the username or e-mail is already
            taken (UNIQUE constraint violation).
        """
        salt = secrets.token_hex(16)
        password_hash = self._hash_password(password, salt)

        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO users (username, email, password_hash, salt) VALUES (?, ?, ?, ?)",
                    (username, email, password_hash, salt),
                )
                return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None  # Username or email already exists

    def login(self, username: str, password: str) -> Optional[str]:
        """Check the credentials and open a 7-day session.

        Returns:
            The session token, or None when the user does not exist, is
            inactive, or the password is wrong.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Get user
            cursor.execute(
                "SELECT * FROM users WHERE username = ? AND is_active = 1",
                (username,),
            )
            user = cursor.fetchone()

            if not user:
                return None

            # Verify password. compare_digest runs in constant time, so the
            # comparison does not reveal how many leading characters matched.
            password_hash = self._hash_password(password, user["salt"])
            if not secrets.compare_digest(password_hash, user["password_hash"]):
                return None

            # Create session
            token = secrets.token_urlsafe(32)
            expires_at = datetime.now() + timedelta(days=7)

            cursor.execute(
                "INSERT INTO sessions (user_id, token, expires_at) VALUES (?, ?, ?)",
                (user["id"], token, self._timestamp(expires_at)),
            )

            return token

    def verify_session(self, token: str) -> Optional[dict]:
        """Return ``{id, username, email}`` for a valid, unexpired session.

        Returns None when the token is unknown, the session has expired, or
        the user is inactive.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT u.id, u.username, u.email
                FROM sessions s
                JOIN users u ON s.user_id = u.id
                WHERE s.token = ? AND s.expires_at > ? AND u.is_active = 1
            """, (token, self._timestamp(datetime.now())))

            user = cursor.fetchone()
            return dict(user) if user else None

    def logout(self, token: str):
        """Delete the session with this token (no effect if it is unknown)."""
        with self._connect() as conn:
            conn.execute("DELETE FROM sessions WHERE token = ?", (token,))


# Usage. Guarded so that importing this code has no side effects.
if __name__ == "__main__":
    auth = AuthDatabase("auth.db")

    # Register
    user_id = auth.register("john_doe", "[email protected]", "secure_password123")
    if user_id:
        print(f"✅ Registered user ID: {user_id}")

    # Login
    token = auth.login("john_doe", "secure_password123")
    if token:
        print(f"✅ Login successful, token: {token[:20]}...")

        # Verify session
        user = auth.verify_session(token)
        if user:
            print(f"✅ Session valid for: {user['username']}")

        # Logout
        auth.logout(token)
        print("✅ Logged out")
3. Caching Layer
import sqlite3
import json
import time
from contextlib import contextmanager
from typing import Any, Iterator, Optional


class DatabaseCache:
    """Key/value cache stored in SQLite, with a per-entry time to live.

    Values are serialized with ``json.dumps``, so only JSON-serializable
    values can be cached.
    """

    def __init__(self, db_path: str):
        """Remember the database path and create the cache table if missing."""
        self.db_path = db_path
        self.init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed/rolled back AND closed.

        ``with sqlite3.connect(...) as conn`` only ends the transaction and
        leaves the connection open, so the close is done explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows addressable by column name
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    def init_database(self):
        """Create the cache table and the index used by cleanup()."""
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    expires_at REAL NOT NULL
                )
            """)

            # Index for cleanup
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_expires_at
                ON cache(expires_at)
            """)

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Store ``value`` under ``key`` for ``ttl`` seconds.

        An existing entry with the same key is replaced.
        """
        expires_at = time.time() + ttl
        value_json = json.dumps(value)

        with self._connect() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
                (key, value_json, expires_at),
            )

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None when missing or expired.

        An expired entry is deleted as a side effect.
        """
        now = time.time()

        with self._connect() as conn:
            row = conn.execute(
                "SELECT value, expires_at FROM cache WHERE key = ?",
                (key,),
            ).fetchone()

            if not row:
                return None

            # Check expiration
            if row["expires_at"] < now:
                # Delete on the same connection rather than opening a second
                # one. The expires_at condition stops this from removing a
                # fresh value written under the key after the SELECT above.
                conn.execute(
                    "DELETE FROM cache WHERE key = ? AND expires_at < ?",
                    (key, now),
                )
                return None

            return json.loads(row["value"])

    def delete(self, key: str):
        """Remove the entry for ``key`` (no effect if it does not exist)."""
        with self._connect() as conn:
            conn.execute("DELETE FROM cache WHERE key = ?", (key,))

    def cleanup(self):
        """Delete every expired entry and return how many were removed."""
        with self._connect() as conn:
            cursor = conn.execute(
                "DELETE FROM cache WHERE expires_at < ?",
                (time.time(),),
            )
            return cursor.rowcount


# Usage. Guarded so that importing this code has no side effects.
if __name__ == "__main__":
    cache = DatabaseCache("cache.db")

    # Set cache
    cache.set("user:123", {"name": "John", "email": "[email protected]"}, ttl=60)

    # Get cache
    user = cache.get("user:123")
    if user:
        print(f"Cached user: {user['name']}")

    # Cleanup expired
    removed = cache.cleanup()
    print(f"Cleaned up {removed} expired entries")
Bài tiếp theo: Bài 16.2: Working with Databases - PostgreSQL