Bài 21: Python Advanced - Practices & Projects (Phần 4)

Hoàn thiện Project 3: RESTful API Client Library

Bước 2: Authentication

# api_client/auth.py
import base64
from abc import ABC, abstractmethod
from typing import Dict, Optional


class AuthProvider(ABC):
    """Base authentication provider.

    Concrete providers implement :meth:`apply_auth` to add their
    credentials to a set of request headers.
    """

    @abstractmethod
    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Apply authentication to headers.

        Args:
            headers: Request headers to authenticate.

        Returns:
            The headers including the authentication entry.
        """


class BearerAuth(AuthProvider):
    """Bearer token authentication."""

    def __init__(self, token: str):
        self.token = token

    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``headers`` with a Bearer ``Authorization`` entry.

        The input mapping is left untouched so that a headers dict shared
        between requests never silently picks up credentials.
        """
        return {**headers, 'Authorization': f'Bearer {self.token}'}


class BasicAuth(AuthProvider):
    """Basic authentication."""

    def __init__(self, username: str, password: str):
        # The header value is the base64 encoding of "username:password";
        # it is computed once here and reused for every request.
        credentials = f"{username}:{password}"
        self.token = base64.b64encode(credentials.encode()).decode()

    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``headers`` with a Basic ``Authorization`` entry.

        The input mapping is not modified.
        """
        return {**headers, 'Authorization': f'Basic {self.token}'}


class APIKeyAuth(AuthProvider):
    """API key authentication."""

    def __init__(self, api_key: str, header_name: str = 'X-API-Key'):
        self.api_key = api_key
        self.header_name = header_name

    def apply_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``headers`` with the API key stored under
        ``header_name``.

        The input mapping is not modified.
        """
        return {**headers, self.header_name: self.api_key}

Bước 3: Rate Limiting

# api_client/rate_limiter.py
import asyncio
from datetime import datetime, timedelta
from collections import deque
import time


class RateLimiter:
    """Rate limiter that enforces a minimum interval between requests.

    Each call to :meth:`acquire` blocks until at least
    ``1 / requests_per_second`` seconds have passed since the previous call.
    """

    def __init__(self, requests_per_second: int = 10):
        """
        Initialize rate limiter.

        Args:
            requests_per_second: Maximum requests per second

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        # time.monotonic() reading of the previous acquire(); None until
        # the first call.
        self.last_request_time = None

    def acquire(self):
        """Wait if necessary to respect rate limit."""
        # A monotonic clock is used so that wall-clock adjustments cannot
        # shorten or lengthen the enforced interval.
        if self.last_request_time is not None:
            elapsed = time.monotonic() - self.last_request_time
            wait_time = self.min_interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)

        self.last_request_time = time.monotonic()


class SlidingWindowRateLimiter:
    """Rate limiter with sliding window.

    At most ``max_requests`` calls to :meth:`acquire` are admitted within
    any span of ``window_seconds`` seconds.
    """

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        """
        Initialize sliding window rate limiter.

        Args:
            max_requests: Maximum requests in window
            window_seconds: Time window in seconds

        Raises:
            ValueError: If ``max_requests`` is less than 1 or
                ``window_seconds`` is negative.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        if window_seconds < 0:
            raise ValueError("window_seconds must not be negative")
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # time.monotonic() readings of admitted requests, oldest first.
        self.requests = deque()

    def acquire(self):
        """Wait if necessary to respect rate limit."""
        while True:
            now = time.monotonic()
            window_start = now - self.window_seconds

            # Remove requests that have left the window. "<=" guarantees
            # that when the window is still full below, the oldest entry is
            # strictly inside it and the computed wait is positive.
            while self.requests and self.requests[0] <= window_start:
                self.requests.popleft()

            if len(self.requests) < self.max_requests:
                # Record request
                self.requests.append(now)
                return

            # Window is full: sleep until the oldest request expires, then
            # re-evaluate (a loop instead of recursion).
            wait_seconds = self.requests[0] + self.window_seconds - now
            if wait_seconds > 0:
                time.sleep(wait_seconds)

Bước 4: Pagination Support

# api_client/pagination.py
from typing import Iterator, Dict, Any, Optional, Callable
import logging

logger = logging.getLogger(__name__)


class PaginatedResponse:
    """Handle paginated API responses."""

    def __init__(self, client: 'APIClient', endpoint: str,
                 page_param: str = 'page', per_page_param: str = 'per_page',
                 per_page: int = 100, **kwargs):
        """
        Initialize paginated response.

        Args:
            client: API client instance
            endpoint: API endpoint
            page_param: Page parameter name
            per_page_param: Per page parameter name
            per_page: Items per page
            **kwargs: Additional request parameters
        """
        self.client = client
        self.endpoint = endpoint
        self.page_param = page_param
        self.per_page_param = per_page_param
        self.per_page = per_page
        self.kwargs = kwargs

    def iter_pages(self) -> Iterator[Dict[str, Any]]:
        """Iterate over the items of every page, in order.

        Pages are requested starting at page 1 until a page yields no
        items or no further page is indicated. Despite the name, the
        generator yields individual items, not page payloads.

        If fetching or decoding a page raises, the error is logged and
        iteration stops; items already yielded are kept (best effort).
        """
        # Work on copies so the kwargs given to the constructor are never
        # rewritten and the object can be iterated more than once.
        base_params = dict(self.kwargs.get('params') or {})
        page = 1

        while True:
            params = {
                **base_params,
                self.page_param: page,
                self.per_page_param: self.per_page,
            }
            request_kwargs = {**self.kwargs, 'params': params}

            try:
                response = self.client.get(self.endpoint, **request_kwargs)
                data = response.json()

                # Handle different response formats
                items = self._extract_items(data)
                has_next = bool(items) and self._has_next_page(data, items)
            except Exception as e:
                logger.error(f"Error fetching page {page}: {e}")
                break

            if not items:
                break

            # Yield outside the try block so that exceptions raised by the
            # consumer are not mistaken for fetch errors.
            yield from items

            if not has_next:
                break

            page += 1

    def _extract_items(self, data: Dict) -> list:
        """Extract items from response.

        A list payload is returned as is; for a dict payload the first
        present key of ``items``, ``results``, ``data`` is used. Anything
        else yields an empty list.
        """
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            # Try common patterns
            for key in ('items', 'results', 'data'):
                if key in data:
                    return data[key]
        return []

    def _has_next_page(self, data: Dict, items: list) -> bool:
        """Check if there are more pages.

        Explicit indicators in a dict payload (``has_next``, then ``next``)
        take precedence. Only when neither is present does the page size
        decide: a full page means there might be more, a short page means
        this was the last one. Checking the indicators first matters when
        the server returns fewer items per page than requested.
        """
        if isinstance(data, dict):
            if 'has_next' in data:
                return bool(data['has_next'])
            if 'next' in data:
                return bool(data['next'])

        # No indicator: assume there might be more only if we got a full page
        return len(items) >= self.per_page

    def all(self) -> list:
        """Get all items from all pages."""
        return list(self.iter_pages())

Bước 5: Complete API Client

# api_client/client.py (Updated)
from typing import Optional, Union

from .auth import AuthProvider
from .rate_limiter import RateLimiter, SlidingWindowRateLimiter
from .pagination import PaginatedResponse


class APIClient:
    """Complete API client with all features.

    Wraps an ``HTTPClient`` and adds optional authentication, optional
    rate limiting and a pagination helper.
    """

    def __init__(self, base_url: str, auth: Optional[AuthProvider] = None,
                 rate_limiter: Optional[Union[RateLimiter, SlidingWindowRateLimiter]] = None,
                 **kwargs):
        """
        Initialize API client.

        Args:
            base_url: Base URL for API
            auth: Authentication provider
            rate_limiter: Rate limiter instance (anything with an
                ``acquire()`` method)
            **kwargs: Additional HTTPClient parameters
        """
        from .http_client import HTTPClient

        self.http_client = HTTPClient(base_url, **kwargs)
        self.auth = auth
        self.rate_limiter = rate_limiter

        # Add auth interceptor
        # NOTE(review): presumably HTTPClient calls each request interceptor
        # with (method, url, kwargs) before sending and uses the returned
        # kwargs - confirm against the http_client module.
        if self.auth:
            def auth_interceptor(method, url, kwargs):
                # Tolerate a missing or None 'headers' entry and work on a
                # copy so the caller's dict never receives the credentials.
                headers = dict(kwargs.get('headers') or {})
                kwargs['headers'] = self.auth.apply_auth(headers)
                return kwargs

            self.http_client.add_request_interceptor(auth_interceptor)

    def request(self, method: str, endpoint: str, **kwargs):
        """Make request with rate limiting.

        When a rate limiter is configured, its ``acquire()`` is called
        first. Returns whatever ``HTTPClient.request`` returns.
        """
        if self.rate_limiter:
            self.rate_limiter.acquire()

        return self.http_client.request(method, endpoint, **kwargs)

    def get(self, endpoint: str, **kwargs):
        """GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs):
        """POST request."""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs):
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)

    def patch(self, endpoint: str, **kwargs):
        """PATCH request."""
        return self.request('PATCH', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs):
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)

    def paginate(self, endpoint: str, **kwargs) -> PaginatedResponse:
        """Get paginated response.

        ``kwargs`` are forwarded to ``PaginatedResponse`` unchanged.
        """
        return PaginatedResponse(self, endpoint, **kwargs)


# Example usage
from api_client import APIClient, BearerAuth, RateLimiter

# Create client
client = APIClient(
    base_url='https://api.github.com',
    auth=BearerAuth('your_token_here'),
    rate_limiter=RateLimiter(requests_per_second=5),
    timeout=30,
    max_retries=3
)

# Make requests
response = client.get('/user')
user = response.json()
print(f"User: {user['login']}")

# Paginated request
repos = client.paginate('/user/repos', params={'per_page': 100}).all()
print(f"Total repos: {len(repos)}")

Bài Tập Tự Làm: Async Task Queue System 🔄

Mô tả: Xây dựng hệ thống task queue với async processing, priority queue, retry logic, và progress tracking.

Yêu cầu chức năng:

Core Features:

  1. Task Management

    • Submit tasks với priority
    • Task scheduling (immediate, delayed, scheduled)
    • Task status tracking (pending, running, completed, failed)
    • Task retry với exponential backoff
    • Task cancellation
    • Task dependencies
  2. Worker Pool

    • Multiple async workers
    • Worker health monitoring
    • Auto-scaling workers
    • Worker statistics
  3. Queue Operations

    • Priority queue (high, normal, low)
    • FIFO within same priority
    • Persistent queue (survive restarts)
    • Queue statistics
  4. Progress Tracking

    • Real-time progress updates
    • Progress callbacks
    • ETA calculation
    • Result storage

Hướng Dẫn Chi Tiết:

1. Thiết Kế Classes

"""
Task Queue System Structure:

task_queue/
├── __init__.py
├── models/
│   ├── __init__.py
│   ├── task.py          # Task model
│   └── result.py        # Result model
├── queue/
│   ├── __init__.py
│   ├── priority_queue.py # Priority queue
│   └── persistent.py    # Persistent storage
├── workers/
│   ├── __init__.py
│   ├── worker.py        # Worker implementation
│   └── pool.py          # Worker pool
├── scheduler/
│   ├── __init__.py
│   └── scheduler.py     # Task scheduler
├── storage/
│   ├── __init__.py
│   └── database.py      # SQLite/PostgreSQL storage
├── main.py              # Main queue manager
└── cli.py               # CLI interface
"""

from dataclasses import dataclass, field
from typing import Callable, Any, Optional, Dict
from datetime import datetime
from enum import Enum
import uuid


class TaskStatus(Enum):
    """Task status enumeration."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class TaskPriority(Enum):
    """Task priority levels."""
    HIGH = 1
    NORMAL = 5
    LOW = 10


@dataclass
class Task:
    """
    Task model.

    Only ``id`` and ``func`` are required; every other field has a default
    so a task can be built as ``Task(id=..., func=...)``.

    Attributes:
        id: Unique task ID
        func: Function to execute
        args: Function arguments
        kwargs: Function keyword arguments
        priority: Task priority
        max_retries: Maximum retry attempts
        retry_count: Current retry count
        status: Task status
        created_at: Creation timestamp
        started_at: Start timestamp
        completed_at: Completion timestamp
        result: Task result
        error: Error message if failed
        progress: Current progress (0-100)
        dependencies: List of task IDs this task depends on
    """
    id: str
    func: Callable
    args: tuple = ()
    # Mutable defaults go through default_factory so that every task gets
    # its own dict/list instead of sharing one object.
    kwargs: dict = field(default_factory=dict)
    priority: TaskPriority = TaskPriority.NORMAL
    max_retries: int = 3
    retry_count: int = 0
    status: TaskStatus = TaskStatus.PENDING
    created_at: Optional[datetime] = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Any = None
    error: Optional[str] = None
    progress: float = 0.0
    dependencies: Optional[list] = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass None explicitly for these fields;
        # normalise it to the same values the defaults would produce.
        if self.kwargs is None:
            self.kwargs = {}
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.dependencies is None:
            self.dependencies = []

2. Priority Queue Implementation

# queue/priority_queue.py
import heapq
from typing import Optional, List
import asyncio
from datetime import datetime


class PriorityQueue:
    """
    Priority queue for tasks.

    Implementation:
    - Use heapq for priority queue
    - FIFO within same priority
    - Access is serialised with an asyncio.Lock (safe between coroutines
      of one event loop; asyncio.Lock is not a thread lock)

    Methods:
    - put(task): Add task to queue
    - get(): Get highest priority task
    - peek(): Look at next task without removing
    - size(): Get queue size
    - is_empty(): Check if queue is empty
    - clear(): Clear all tasks
    - get_tasks_by_status(status): Filter tasks
    """

    def __init__(self):
        self._queue = []
        self._lock = asyncio.Lock()
        self._counter = 0  # For FIFO within same priority

    async def put(self, task: "Task"):
        """
        Add task to queue.

        Priority tuple: (priority_value, counter, task)
        - priority_value: Lower is higher priority
        - counter: Ensures FIFO for same priority; being unique, it also
          means two task objects are never compared with each other
        - task: The actual task
        """
        async with self._lock:
            heapq.heappush(
                self._queue,
                (task.priority.value, self._counter, task)
            )
            self._counter += 1

    async def get(self) -> Optional["Task"]:
        """Get and remove highest priority task, or None if empty."""
        async with self._lock:
            if self._queue:
                _, _, task = heapq.heappop(self._queue)
                return task
            return None

    async def peek(self) -> Optional["Task"]:
        """Look at next task without removing; None if empty."""
        async with self._lock:
            if self._queue:
                _, _, task = self._queue[0]
                return task
            return None

    async def size(self) -> int:
        """Get queue size."""
        async with self._lock:
            return len(self._queue)

    async def is_empty(self) -> bool:
        """Check if queue is empty."""
        return await self.size() == 0

    async def clear(self) -> None:
        """Remove all queued tasks."""
        async with self._lock:
            self._queue.clear()

    async def get_tasks_by_status(self, status) -> List["Task"]:
        """Return queued tasks whose ``status`` equals ``status``.

        Tasks stay in the queue. The result is ordered the way ``get()``
        would return them (priority first, then insertion order).
        """
        async with self._lock:
            # Sort on (priority, counter) only, so tasks are never compared.
            ordered = sorted(self._queue, key=lambda entry: entry[:2])
            return [task for _, _, task in ordered if task.status == status]

3. Worker Implementation

# workers/worker.py
import asyncio
import inspect
from datetime import datetime
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class Worker:
    """
    Async worker for processing tasks.

    Attributes:
        worker_id: Unique worker ID
        queue: Reference to task queue
        is_running: Worker status
        current_task: Currently processing task
        tasks_completed: Number of completed tasks
        tasks_failed: Number of failed tasks

    Methods:
        start(): Start worker
        stop(): Stop worker
        process_task(task): Process single task
        retry_task(task): Retry failed task
        get_statistics(): Get worker stats
    """

    def __init__(self, worker_id: str, queue: "PriorityQueue"):
        self.worker_id = worker_id
        self.queue = queue
        self.is_running = False
        self.current_task: Optional["Task"] = None
        self.tasks_completed = 0
        self.tasks_failed = 0

    async def start(self):
        """Start worker loop.

        Runs until ``stop()`` clears ``is_running``. The queue is polled;
        when it is empty the worker sleeps briefly before polling again.
        """
        self.is_running = True
        logger.info(f"Worker {self.worker_id} started")

        while self.is_running:
            try:
                # Get task from queue
                task = await self.queue.get()

                if task is None:
                    await asyncio.sleep(0.1)  # No tasks, wait
                    continue

                # Process task
                await self.process_task(task)

            except Exception as e:
                logger.error(f"Worker {self.worker_id} error: {e}")
                # Pause before the next attempt so a persistently failing
                # queue does not turn this loop into a busy spin.
                await asyncio.sleep(0.1)

    async def process_task(self, task: "Task"):
        """
        Process single task.

        Steps:
        1. Update status to RUNNING
        2. Execute task function (awaiting it when it is asynchronous)
        3. On success store the result and mark the task COMPLETED
        4. On failure record the error and retry via ``retry_task``;
           when no retries are left mark the task FAILED

        Dependency checking is not implemented here.
        """
        self.current_task = task

        # Update status
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()

        try:
            logger.info(f"Worker {self.worker_id} processing task {task.id}")

            # Execute task. NOTE: a synchronous function runs directly on
            # the event loop and blocks other workers until it returns.
            result = task.func(*task.args, **task.kwargs)
            # Covers coroutine functions as well as plain callables that
            # return an awaitable (e.g. functools.partial of a coroutine).
            if inspect.isawaitable(result):
                result = await result

            # Success
            task.status = TaskStatus.COMPLETED
            task.result = result
            task.progress = 100.0
            task.completed_at = datetime.now()
            self.tasks_completed += 1

            logger.info(f"Task {task.id} completed by worker {self.worker_id}")

        except Exception as e:
            # Failed
            logger.error(f"Task {task.id} failed: {e}")
            task.error = str(e)

            # Retry if possible
            if not await self.retry_task(task):
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()
                self.tasks_failed += 1

        finally:
            self.current_task = None

    async def retry_task(self, task: "Task") -> bool:
        """Re-queue a failed task if it has retries left.

        Increments ``retry_count``, resets the status to PENDING and puts
        the task back on the queue.

        Returns:
            True if the task was re-queued, False if ``max_retries`` has
            been reached (the task is left untouched in that case).
        """
        if task.retry_count >= task.max_retries:
            return False

        task.retry_count += 1
        task.status = TaskStatus.PENDING
        await self.queue.put(task)  # Re-queue
        logger.info(f"Task {task.id} re-queued (retry {task.retry_count})")
        return True

    def get_statistics(self) -> dict:
        """Get worker stats as a plain dict."""
        return {
            'worker_id': self.worker_id,
            'is_running': self.is_running,
            'current_task': self.current_task.id if self.current_task else None,
            'tasks_completed': self.tasks_completed,
            'tasks_failed': self.tasks_failed,
        }

    async def stop(self):
        """Stop worker.

        Only clears the running flag; the loop in ``start()`` exits after
        its current iteration.
        """
        self.is_running = False
        logger.info(f"Worker {self.worker_id} stopped")

4. Worker Pool

# workers/pool.py
import asyncio
from typing import List


class WorkerPool:
    """
    Manage pool of workers.

    Attributes:
        queue: Task queue
        num_workers: Number of workers
        workers: List of worker instances

    Methods:
        start(): Start all workers
        stop(): Stop all workers
        scale(num): Scale worker count (not implemented in this class yet)
        get_statistics(): Get pool statistics
    """

    def __init__(self, queue: "PriorityQueue", num_workers: int = 4):
        self.queue = queue
        self.num_workers = num_workers
        self.workers: List["Worker"] = []
        # Strong references to the worker loop tasks. The event loop only
        # keeps weak references, so an unreferenced task may be
        # garbage-collected while it is still running.
        self._worker_tasks: List[asyncio.Task] = []

    async def start(self):
        """Start all workers.

        Creates ``num_workers`` workers and schedules each worker loop as
        an asyncio task on the running event loop.
        """
        for i in range(self.num_workers):
            worker = Worker(f"worker-{i}", self.queue)
            self.workers.append(worker)
            self._worker_tasks.append(asyncio.create_task(worker.start()))

    async def stop(self):
        """Stop all workers.

        Signals every worker to stop, then waits for the worker loops to
        finish so no loop task is left pending when this returns. A worker
        that is in the middle of a task finishes that task first.
        """
        for worker in self.workers:
            await worker.stop()

        if self._worker_tasks:
            # return_exceptions=True: one crashed loop must not prevent
            # waiting for the others.
            await asyncio.gather(*self._worker_tasks, return_exceptions=True)
            self._worker_tasks.clear()

    def get_statistics(self) -> dict:
        """Get pool statistics."""
        return {
            'total_workers': len(self.workers),
            'active_workers': sum(1 for w in self.workers if w.current_task),
            'total_completed': sum(w.tasks_completed for w in self.workers),
            'total_failed': sum(w.tasks_failed for w in self.workers)
        }

5. Task Queue Manager

# main.py
import asyncio
import uuid
from typing import Callable, Any, Optional


class TaskQueueManager:
    """
    Main task queue manager.

    Usage:
        manager = TaskQueueManager(num_workers=4)
        await manager.start()

        # Submit task
        task_id = await manager.submit_task(
            my_function,
            args=(1, 2),
            priority=TaskPriority.HIGH
        )

        # Get result
        result = await manager.get_result(task_id)

        await manager.stop()

    Methods:
        submit_task(func, args, kwargs, priority): Submit task
        get_task(task_id): Get task by ID
        cancel_task(task_id): Cancel task (not implemented in this class yet)
        get_result(task_id, timeout): Wait for result
        get_statistics(): Get system statistics
    """

    # Seconds between status checks while waiting in get_result().
    _POLL_INTERVAL = 0.1

    def __init__(self, num_workers: int = 4):
        self.queue = PriorityQueue()
        self.pool = WorkerPool(self.queue, num_workers)
        self.tasks = {}  # task_id -> Task

    async def start(self):
        """Start queue manager."""
        await self.pool.start()

    async def stop(self):
        """Stop queue manager."""
        await self.pool.stop()

    async def submit_task(self, func: Callable, args: tuple = (),
                         kwargs: dict = None, priority: TaskPriority = TaskPriority.NORMAL,
                         max_retries: int = 3) -> str:
        """Submit task to queue.

        Args:
            func: Callable to execute
            args: Positional arguments for ``func``
            kwargs: Keyword arguments for ``func`` (None means none)
            priority: Task priority
            max_retries: Maximum retry attempts

        Returns:
            The generated task ID (a UUID4 string).
        """
        task = Task(
            id=str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs or {},
            priority=priority,
            max_retries=max_retries
        )

        self.tasks[task.id] = task
        await self.queue.put(task)

        return task.id

    def get_task(self, task_id: str):
        """Get task by ID, or None if the ID is unknown."""
        return self.tasks.get(task_id)

    async def get_result(self, task_id: str, timeout: float = None) -> Any:
        """Wait for task result.

        Polls the task status until it is COMPLETED, FAILED or CANCELLED.

        Args:
            task_id: ID returned by ``submit_task``
            timeout: Maximum seconds to wait; None waits indefinitely.
                ``0`` is a real timeout (give up unless already finished).

        Returns:
            The task result when the task completed.

        Raises:
            ValueError: If ``task_id`` is unknown.
            TimeoutError: If the task did not finish within ``timeout``.
            RuntimeError: If the task failed or was cancelled.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        finished = (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED)
        loop = asyncio.get_running_loop()
        # "is not None" rather than truthiness, so timeout=0 is honoured.
        deadline = None if timeout is None else loop.time() + timeout

        while task.status not in finished:
            if deadline is None:
                await asyncio.sleep(self._POLL_INTERVAL)
                continue

            remaining = deadline - loop.time()
            if remaining <= 0:
                raise TimeoutError(f"Task {task_id} timeout")
            # Never sleep past the deadline.
            await asyncio.sleep(min(self._POLL_INTERVAL, remaining))

        if task.status == TaskStatus.COMPLETED:
            return task.result
        elif task.status == TaskStatus.FAILED:
            raise RuntimeError(f"Task failed: {task.error}")
        else:
            raise RuntimeError("Task cancelled")

    async def get_statistics(self) -> dict:
        """Get system statistics.

        Returns:
            Dict with the number of known tasks, a count per task status
            value, the current queue size and the worker pool statistics.
        """
        by_status = {}
        for task in self.tasks.values():
            key = task.status.value
            by_status[key] = by_status.get(key, 0) + 1

        return {
            'total_tasks': len(self.tasks),
            'tasks_by_status': by_status,
            'queue_size': await self.queue.size(),
            'pool': self.pool.get_statistics(),
        }

6. Example Usage

# examples/usage.py
import asyncio
import time


async def example_task(duration: int, name: str):
    """Example async task.

    Sleeps for ``duration`` seconds, printing a line before and after,
    and returns a result string containing ``name``.
    """
    print(f"Task {name} started (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"Task {name} completed")
    return f"Result from {name}"


async def main():
    """Example usage.

    Submits three high-priority and five normal-priority tasks, waits for
    each result and prints the pool statistics.
    """
    # Create manager
    manager = TaskQueueManager(num_workers=3)
    await manager.start()

    # try/finally: the workers are stopped even if something below raises.
    try:
        # Submit tasks
        task_ids = []

        # High priority tasks
        for i in range(3):
            task_id = await manager.submit_task(
                example_task,
                args=(2, f"high-{i}"),
                priority=TaskPriority.HIGH
            )
            task_ids.append(task_id)

        # Normal priority tasks
        for i in range(5):
            task_id = await manager.submit_task(
                example_task,
                args=(1, f"normal-{i}"),
                priority=TaskPriority.NORMAL
            )
            task_ids.append(task_id)

        # Wait for results
        print("\nWaiting for results...")
        for task_id in task_ids:
            try:
                result = await manager.get_result(task_id, timeout=30)
                print(f"Result: {result}")
            except Exception as e:
                print(f"Error: {e}")

        # Statistics
        stats = manager.pool.get_statistics()
        print(f"\nStatistics: {stats}")
    finally:
        await manager.stop()


if __name__ == '__main__':
    asyncio.run(main())

7. Testing Requirements

# tests/test_queue.py
import pytest
import asyncio

# NOTE(review): PriorityQueue, Worker, Task, TaskPriority and TaskStatus
# must be imported from the task_queue package here; the exact module
# paths depend on the final project layout - confirm before running.


def _make_task(task_id, priority):
    """Build a no-op task with explicit (empty) args and kwargs."""
    return Task(id=task_id, func=lambda: None, args=(), kwargs={},
                priority=priority)


@pytest.mark.asyncio
async def test_priority_queue():
    """Test priority queue."""
    queue = PriorityQueue()

    # Add tasks
    high_task = _make_task("1", TaskPriority.HIGH)
    normal_task = _make_task("2", TaskPriority.NORMAL)
    low_task = _make_task("3", TaskPriority.LOW)

    await queue.put(normal_task)
    await queue.put(low_task)
    await queue.put(high_task)

    # Should get high priority first
    task = await queue.get()
    assert task.id == "1"

    task = await queue.get()
    assert task.id == "2"

    task = await queue.get()
    assert task.id == "3"

    # Queue is drained
    assert await queue.get() is None


@pytest.mark.asyncio
async def test_priority_queue_fifo_within_priority():
    """Tasks of equal priority come out in insertion order."""
    queue = PriorityQueue()

    for task_id in ("a", "b", "c"):
        await queue.put(_make_task(task_id, TaskPriority.NORMAL))

    assert [(await queue.get()).id for _ in range(3)] == ["a", "b", "c"]


@pytest.mark.asyncio
async def test_worker_retry():
    """Test worker retry logic."""
    queue = PriorityQueue()
    worker = Worker("test-worker", queue)

    # Create failing task
    def failing_task():
        raise Exception("Test error")

    task = Task(
        id="fail-task",
        func=failing_task,
        args=(),
        kwargs={},
        max_retries=2
    )

    # Process task directly (should retry). The task is not put on the
    # queue first: otherwise the re-queue below would leave two copies.
    await worker.process_task(task)

    # Should be re-queued exactly once
    assert task.retry_count == 1
    assert task.status == TaskStatus.PENDING
    assert task.error == "Test error"
    assert await queue.size() == 1
    assert await queue.get() is task

Mở rộng thêm:

  1. Persistent Queue: Lưu tasks vào database
  2. Web Dashboard: Monitor tasks với Flask/FastAPI
  3. Distributed Queue: Multiple machines với Redis
  4. Task Chaining: Pipeline of tasks
  5. Scheduled Tasks: Cron-like scheduling
  6. Task Results Cache: Cache results với TTL
  7. Dead Letter Queue: Failed tasks handling
  8. Metrics & Monitoring: Prometheus metrics

Tổng Kết Module Python Advanced 🎓

Đã Học Được:

Core Concepts:

  • ✅ Decorators & Generators
  • ✅ Context Managers
  • ✅ Iterators & Iterables
  • ✅ Metaclasses
  • ✅ Multi-threading & Multi-processing
  • ✅ Async Programming (asyncio, aiohttp)

Advanced Topics:

  • ✅ Regular Expressions
  • ✅ File I/O Advanced
  • ✅ JSON & CSV Processing
  • ✅ Testing (unittest, pytest)
  • ✅ Error Handling
  • ✅ Type Hints
  • ✅ Functional Programming

Professional Development:

  • ✅ Virtual Environments
  • ✅ Package Structure
  • ✅ Working with Databases
  • ✅ Logging
  • ✅ Performance Optimization
  • ✅ Best Practices (SOLID, Design Patterns)
  • ✅ Working with APIs

Projects Completed:

  1. ✅ Async Web Scraper với caching, retry, rate limiting
  2. ✅ Data Processing Pipeline với generators
  3. ✅ RESTful API Client Library
  4. ✅ Task Queue System (bài tập tự làm)

Kỹ Năng Đạt Được:

  • 🎯 Viết async code hiệu quả
  • 🎯 Design patterns và SOLID principles
  • 🎯 Testing và debugging
  • 🎯 Performance optimization
  • 🎯 Database operations
  • 🎯 API integration
  • 🎯 Production-ready code

Roadmap Tiếp Theo:

Module 03: Django Framework 🌐

  • Django basics
  • Models & ORM
  • Views & Templates
  • Forms & Validation
  • Authentication & Authorization
  • REST APIs với Django REST Framework
  • Deployment

Module 04: Advanced Django 🚀

  • Celery task queue
  • WebSockets với Channels
  • GraphQL
  • Microservices architecture
  • Docker & Kubernetes
  • CI/CD pipelines

Tips Để Thành Công:

  1. Practice Daily - Code mỗi ngày
  2. Build Projects - Áp dụng vào projects thực tế
  3. Read Code - Đọc code của người khác trên GitHub
  4. Contribute - Đóng góp cho các dự án open source
  5. Document - Viết documentation tốt
  6. Test - Viết tests cho code
  7. Optimize - Profile và optimize performance
  8. Share - Chia sẻ kiến thức với community

Keep coding and never stop learning! 💻🚀