Lesson 21: Python Advanced - Practices & Projects (Part 1)
Lesson Objectives
After completing this lesson, you will be able to:
- ✅ Consolidate all of your Python Advanced knowledge
- ✅ Build real-world applications with async programming
- ✅ Apply decorators, generators, and context managers
- ✅ Work with APIs and databases
- ✅ Test and optimize performance
- ✅ Write production-ready code
Overview
The Python Advanced module is now complete, with 20 lessons covering:
- ✅ Decorators
- ✅ Generators & Iterators
- ✅ Context Managers
- ✅ Iterators & Iterables
- ✅ Metaclasses
- ✅ Multi-threading
- ✅ Multi-processing
- ✅ Async Programming
- ✅ Regular Expressions
- ✅ File I/O Advanced
- ✅ JSON & CSV
- ✅ Testing (unittest)
- ✅ Testing (pytest)
- ✅ Error Handling Advanced
- ✅ Type Hints
- ✅ Functional Programming
- ✅ Virtual Environments
- ✅ Package Structure
- ✅ Working with Databases
- ✅ Logging
- ✅ Performance Optimization
- ✅ Best Practices
- ✅ Working with APIs
Now it's time to practice with advanced, real-world projects! 🚀
Major Projects
Project 1: Async Web Scraper & Data Pipeline 🕷️
Description: Build a web scraper with async programming, a data-processing pipeline, and a caching system.
Functional requirements:
- Scrape multiple websites concurrently with async requests
- Rate limiting and retry logic
- Data extraction with regex
- Data validation and transformation
- Cache results with TTL
- Store data in a database (SQLite/PostgreSQL)
- Export to JSON/CSV (see the sketch after this list)
- Logging and error handling
- Performance monitoring
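The JSON/CSV export requirement is not implemented in this part. As a reference point, a minimal sketch using only the standard library could look like the following; the module name, record shape, and file names are assumptions for illustration:

```python
# export.py - hypothetical minimal JSON/CSV export sketch
import csv
import json
from typing import Any, Dict, List


def export_json(records: List[Dict[str, Any]], path: str) -> None:
    """Write records to a JSON file."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(records, f, ensure_ascii=False, indent=2)


def export_csv(records: List[Dict[str, Any]], path: str) -> None:
    """Write records to CSV, using the keys of the first record as the header."""
    if not records:
        return
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
        writer.writeheader()
        writer.writerows(records)


if __name__ == "__main__":
    # Example with quote-like records (tags flattened to a string for CSV)
    quotes = [{'text': 'Sample quote', 'author': 'Unknown', 'tags': 'life,change'}]
    export_json(quotes, 'quotes.json')
    export_csv(quotes, 'quotes.csv')
```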
Knowledge applied:
- Async programming (asyncio, aiohttp)
- Context managers
- Decorators (retry, cache, timing)
- Generators (data streaming)
- Regular expressions
- Database operations
- Logging
- Error handling
- Type hints
Implementation guide:
Step 1: Set Up the Project Structure
```python
# project_structure.py
"""
web_scraper/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── scraper.py       # Main scraper class
│   ├── parser.py        # HTML/JSON parsers
│   └── validator.py     # Data validators
├── utils/
│   ├── __init__.py
│   ├── cache.py         # Caching system
│   ├── decorators.py    # Custom decorators
│   └── logger.py        # Logging setup
├── database/
│   ├── __init__.py
│   ├── models.py        # Database models
│   └── repository.py    # Data access layer
├── config.py            # Configuration
├── main.py              # Entry point
└── tests/
    ├── __init__.py
    ├── test_scraper.py
    └── test_parser.py
"""
```
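The layout above lists a `config.py` whose contents are not shown in this part. A minimal sketch of what it might hold, with every field and default value being an assumption, is:

```python
# config.py - hypothetical minimal configuration (fields and defaults are assumptions)
from dataclasses import dataclass, field
from typing import List


@dataclass
class ScraperConfig:
    """Central place for tunable settings used across the project."""
    max_concurrent: int = 5            # passed to AsyncWebScraper
    rate_limit: int = 10               # requests per rate_per_seconds
    rate_per_seconds: float = 1.0
    request_timeout: int = 30          # seconds
    cache_dir: str = '.cache'
    cache_ttl: int = 3600              # seconds
    database_url: str = 'sqlite:///scraper.db'
    log_level: str = 'INFO'
    log_file: str = 'logs/scraper.log'
    start_urls: List[str] = field(default_factory=lambda: [
        'https://quotes.toscrape.com/page/1/',
    ])


config = ScraperConfig()
```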
Step 2: Core Components
```python
# core/scraper.py
import asyncio
import aiohttp
from typing import List, Dict, Optional
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class RateLimiter:
    """Rate limiter using token bucket algorithm."""

    def __init__(self, rate: int, per: float):
        """
        Args:
            rate: Number of requests
            per: Time period in seconds
        """
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = datetime.now()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until request is allowed."""
        async with self._lock:
            current = datetime.now()
            time_passed = (current - self.last_check).total_seconds()
            self.last_check = current

            self.allowance += time_passed * (self.rate / self.per)
            if self.allowance > self.rate:
                self.allowance = self.rate

            if self.allowance < 1.0:
                sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                logger.debug(f"Rate limit reached. Sleeping {sleep_time:.2f}s")
                await asyncio.sleep(sleep_time)
                self.allowance = 0.0
            else:
                self.allowance -= 1.0


class AsyncWebScraper:
    """Async web scraper with rate limiting and retry."""

    def __init__(self,
                 max_concurrent: int = 5,
                 rate_limit: int = 10,
                 rate_per_seconds: float = 1.0,
                 timeout: int = 30):
        """
        Initialize scraper.

        Args:
            max_concurrent: Max concurrent requests
            rate_limit: Max requests per time period
            rate_per_seconds: Time period for rate limit
            timeout: Request timeout in seconds
        """
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(rate_limit, rate_per_seconds)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session: Optional[aiohttp.ClientSession] = None
        self.stats = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'total_time': 0.0
        }

    async def __aenter__(self):
        """Async context manager entry."""
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()

    async def fetch(self, url: str, max_retries: int = 3) -> Optional[str]:
        """
        Fetch URL with rate limiting and retry.

        Args:
            url: URL to fetch
            max_retries: Maximum retry attempts

        Returns:
            HTML content or None if failed
        """
        async with self.semaphore:
            await self.rate_limiter.acquire()

            for attempt in range(max_retries):
                start_time = datetime.now()
                try:
                    self.stats['total_requests'] += 1
                    logger.info(f"Fetching: {url} (attempt {attempt + 1}/{max_retries})")

                    async with self.session.get(url) as response:
                        response.raise_for_status()
                        content = await response.text()

                        elapsed = (datetime.now() - start_time).total_seconds()
                        self.stats['successful'] += 1
                        self.stats['total_time'] += elapsed

                        logger.info(f"Success: {url} ({elapsed:.2f}s)")
                        return content

                # asyncio.TimeoutError is caught as well, so hitting the
                # ClientTimeout configured above is retried instead of raised
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")

                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt  # Exponential backoff
                        logger.info(f"Retrying in {wait_time}s...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"All retries failed for {url}")
                        self.stats['failed'] += 1
                        return None

    async def fetch_many(self, urls: List[str]) -> List[Optional[str]]:
        """
        Fetch multiple URLs concurrently.

        Args:
            urls: List of URLs

        Returns:
            List of HTML contents
        """
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

    def get_stats(self) -> Dict:
        """Get scraping statistics."""
        avg_time = (self.stats['total_time'] / self.stats['successful']
                    if self.stats['successful'] > 0 else 0)

        return {
            **self.stats,
            'avg_response_time': avg_time,
            'success_rate': (self.stats['successful'] / self.stats['total_requests'] * 100
                             if self.stats['total_requests'] > 0 else 0)
        }


# Usage example
async def main():
    """Example usage."""
    urls = [
        'https://quotes.toscrape.com/page/1/',
        'https://quotes.toscrape.com/page/2/',
        'https://quotes.toscrape.com/page/3/',
    ]

    async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
        results = await scraper.fetch_many(urls)
        print(f"Fetched {len(results)} pages")
        print(f"Stats: {scraper.get_stats()}")


if __name__ == "__main__":
    asyncio.run(main())
```
Step 3: Data Parser
```python
# core/parser.py
import re
from typing import List, Dict, Any, Optional
from bs4 import BeautifulSoup
import logging

logger = logging.getLogger(__name__)


class DataParser:
    """Parse and extract data from HTML."""

    @staticmethod
    def parse_html(html: str) -> BeautifulSoup:
        """Parse HTML content."""
        return BeautifulSoup(html, 'html.parser')

    @staticmethod
    def extract_text(element, selector: str) -> Optional[str]:
        """Extract text from element."""
        try:
            found = element.select_one(selector)
            return found.get_text(strip=True) if found else None
        except Exception as e:
            logger.error(f"Error extracting text: {e}")
            return None

    @staticmethod
    def extract_attribute(element, selector: str, attr: str) -> Optional[str]:
        """Extract attribute from element."""
        try:
            found = element.select_one(selector)
            return found.get(attr) if found else None
        except Exception as e:
            logger.error(f"Error extracting attribute: {e}")
            return None

    @staticmethod
    def extract_email(text: str) -> Optional[str]:
        """Extract email using regex."""
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        match = re.search(pattern, text)
        return match.group(0) if match else None

    @staticmethod
    def extract_phone(text: str) -> Optional[str]:
        """Extract phone number using regex."""
        # US phone pattern
        pattern = r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
        match = re.search(pattern, text)
        return match.group(0) if match else None

    @staticmethod
    def extract_price(text: str) -> Optional[float]:
        """Extract price from text."""
        pattern = r'\$?\s*(\d+(?:,\d{3})*(?:\.\d{2})?)'
        match = re.search(pattern, text)
        if match:
            price_str = match.group(1).replace(',', '')
            return float(price_str)
        return None

    @staticmethod
    def extract_date(text: str) -> Optional[str]:
        """Extract date using regex."""
        # Format: YYYY-MM-DD
        pattern = r'\d{4}-\d{2}-\d{2}'
        match = re.search(pattern, text)
        return match.group(0) if match else None


class QuoteParser(DataParser):
    """Parser for quotes.toscrape.com."""

    def parse_quotes(self, html: str) -> List[Dict[str, Any]]:
        """
        Parse quotes from HTML.

        Returns:
            List of quote dictionaries
        """
        soup = self.parse_html(html)
        quotes = []

        for quote_div in soup.select('.quote'):
            try:
                quote_data = {
                    'text': self.extract_text(quote_div, '.text'),
                    'author': self.extract_text(quote_div, '.author'),
                    'tags': [tag.get_text(strip=True)
                             for tag in quote_div.select('.tag')],
                    'author_url': self.extract_attribute(quote_div, 'a', 'href')
                }
                quotes.append(quote_data)
            except Exception as e:
                logger.error(f"Error parsing quote: {e}")

        return quotes

    def parse_author(self, html: str) -> Optional[Dict[str, Any]]:
        """Parse author details."""
        soup = self.parse_html(html)

        try:
            return {
                'name': self.extract_text(soup, '.author-title'),
                'born_date': self.extract_text(soup, '.author-born-date'),
                'born_location': self.extract_text(soup, '.author-born-location'),
                'description': self.extract_text(soup, '.author-description')
            }
        except Exception as e:
            logger.error(f"Error parsing author: {e}")
            return None
```
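To make the data flow concrete, here is one possible way to wire `AsyncWebScraper` and `QuoteParser` together, assuming the imports follow the package layout from Step 1:

```python
# example_pipeline.py - sketch of combining AsyncWebScraper and QuoteParser
import asyncio

from core.scraper import AsyncWebScraper
from core.parser import QuoteParser


async def scrape_quotes(urls):
    """Fetch pages concurrently, then parse quotes from the ones that succeeded."""
    parser = QuoteParser()

    async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
        pages = await scraper.fetch_many(urls)

    quotes = []
    for html in pages:
        if html:  # fetch() returns None on failure, so failed pages are skipped
            quotes.extend(parser.parse_quotes(html))
    return quotes


if __name__ == "__main__":
    urls = [f'https://quotes.toscrape.com/page/{i}/' for i in range(1, 4)]
    results = asyncio.run(scrape_quotes(urls))
    print(f"Parsed {len(results)} quotes")
```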
Step 4: Caching System
```python
# utils/cache.py
import json
import hashlib
import time
from typing import Any, Optional, Callable
from pathlib import Path
import functools
import logging

logger = logging.getLogger(__name__)


class FileCache:
    """Simple file-based cache with TTL."""

    def __init__(self, cache_dir: str = '.cache', default_ttl: int = 3600):
        """
        Initialize cache.

        Args:
            cache_dir: Directory for cache files
            default_ttl: Default time-to-live in seconds
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.default_ttl = default_ttl

    def _get_cache_key(self, key: str) -> str:
        """Generate cache key hash."""
        return hashlib.md5(key.encode()).hexdigest()

    def _get_cache_path(self, key: str) -> Path:
        """Get cache file path."""
        cache_key = self._get_cache_key(key)
        return self.cache_dir / f"{cache_key}.json"

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache."""
        cache_path = self._get_cache_path(key)

        if not cache_path.exists():
            return None

        try:
            with open(cache_path, 'r') as f:
                data = json.load(f)

            # Check if expired
            if time.time() > data['expires_at']:
                logger.debug(f"Cache expired for key: {key}")
                cache_path.unlink()
                return None

            logger.debug(f"Cache hit for key: {key}")
            return data['value']

        except Exception as e:
            logger.error(f"Error reading cache: {e}")
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Set value in cache."""
        cache_path = self._get_cache_path(key)
        ttl = ttl or self.default_ttl

        try:
            data = {
                'value': value,
                'cached_at': time.time(),
                'expires_at': time.time() + ttl
            }

            with open(cache_path, 'w') as f:
                json.dump(data, f)

            logger.debug(f"Cached key: {key} (TTL: {ttl}s)")

        except Exception as e:
            logger.error(f"Error writing cache: {e}")

    def clear(self):
        """Clear all cache."""
        for cache_file in self.cache_dir.glob('*.json'):
            cache_file.unlink()
        logger.info("Cache cleared")


def cached(ttl: int = 3600, cache_dir: str = '.cache'):
    """
    Decorator for caching function results.

    Args:
        ttl: Time-to-live in seconds
        cache_dir: Cache directory
    """
    cache = FileCache(cache_dir, ttl)

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key from function name and arguments
            key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

            # Try to get from cache
            result = cache.get(key)
            if result is not None:
                return result

            # Call function and cache result
            result = func(*args, **kwargs)
            cache.set(key, result, ttl)
            return result

        return wrapper
    return decorator


# Async version
def async_cached(ttl: int = 3600, cache_dir: str = '.cache'):
    """Async version of cached decorator."""
    cache = FileCache(cache_dir, ttl)

    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

            result = cache.get(key)
            if result is not None:
                return result

            result = await func(*args, **kwargs)
            cache.set(key, result, ttl)
            return result

        return wrapper
    return decorator
```
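A short usage sketch for `async_cached`. Keep in mind that `FileCache` serializes values as JSON, so only JSON-serializable results (strings, numbers, lists, dicts) can be cached this way; the function below is made up for illustration:

```python
# Example usage of async_cached (illustrative sketch)
import asyncio
import aiohttp

from utils.cache import async_cached


@async_cached(ttl=600)
async def fetch_page(url: str) -> str:
    """Fetch a page; the HTML string is cached on disk for 10 minutes."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()


if __name__ == "__main__":
    # The second run within the TTL hits the file cache instead of the network
    html = asyncio.run(fetch_page('https://quotes.toscrape.com/page/1/'))
    print(f"Fetched {len(html)} characters")
```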
Step 5: Custom Decorators
```python
# utils/decorators.py
import time
import functools
import logging
from typing import Callable, Any
import asyncio

logger = logging.getLogger(__name__)


def timing(func: Callable) -> Callable:
    """Decorator to measure function execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper


def async_timing(func: Callable) -> Callable:
    """Async version of timing decorator."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper


def retry(max_attempts: int = 3,
          delay: float = 1.0,
          backoff: float = 2.0,
          exceptions: tuple = (Exception,)):
    """
    Retry decorator with exponential backoff.

    Args:
        max_attempts: Maximum retry attempts
        delay: Initial delay between retries
        backoff: Backoff multiplier
        exceptions: Tuple of exceptions to catch
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
                        raise

                    logger.warning(f"{func.__name__} attempt {attempt + 1} failed: {e}")
                    logger.info(f"Retrying in {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator


def async_retry(max_attempts: int = 3,
                delay: float = 1.0,
                backoff: float = 2.0,
                exceptions: tuple = (Exception,)):
    """Async version of retry decorator."""
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
                        raise

                    logger.warning(f"{func.__name__} attempt {attempt + 1} failed: {e}")
                    logger.info(f"Retrying in {current_delay}s...")
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator


def validate_args(**validators):
    """
    Decorator to validate function arguments.

    Example:
        @validate_args(url=lambda x: x.startswith('http'),
                       count=lambda x: x > 0)
        def fetch(url: str, count: int):
            pass
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Get function signature
            import inspect
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            # Validate arguments
            for param_name, validator in validators.items():
                if param_name in bound.arguments:
                    value = bound.arguments[param_name]
                    if not validator(value):
                        raise ValueError(
                            f"Validation failed for parameter '{param_name}' "
                            f"with value '{value}'"
                        )

            return func(*args, **kwargs)

        return wrapper
    return decorator
```
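A quick sketch of stacking these decorators on a plain function; `flaky_request` is a made-up example that fails randomly so the retry path actually triggers:

```python
# Example: stacking the custom decorators on a hypothetical function
import random

from utils.decorators import retry, timing, validate_args


@timing
@retry(max_attempts=3, delay=0.5, backoff=2.0, exceptions=(ConnectionError,))
@validate_args(url=lambda x: x.startswith('http'))
def flaky_request(url: str) -> str:
    """Simulate an unreliable network call that fails about half the time."""
    if random.random() < 0.5:
        raise ConnectionError("temporary network hiccup")
    return f"response from {url}"


if __name__ == "__main__":
    # May still raise if all three attempts fail; timing/retry messages go
    # through the logging module, so configure logging (Step 6) to see them.
    print(flaky_request('https://quotes.toscrape.com/'))
```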
Step 6: Logging Configuration
```python
# utils/logger.py
import logging
import sys
from pathlib import Path


def setup_logging(log_level: str = 'INFO', log_file: str = None):
    """
    Setup logging configuration.

    Args:
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        log_file: Optional log file path
    """
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper()))

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # File handler (optional)
    if log_file:
        log_path = Path(log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_path)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

    return root_logger
```
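In `main.py`, logging is typically configured once at startup, before any module loggers emit records. The full entry point arrives in Part 2; a minimal sketch (the log file path is an assumption) might look like this:

```python
# main.py - sketch of an entry point wiring logging into the scraper
import asyncio
import logging

from utils.logger import setup_logging
from core.scraper import AsyncWebScraper

logger = logging.getLogger(__name__)


async def run():
    # Configure root logging first so every module logger inherits it
    setup_logging(log_level='INFO', log_file='logs/scraper.log')
    logger.info("Starting scraper")

    urls = ['https://quotes.toscrape.com/page/1/']
    async with AsyncWebScraper() as scraper:
        pages = await scraper.fetch_many(urls)

    logger.info("Fetched %d pages", len(pages))


if __name__ == "__main__":
    asyncio.run(run())
```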
Part 2 will continue with:
- Database models and repository
- Data validation
- Main application
- Testing
- CLI interface
- Performance monitoring
Next lesson: Lesson 21.2: Complete Web Scraper Implementation 🚀