Lesson 21: Python Advanced - Practices & Projects (Part 4)
Completing Project 3: RESTful API Client Library
Step 2: Authentication
# api_client/auth.py
import base64
from abc import ABC, abstractmethod
from typing import Dict


class AuthProvider(ABC):
    """Base authentication provider."""

    @abstractmethod
    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Apply authentication to headers."""
        pass


class BearerAuth(AuthProvider):
    """Bearer token authentication."""

    def __init__(self, token: str):
        self.token = token

    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Add Bearer token to headers."""
        headers['Authorization'] = f'Bearer {self.token}'
        return headers


class BasicAuth(AuthProvider):
    """Basic authentication."""

    def __init__(self, username: str, password: str):
        credentials = f"{username}:{password}"
        self.token = base64.b64encode(credentials.encode()).decode()

    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Add Basic auth to headers."""
        headers['Authorization'] = f'Basic {self.token}'
        return headers


class APIKeyAuth(AuthProvider):
    """API key authentication."""

    def __init__(self, api_key: str, header_name: str = 'X-API-Key'):
        self.api_key = api_key
        self.header_name = header_name

    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Add API key to headers."""
        headers[self.header_name] = self.api_key
        return headers
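All three providers expose the same apply_auth() contract (mutate and return the headers dict), so client code never needs to know which scheme is in use. A quick sanity check (the token values are dummies):

from api_client.auth import BearerAuth, BasicAuth, APIKeyAuth

# Every provider takes a headers dict and returns it with credentials applied
for provider in (BearerAuth('abc123'),
                 BasicAuth('alice', 's3cret'),
                 APIKeyAuth('my-key')):
    print(provider.apply_auth({}))

# {'Authorization': 'Bearer abc123'}
# {'Authorization': 'Basic YWxpY2U6czNjcmV0'}
# {'X-API-Key': 'my-key'}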
Step 3: Rate Limiting
# api_client/rate_limiter.py
import time
from collections import deque
from datetime import datetime, timedelta


class RateLimiter:
    """Simple rate limiter that enforces a minimum interval between requests."""

    def __init__(self, requests_per_second: int = 10):
        """
        Initialize rate limiter.

        Args:
            requests_per_second: Maximum requests per second
        """
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = None

    def acquire(self):
        """Wait if necessary to respect rate limit."""
        if self.last_request_time:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_interval:
                wait_time = self.min_interval - elapsed
                time.sleep(wait_time)

        self.last_request_time = time.time()


class SlidingWindowRateLimiter:
    """Rate limiter with sliding window."""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        """
        Initialize sliding window rate limiter.

        Args:
            max_requests: Maximum requests in window
            window_seconds: Time window in seconds
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()

    def acquire(self):
        """Wait if necessary to respect rate limit."""
        now = datetime.now()
        window_start = now - timedelta(seconds=self.window_seconds)

        # Remove requests that have fallen out of the window
        while self.requests and self.requests[0] < window_start:
            self.requests.popleft()

        # If the window is full, wait until the oldest request expires
        if len(self.requests) >= self.max_requests:
            oldest_request = self.requests[0]
            wait_until = oldest_request + timedelta(seconds=self.window_seconds)
            wait_seconds = (wait_until - now).total_seconds()

            if wait_seconds > 0:
                time.sleep(wait_seconds)

            return self.acquire()  # Retry after waiting

        # Record this request
        self.requests.append(now)
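A minimal sanity check for the interval-based limiter: at 5 requests/second the minimum spacing is 0.2 s, and the first call never waits.

import time
from api_client.rate_limiter import RateLimiter

limiter = RateLimiter(requests_per_second=5)  # min_interval = 0.2s

start = time.time()
for _ in range(3):
    limiter.acquire()  # first call is free, the next two each wait ~0.2s
print(f"3 calls took {time.time() - start:.2f}s")  # ~0.40s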
Step 4: Pagination Support
# api_client/pagination.py
import logging
from typing import Any, Dict, Iterator

logger = logging.getLogger(__name__)


class PaginatedResponse:
    """Handle paginated API responses."""

    def __init__(self, client: 'APIClient', endpoint: str,
                 page_param: str = 'page',
                 per_page_param: str = 'per_page',
                 per_page: int = 100,
                 **kwargs):
        """
        Initialize paginated response.

        Args:
            client: API client instance
            endpoint: API endpoint
            page_param: Page parameter name
            per_page_param: Per-page parameter name
            per_page: Items per page
            **kwargs: Additional request parameters
        """
        self.client = client
        self.endpoint = endpoint
        self.page_param = page_param
        self.per_page_param = per_page_param
        self.per_page = per_page
        self.kwargs = kwargs

    def iter_pages(self) -> Iterator[Dict[str, Any]]:
        """Iterate through all pages."""
        page = 1

        while True:
            params = self.kwargs.get('params', {}).copy()
            params[self.page_param] = page
            params[self.per_page_param] = self.per_page
            self.kwargs['params'] = params

            try:
                response = self.client.get(self.endpoint, **self.kwargs)
                data = response.json()

                # Handle different response formats
                items = self._extract_items(data)
                if not items:
                    break

                yield from items

                # Check if there are more pages
                if not self._has_next_page(data, items):
                    break

                page += 1

            except Exception as e:
                logger.error(f"Error fetching page {page}: {e}")
                break

    def _extract_items(self, data: Dict) -> list:
        """Extract items from response."""
        # Try common response envelope patterns
        if isinstance(data, list):
            return data
        if 'items' in data:
            return data['items']
        if 'results' in data:
            return data['results']
        if 'data' in data:
            return data['data']
        return []

    def _has_next_page(self, data: Dict, items: list) -> bool:
        """Check if there are more pages."""
        # If fewer items than per_page, no more pages
        if len(items) < self.per_page:
            return False

        # Check for next-page indicators
        if 'has_next' in data:
            return data['has_next']
        if 'next' in data:
            return data['next'] is not None

        # Assume there might be more if we got a full page
        return True

    def all(self) -> list:
        """Get all items from all pages."""
        return list(self.iter_pages())
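Because iter_pages() is a generator, pages are fetched only as items are consumed, so memory stays flat for large result sets. A short sketch, assuming a client built as in Step 5 below (process() is a hypothetical per-item handler):

pages = client.paginate('/user/repos', per_page=50)

# Lazy: page 2 is requested only after page 1 has been fully consumed
for item in pages.iter_pages():
    process(item)  # hypothetical handler

# Eager: fetch everything up front
all_items = client.paginate('/user/repos').all()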
Step 5: Complete API Client
# api_client/client.py (Updated)
from .auth import AuthProvider
from .rate_limiter import RateLimiter, SlidingWindowRateLimiter
from .pagination import PaginatedResponse


class APIClient:
    """Complete API client with all features."""

    def __init__(self, base_url: str,
                 auth: AuthProvider = None,
                 rate_limiter: RateLimiter = None,
                 **kwargs):
        """
        Initialize API client.

        Args:
            base_url: Base URL for API
            auth: Authentication provider
            rate_limiter: Rate limiter instance
            **kwargs: Additional HTTPClient parameters
        """
        from .http_client import HTTPClient

        self.http_client = HTTPClient(base_url, **kwargs)
        self.auth = auth
        self.rate_limiter = rate_limiter

        # Add auth interceptor
        if self.auth:
            def auth_interceptor(method, url, kwargs):
                headers = kwargs.get('headers', {})
                kwargs['headers'] = self.auth.apply_auth(headers)
                return kwargs

            self.http_client.add_request_interceptor(auth_interceptor)

    def request(self, method: str, endpoint: str, **kwargs):
        """Make request with rate limiting."""
        if self.rate_limiter:
            self.rate_limiter.acquire()

        return self.http_client.request(method, endpoint, **kwargs)

    def get(self, endpoint: str, **kwargs):
        """GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs):
        """POST request."""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs):
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)

    def patch(self, endpoint: str, **kwargs):
        """PATCH request."""
        return self.request('PATCH', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs):
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)

    def paginate(self, endpoint: str, **kwargs) -> PaginatedResponse:
        """Get paginated response."""
        return PaginatedResponse(self, endpoint, **kwargs)


# Example usage
from api_client import APIClient, BearerAuth, RateLimiter

# Create client
client = APIClient(
    base_url='https://api.github.com',
    auth=BearerAuth('your_token_here'),
    rate_limiter=RateLimiter(requests_per_second=5),
    timeout=30,
    max_retries=3
)

# Make requests
response = client.get('/user')
user = response.json()
print(f"User: {user['login']}")

# Paginated request
repos = client.paginate('/user/repos', params={'per_page': 100}).all()
print(f"Total repos: {len(repos)}")
Self-Study Exercise: Async Task Queue System 🔄
Description: Build a task queue system with async processing, a priority queue, retry logic, and progress tracking.
Functional requirements:
Core Features:
Task Management
- Submit tasks with a priority
- Task scheduling (immediate, delayed, scheduled)
- Task status tracking (pending, running, completed, failed)
- Task retry with exponential backoff
- Task cancellation
- Task dependencies
Worker Pool
- Multiple async workers
- Worker health monitoring
- Auto-scaling workers
- Worker statistics
Queue Operations
- Priority queue (high, normal, low)
- FIFO within same priority
- Persistent queue (survive restarts)
- Queue statistics
Progress Tracking
- Real-time progress updates
- Progress callbacks
- ETA calculation (see the sketch after this list)
- Result storage
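For the ETA item, linear extrapolation is usually enough to start: if p percent of the work took t seconds, the remaining (100 - p) percent should take roughly t * (100 - p) / p more. A minimal sketch (the function name is illustrative, not part of the spec):

from datetime import datetime, timedelta
from typing import Optional

def estimate_eta(started_at: datetime, progress: float) -> Optional[datetime]:
    """Linear ETA from elapsed time and progress in [0, 100]."""
    if progress <= 0:
        return None  # no data to extrapolate from yet
    elapsed = (datetime.now() - started_at).total_seconds()
    remaining = elapsed * (100 - progress) / progress
    return datetime.now() + timedelta(seconds=remaining)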
Detailed Guide:
1. Class Design
"""Task Queue System Structure: task_queue/├── __init__.py├── models/│ ├── __init__.py│ ├── task.py # Task model│ └── result.py # Result model├── queue/│ ├── __init__.py│ ├── priority_queue.py # Priority queue│ └── persistent.py # Persistent storage├── workers/│ ├── __init__.py│ ├── worker.py # Worker implementation│ └── pool.py # Worker pool├── scheduler/│ ├── __init__.py│ └── scheduler.py # Task scheduler├── storage/│ ├── __init__.py│ └── database.py # SQLite/PostgreSQL storage├── main.py # Main queue manager└── cli.py # CLI interface""" from dataclasses import dataclassfrom typing import Callable, Any, Optional, Dictfrom datetime import datetimefrom enum import Enumimport uuid class TaskStatus(Enum): """Task status enumeration.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class TaskPriority(Enum): """Task priority levels.""" HIGH = 1 NORMAL = 5 LOW = 10 @dataclassclass Task: """ Task model. Attributes: id: Unique task ID func: Function to execute args: Function arguments kwargs: Function keyword arguments priority: Task priority max_retries: Maximum retry attempts retry_count: Current retry count status: Task status created_at: Creation timestamp started_at: Start timestamp completed_at: Completion timestamp result: Task result error: Error message if failed progress: Current progress (0-100) dependencies: List of task IDs this task depends on """ id: str func: Callable args: tuple kwargs: dict priority: TaskPriority = TaskPriority.NORMAL max_retries: int = 3 retry_count: int = 0 status: TaskStatus = TaskStatus.PENDING created_at: datetime = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None result: Any = None error: Optional[str] = None progress: float = 0.0 dependencies: list = None def __post_init__(self): if self.created_at is None: self.created_at = datetime.now() if self.dependencies is None: self.dependencies = []
2. Priority Queue Implementation
# queue/priority_queue.py
import asyncio
import heapq
from typing import Optional

from ..models.task import Task


class PriorityQueue:
    """
    Priority queue for tasks.

    Implementation:
    - Use heapq for the underlying heap
    - FIFO within the same priority
    - Coroutine-safe via asyncio.Lock (note: not thread-safe)

    Methods:
    - put(task): Add task to queue
    - get(): Get highest priority task
    - peek(): Look at next task without removing
    - size(): Get queue size
    - is_empty(): Check if queue is empty
    - clear(): Clear all tasks (left as an exercise)
    - get_tasks_by_status(status): Filter tasks (left as an exercise)
    """

    def __init__(self):
        self._queue = []
        self._lock = asyncio.Lock()
        self._counter = 0  # For FIFO within same priority

    async def put(self, task: Task):
        """
        Add task to queue.

        Priority tuple: (priority_value, counter, task)
        - priority_value: Lower is higher priority
        - counter: Ensures FIFO for same priority (and prevents heapq
          from ever comparing two Task objects directly)
        - task: The actual task
        """
        async with self._lock:
            heapq.heappush(
                self._queue,
                (task.priority.value, self._counter, task)
            )
            self._counter += 1

    async def get(self) -> Optional[Task]:
        """Get and remove highest priority task."""
        async with self._lock:
            if self._queue:
                _, _, task = heapq.heappop(self._queue)
                return task
            return None

    async def peek(self) -> Optional[Task]:
        """Look at next task without removing."""
        async with self._lock:
            if self._queue:
                _, _, task = self._queue[0]
                return task
            return None

    async def size(self) -> int:
        """Get queue size."""
        async with self._lock:
            return len(self._queue)

    async def is_empty(self) -> bool:
        """Check if queue is empty."""
        return await self.size() == 0
3. Worker Implementation
# workers/worker.py
import asyncio
import logging
from datetime import datetime
from typing import Optional

from ..models.task import Task, TaskStatus
from ..queue.priority_queue import PriorityQueue

logger = logging.getLogger(__name__)


class Worker:
    """
    Async worker for processing tasks.

    Attributes:
        worker_id: Unique worker ID
        queue: Reference to task queue
        is_running: Worker status
        current_task: Currently processing task
        tasks_completed: Number of completed tasks
        tasks_failed: Number of failed tasks

    Methods:
        start(): Start worker
        stop(): Stop worker
        process_task(task): Process single task
        retry_task(task): Retry failed task
        get_statistics(): Get worker stats
    """

    def __init__(self, worker_id: str, queue: PriorityQueue):
        self.worker_id = worker_id
        self.queue = queue
        self.is_running = False
        self.current_task: Optional[Task] = None
        self.tasks_completed = 0
        self.tasks_failed = 0

    async def start(self):
        """Start worker loop."""
        self.is_running = True
        logger.info(f"Worker {self.worker_id} started")

        while self.is_running:
            try:
                # Get task from queue
                task = await self.queue.get()

                if task is None:
                    await asyncio.sleep(0.1)  # No tasks, wait
                    continue

                # Process task
                await self.process_task(task)

            except Exception as e:
                logger.error(f"Worker {self.worker_id} error: {e}")

    async def process_task(self, task: Task):
        """
        Process single task.

        Steps:
        1. Check dependencies
        2. Update status to RUNNING
        3. Execute task function
        4. Handle success/failure
        5. Update progress and result
        """
        self.current_task = task

        # Step 1 (dependency check) is left as an exercise: a task should
        # only run once every task in task.dependencies is COMPLETED.

        # Update status
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()

        try:
            logger.info(f"Worker {self.worker_id} processing task {task.id}")

            # Execute task (await coroutines, call plain functions)
            if asyncio.iscoroutinefunction(task.func):
                result = await task.func(*task.args, **task.kwargs)
            else:
                result = task.func(*task.args, **task.kwargs)

            # Success
            task.status = TaskStatus.COMPLETED
            task.result = result
            task.progress = 100.0
            task.completed_at = datetime.now()
            self.tasks_completed += 1

            logger.info(f"Task {task.id} completed by worker {self.worker_id}")

        except Exception as e:
            # Failed
            logger.error(f"Task {task.id} failed: {e}")
            task.error = str(e)

            # Retry if possible
            if task.retry_count < task.max_retries:
                task.retry_count += 1
                task.status = TaskStatus.PENDING
                await self.queue.put(task)  # Re-queue
                logger.info(f"Task {task.id} re-queued (retry {task.retry_count})")
            else:
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()
                self.tasks_failed += 1

        finally:
            self.current_task = None

    async def stop(self):
        """Stop worker."""
        self.is_running = False
        logger.info(f"Worker {self.worker_id} stopped")
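The retry branch above re-queues a failed task immediately, but the requirements call for exponential backoff. One way to get there is the retry_task() method named in the docstring — a sketch to drop into Worker and call from process_task() in place of the immediate re-queue, assuming it is acceptable for the worker to sleep between attempts (base_delay and the 60 s cap are illustrative values, not part of the spec):

    async def retry_task(self, task: Task, base_delay: float = 1.0):
        """Re-queue a failed task after an exponentially growing delay."""
        task.retry_count += 1
        task.status = TaskStatus.PENDING
        # 1s, 2s, 4s, ... doubling per attempt, capped at 60s
        delay = min(base_delay * (2 ** (task.retry_count - 1)), 60.0)
        logger.info(f"Task {task.id} retrying in {delay:.1f}s "
                    f"(attempt {task.retry_count}/{task.max_retries})")
        await asyncio.sleep(delay)
        await self.queue.put(task)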
4. Worker Pool
# workers/pool.py
import asyncio
from typing import List

from ..queue.priority_queue import PriorityQueue
from .worker import Worker


class WorkerPool:
    """
    Manage pool of workers.

    Attributes:
        queue: Task queue
        num_workers: Number of workers
        workers: List of worker instances

    Methods:
        start(): Start all workers
        stop(): Stop all workers
        scale(num): Scale worker count
        get_statistics(): Get pool statistics
    """

    def __init__(self, queue: PriorityQueue, num_workers: int = 4):
        self.queue = queue
        self.num_workers = num_workers
        self.workers: List[Worker] = []
        self._tasks: List[asyncio.Task] = []

    async def start(self):
        """Start all workers."""
        for i in range(self.num_workers):
            worker = Worker(f"worker-{i}", self.queue)
            self.workers.append(worker)
            # Keep a reference to the task so it isn't garbage-collected
            self._tasks.append(asyncio.create_task(worker.start()))

    async def stop(self):
        """Stop all workers."""
        for worker in self.workers:
            await worker.stop()

    def get_statistics(self) -> dict:
        """Get pool statistics."""
        return {
            'total_workers': len(self.workers),
            'active_workers': sum(1 for w in self.workers if w.current_task),
            'total_completed': sum(w.tasks_completed for w in self.workers),
            'total_failed': sum(w.tasks_failed for w in self.workers)
        }
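scale(num) appears in the docstring but not in the body. A minimal sketch: grow by spawning new workers, shrink by stopping the most recently added ones (a stopped worker exits its loop after finishing its current task):

    async def scale(self, num: int):
        """Adjust the pool to exactly `num` workers."""
        while len(self.workers) < num:
            worker = Worker(f"worker-{len(self.workers)}", self.queue)
            self.workers.append(worker)
            self._tasks.append(asyncio.create_task(worker.start()))
        while len(self.workers) > num:
            worker = self.workers.pop()
            await worker.stop()  # exits after its current task
        self.num_workers = num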
5. Task Queue Manager
# main.py
import asyncio
import uuid
from typing import Any, Callable, Optional

from .models.task import Task, TaskPriority, TaskStatus
from .queue.priority_queue import PriorityQueue
from .workers.pool import WorkerPool


class TaskQueueManager:
    """
    Main task queue manager.

    Usage:
        manager = TaskQueueManager(num_workers=4)
        await manager.start()

        # Submit task
        task_id = await manager.submit_task(
            my_function,
            args=(1, 2),
            priority=TaskPriority.HIGH
        )

        # Get result
        result = await manager.get_result(task_id)

        await manager.stop()

    Methods:
        submit_task(func, args, kwargs, priority): Submit task
        get_task(task_id): Get task by ID
        cancel_task(task_id): Cancel task
        get_result(task_id, timeout): Wait for result
        get_statistics(): Get system statistics
    """

    def __init__(self, num_workers: int = 4):
        self.queue = PriorityQueue()
        self.pool = WorkerPool(self.queue, num_workers)
        self.tasks = {}  # task_id -> Task

    async def start(self):
        """Start queue manager."""
        await self.pool.start()

    async def stop(self):
        """Stop queue manager."""
        await self.pool.stop()

    async def submit_task(self, func: Callable,
                          args: tuple = (),
                          kwargs: dict = None,
                          priority: TaskPriority = TaskPriority.NORMAL,
                          max_retries: int = 3) -> str:
        """Submit task to queue."""
        task = Task(
            id=str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs or {},
            priority=priority,
            max_retries=max_retries
        )

        self.tasks[task.id] = task
        await self.queue.put(task)

        return task.id

    async def get_result(self, task_id: str,
                         timeout: Optional[float] = None) -> Any:
        """Wait for task result (polls the task status until it settles)."""
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        start_time = asyncio.get_running_loop().time()

        while task.status not in [TaskStatus.COMPLETED,
                                  TaskStatus.FAILED,
                                  TaskStatus.CANCELLED]:
            await asyncio.sleep(0.1)

            if timeout:
                elapsed = asyncio.get_running_loop().time() - start_time
                if elapsed > timeout:
                    raise TimeoutError(f"Task {task_id} timeout")

        if task.status == TaskStatus.COMPLETED:
            return task.result
        elif task.status == TaskStatus.FAILED:
            raise Exception(f"Task failed: {task.error}")
        else:
            raise Exception("Task cancelled")
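cancel_task(task_id) is also listed in the docstring but not implemented. A minimal sketch, assuming only PENDING tasks can be cancelled — interrupting a RUNNING task would require cooperative cancellation, and the worker loop would also need to skip CANCELLED tasks it dequeues:

    async def cancel_task(self, task_id: str) -> bool:
        """Mark a pending task as cancelled; running tasks are not interrupted."""
        from datetime import datetime

        task = self.tasks.get(task_id)
        if task and task.status == TaskStatus.PENDING:
            task.status = TaskStatus.CANCELLED
            task.completed_at = datetime.now()
            return True
        return False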
6. Example Usage
# examples/usage.py
import asyncio

from task_queue.main import TaskQueueManager
from task_queue.models.task import TaskPriority


async def example_task(duration: int, name: str):
    """Example async task."""
    print(f"Task {name} started (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"Task {name} completed")
    return f"Result from {name}"


async def main():
    """Example usage."""
    # Create manager
    manager = TaskQueueManager(num_workers=3)
    await manager.start()

    # Submit tasks
    task_ids = []

    # High priority tasks
    for i in range(3):
        task_id = await manager.submit_task(
            example_task,
            args=(2, f"high-{i}"),
            priority=TaskPriority.HIGH
        )
        task_ids.append(task_id)

    # Normal priority tasks
    for i in range(5):
        task_id = await manager.submit_task(
            example_task,
            args=(1, f"normal-{i}"),
            priority=TaskPriority.NORMAL
        )
        task_ids.append(task_id)

    # Wait for results
    print("\nWaiting for results...")
    for task_id in task_ids:
        try:
            result = await manager.get_result(task_id, timeout=30)
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")

    # Statistics
    stats = manager.pool.get_statistics()
    print(f"\nStatistics: {stats}")

    await manager.stop()


if __name__ == '__main__':
    asyncio.run(main())
7. Testing Requirements
# tests/test_queue.py
# Requires the pytest-asyncio plugin for @pytest.mark.asyncio
import pytest

from task_queue.models.task import Task, TaskPriority, TaskStatus
from task_queue.queue.priority_queue import PriorityQueue
from task_queue.workers.worker import Worker


@pytest.mark.asyncio
async def test_priority_queue():
    """Test priority queue ordering."""
    queue = PriorityQueue()

    # Add tasks in mixed order
    high_task = Task(id="1", func=lambda: None, priority=TaskPriority.HIGH)
    normal_task = Task(id="2", func=lambda: None, priority=TaskPriority.NORMAL)
    low_task = Task(id="3", func=lambda: None, priority=TaskPriority.LOW)

    await queue.put(normal_task)
    await queue.put(low_task)
    await queue.put(high_task)

    # Should get high priority first
    task = await queue.get()
    assert task.id == "1"

    task = await queue.get()
    assert task.id == "2"

    task = await queue.get()
    assert task.id == "3"


@pytest.mark.asyncio
async def test_worker_retry():
    """Test worker retry logic."""
    queue = PriorityQueue()
    worker = Worker("test-worker", queue)

    # Create failing task
    def failing_task():
        raise Exception("Test error")

    task = Task(
        id="fail-task",
        func=failing_task,
        max_retries=2
    )

    await queue.put(task)

    # Process task (should retry)
    await worker.process_task(task)

    # Should be re-queued
    assert task.retry_count == 1
    assert task.status == TaskStatus.PENDING
Further Extensions:
- Persistent Queue: Store tasks in a database
- Web Dashboard: Monitor tasks with Flask/FastAPI
- Distributed Queue: Multiple machines with Redis
- Task Chaining: Pipeline of tasks
- Scheduled Tasks: Cron-like scheduling
- Task Results Cache: Cache results with a TTL (see the sketch after this list)
- Dead Letter Queue: Failed-task handling
- Metrics & Monitoring: Prometheus metrics
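For the results-cache extension, an in-memory dict keyed by task ID with an expiry timestamp is enough to start with — a minimal sketch, assuming in-process storage is acceptable (a shared cache across machines would need Redis or similar):

import time
from typing import Any, Dict, Optional, Tuple


class ResultCache:
    """In-memory task-result cache with a per-entry TTL."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, Any]] = {}  # task_id -> (expires_at, result)

    def set(self, task_id: str, result: Any):
        self._store[task_id] = (time.time() + self.ttl, result)

    def get(self, task_id: str) -> Optional[Any]:
        entry = self._store.get(task_id)
        if entry is None:
            return None
        expires_at, result = entry
        if time.time() > expires_at:
            del self._store[task_id]  # lazily evict expired entries
            return None
        return result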
Python Advanced Module Summary 🎓
What You've Learned:
Core Concepts:
- ✅ Decorators & Generators
- ✅ Context Managers
- ✅ Iterators & Iterables
- ✅ Metaclasses
- ✅ Multi-threading & Multi-processing
- ✅ Async Programming (asyncio, aiohttp)
Advanced Topics:
- ✅ Regular Expressions
- ✅ File I/O Advanced
- ✅ JSON & CSV Processing
- ✅ Testing (unittest, pytest)
- ✅ Error Handling
- ✅ Type Hints
- ✅ Functional Programming
Professional Development:
- ✅ Virtual Environments
- ✅ Package Structure
- ✅ Working with Databases
- ✅ Logging
- ✅ Performance Optimization
- ✅ Best Practices (SOLID, Design Patterns)
- ✅ Working with APIs
Projects Completed:
- ✅ Async Web Scraper with caching, retry, and rate limiting
- ✅ Data Processing Pipeline với generators
- ✅ RESTful API Client Library
- ✅ Task Queue System (the self-study exercise above)
Skills Acquired:
- 🎯 Writing efficient async code
- 🎯 Design patterns and SOLID principles
- 🎯 Testing and debugging
- 🎯 Performance optimization
- 🎯 Database operations
- 🎯 API integration
- 🎯 Production-ready code
Next on the Roadmap:
Module 03: Django Framework 🌐
- Django basics
- Models & ORM
- Views & Templates
- Forms & Validation
- Authentication & Authorization
- REST APIs with Django REST Framework
- Deployment
Module 04: Advanced Django 🚀
- Celery task queue
- WebSockets with Channels
- GraphQL
- Microservices architecture
- Docker & Kubernetes
- CI/CD pipelines
Tips for Success:
- Practice Daily - Write code every day
- Build Projects - Apply what you learn to real projects
- Read Code - Read other people's code on GitHub
- Contribute - Contribute to open source
- Document - Write good documentation
- Test - Write tests for your code
- Optimize - Profile and optimize performance
- Share - Share your knowledge with the community
Keep coding and never stop learning! 💻🚀