Lesson 21: Python Advanced - Practices & Projects (Part 1)

Learning Objectives

After completing this lesson, you will:

  • ✅ Consolidate all of the Python Advanced material
  • ✅ Build real-world applications with async programming
  • ✅ Apply decorators, generators, and context managers
  • ✅ Work with APIs and databases
  • ✅ Test and optimize performance
  • ✅ Write production-ready code

Overview

The Python Advanced module is now complete, covering the following lessons:

  1. ✅ Decorators
  2. ✅ Generators & Iterators
  3. ✅ Context Managers
  4. ✅ Iterators & Iterables
  5. ✅ Metaclasses
  6. ✅ Multi-threading
  7. ✅ Multi-processing
  8. ✅ Async Programming
  9. ✅ Regular Expressions
  10. ✅ File I/O Advanced
  11. ✅ JSON & CSV
  12. ✅ Testing (unittest)
  13. ✅ Testing (pytest)
  14. ✅ Error Handling Advanced
  15. ✅ Type Hints
  16. ✅ Functional Programming
  17. ✅ Virtual Environments
  18. ✅ Package Structure
  19. ✅ Working with Databases
  20. ✅ Logging
  21. ✅ Performance Optimization
  22. ✅ Best Practices
  23. ✅ Working with APIs

Now it's time to practice with advanced, real-world projects! 🚀

Major Projects

Project 1: Async Web Scraper & Data Pipeline 🕷️

Description: Build a web scraper with async programming, a data-processing pipeline, and a caching system.

Functional requirements:

  • Scrape multiple websites concurrently with async
  • Rate limiting and retry logic
  • Data extraction with regex
  • Data validation and transformation
  • Cache results with TTL
  • Store data in a database (SQLite/PostgreSQL)
  • Export to JSON/CSV
  • Logging and error handling
  • Performance monitoring

Concepts applied:

  • Async programming (asyncio, aiohttp)
  • Context managers
  • Decorators (retry, cache, timing)
  • Generators (data streaming)
  • Regular expressions
  • Database operations
  • Logging
  • Error handling
  • Type hints

Implementation guide:

Step 1: Set Up the Project Structure

# project_structure.py
"""
web_scraper/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── scraper.py       # Main scraper class
│   ├── parser.py        # HTML/JSON parsers
│   └── validator.py     # Data validators
├── utils/
│   ├── __init__.py
│   ├── cache.py         # Caching system
│   ├── decorators.py    # Custom decorators
│   └── logger.py        # Logging setup
├── database/
│   ├── __init__.py
│   ├── models.py        # Database models
│   └── repository.py    # Data access layer
├── config.py            # Configuration
├── main.py              # Entry point
└── tests/
    ├── __init__.py
    ├── test_scraper.py
    └── test_parser.py
"""
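The tree above lists config.py, but its contents are not shown in Part 1. Here is a minimal sketch of what it might hold; all names and defaults are assumptions, chosen only to mirror the parameters used in the steps below:

# config.py -- hypothetical settings module; names and values are illustrative only
from dataclasses import dataclass


@dataclass(frozen=True)
class ScraperConfig:
    max_concurrent: int = 5        # matches the AsyncWebScraper default in Step 2
    rate_limit: int = 10           # requests allowed per rate_per_seconds window
    rate_per_seconds: float = 1.0
    timeout: int = 30              # per-request timeout in seconds
    cache_dir: str = '.cache'      # used by FileCache in Step 4
    cache_ttl: int = 3600          # seconds
    db_path: str = 'scraper.db'    # SQLite file (see Part 2)
    log_level: str = 'INFO'
    log_file: str = 'logs/scraper.log'


CONFIG = ScraperConfig()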

Step 2: Core Components

# core/scraper.py
import asyncio
import aiohttp
from typing import List, Dict, Optional
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class RateLimiter:
    """Rate limiter using token bucket algorithm."""

    def __init__(self, rate: int, per: float):
        """
        Args:
            rate: Number of requests
            per: Time period in seconds
        """
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = datetime.now()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until request is allowed."""
        async with self._lock:
            current = datetime.now()
            time_passed = (current - self.last_check).total_seconds()
            self.last_check = current

            self.allowance += time_passed * (self.rate / self.per)
            if self.allowance > self.rate:
                self.allowance = self.rate

            if self.allowance < 1.0:
                sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                logger.debug(f"Rate limit reached. Sleeping {sleep_time:.2f}s")
                await asyncio.sleep(sleep_time)
                self.allowance = 0.0
            else:
                self.allowance -= 1.0


class AsyncWebScraper:
    """Async web scraper with rate limiting and retry."""

    def __init__(self, max_concurrent: int = 5, rate_limit: int = 10,
                 rate_per_seconds: float = 1.0, timeout: int = 30):
        """
        Initialize scraper.

        Args:
            max_concurrent: Max concurrent requests
            rate_limit: Max requests per time period
            rate_per_seconds: Time period for rate limit
            timeout: Request timeout in seconds
        """
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(rate_limit, rate_per_seconds)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session: Optional[aiohttp.ClientSession] = None
        self.stats = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'total_time': 0.0
        }

    async def __aenter__(self):
        """Async context manager entry."""
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()

    async def fetch(self, url: str, max_retries: int = 3) -> Optional[str]:
        """
        Fetch URL with rate limiting and retry.

        Args:
            url: URL to fetch
            max_retries: Maximum retry attempts

        Returns:
            HTML content or None if failed
        """
        async with self.semaphore:
            await self.rate_limiter.acquire()

            for attempt in range(max_retries):
                start_time = datetime.now()

                try:
                    self.stats['total_requests'] += 1

                    logger.info(f"Fetching: {url} (attempt {attempt + 1}/{max_retries})")

                    async with self.session.get(url) as response:
                        response.raise_for_status()
                        content = await response.text()

                        elapsed = (datetime.now() - start_time).total_seconds()
                        self.stats['successful'] += 1
                        self.stats['total_time'] += elapsed

                        logger.info(f"Success: {url} ({elapsed:.2f}s)")
                        return content

                # ClientTimeout expiry raises asyncio.TimeoutError, which is not a
                # ClientError, so catch both to make retries cover timeouts as well.
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")

                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt  # Exponential backoff
                        logger.info(f"Retrying in {wait_time}s...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"All retries failed for {url}")
                        self.stats['failed'] += 1
                        return None

    async def fetch_many(self, urls: List[str]) -> List[Optional[str]]:
        """
        Fetch multiple URLs concurrently.

        Args:
            urls: List of URLs

        Returns:
            List of HTML contents
        """
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

    def get_stats(self) -> Dict:
        """Get scraping statistics."""
        avg_time = (self.stats['total_time'] / self.stats['successful']
                    if self.stats['successful'] > 0 else 0)

        return {
            **self.stats,
            'avg_response_time': avg_time,
            'success_rate': (self.stats['successful'] / self.stats['total_requests'] * 100
                             if self.stats['total_requests'] > 0 else 0)
        }


# Usage example
async def main():
    """Example usage."""
    urls = [
        'https://quotes.toscrape.com/page/1/',
        'https://quotes.toscrape.com/page/2/',
        'https://quotes.toscrape.com/page/3/',
    ]

    async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
        results = await scraper.fetch_many(urls)

        print(f"Fetched {len(results)} pages")
        print(f"Stats: {scraper.get_stats()}")


if __name__ == "__main__":
    asyncio.run(main())
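fetch_many waits for every page before returning. If you prefer to stream pages into later pipeline stages as each one finishes (the "generators / data streaming" idea from the concepts list), a small variation using asyncio.as_completed works too. This helper is a sketch and is not part of the original scraper module:

# Hypothetical helper: yield pages as soon as each fetch completes.
# Assumes the AsyncWebScraper class from core/scraper.py above.
import asyncio
from typing import AsyncIterator, List, Optional

from core.scraper import AsyncWebScraper


async def fetch_as_completed(scraper: AsyncWebScraper,
                             urls: List[str]) -> AsyncIterator[Optional[str]]:
    """Async generator: yields each page's HTML (or None) as it finishes."""
    tasks = [asyncio.create_task(scraper.fetch(url)) for url in urls]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def demo():
    urls = [f'https://quotes.toscrape.com/page/{i}/' for i in range(1, 4)]
    async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
        async for html in fetch_as_completed(scraper, urls):
            print('got page' if html else 'fetch failed')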

Step 3: Data Parser

# core/parser.py
import re
from typing import List, Dict, Any, Optional
from bs4 import BeautifulSoup
import logging

logger = logging.getLogger(__name__)


class DataParser:
    """Parse and extract data from HTML."""

    @staticmethod
    def parse_html(html: str) -> BeautifulSoup:
        """Parse HTML content."""
        return BeautifulSoup(html, 'html.parser')

    @staticmethod
    def extract_text(element, selector: str) -> Optional[str]:
        """Extract text from element."""
        try:
            found = element.select_one(selector)
            return found.get_text(strip=True) if found else None
        except Exception as e:
            logger.error(f"Error extracting text: {e}")
            return None

    @staticmethod
    def extract_attribute(element, selector: str, attr: str) -> Optional[str]:
        """Extract attribute from element."""
        try:
            found = element.select_one(selector)
            return found.get(attr) if found else None
        except Exception as e:
            logger.error(f"Error extracting attribute: {e}")
            return None

    @staticmethod
    def extract_email(text: str) -> Optional[str]:
        """Extract email using regex."""
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        match = re.search(pattern, text)
        return match.group(0) if match else None

    @staticmethod
    def extract_phone(text: str) -> Optional[str]:
        """Extract phone number using regex."""
        # US phone pattern
        pattern = r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
        match = re.search(pattern, text)
        return match.group(0) if match else None

    @staticmethod
    def extract_price(text: str) -> Optional[float]:
        """Extract price from text."""
        pattern = r'\$?\s*(\d+(?:,\d{3})*(?:\.\d{2})?)'
        match = re.search(pattern, text)
        if match:
            price_str = match.group(1).replace(',', '')
            return float(price_str)
        return None

    @staticmethod
    def extract_date(text: str) -> Optional[str]:
        """Extract date using regex."""
        # Format: YYYY-MM-DD
        pattern = r'\d{4}-\d{2}-\d{2}'
        match = re.search(pattern, text)
        return match.group(0) if match else None


class QuoteParser(DataParser):
    """Parser for quotes.toscrape.com."""

    def parse_quotes(self, html: str) -> List[Dict[str, Any]]:
        """
        Parse quotes from HTML.

        Returns:
            List of quote dictionaries
        """
        soup = self.parse_html(html)
        quotes = []

        for quote_div in soup.select('.quote'):
            try:
                quote_data = {
                    'text': self.extract_text(quote_div, '.text'),
                    'author': self.extract_text(quote_div, '.author'),
                    'tags': [tag.get_text(strip=True)
                             for tag in quote_div.select('.tag')],
                    'author_url': self.extract_attribute(quote_div, 'a', 'href')
                }
                quotes.append(quote_data)
            except Exception as e:
                logger.error(f"Error parsing quote: {e}")

        return quotes

    def parse_author(self, html: str) -> Optional[Dict[str, Any]]:
        """Parse author details."""
        soup = self.parse_html(html)

        try:
            return {
                'name': self.extract_text(soup, '.author-title'),
                'born_date': self.extract_text(soup, '.author-born-date'),
                'born_location': self.extract_text(soup, '.author-born-location'),
                'description': self.extract_text(soup, '.author-description')
            }
        except Exception as e:
            logger.error(f"Error parsing author: {e}")
            return None
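To see how Step 2 and Step 3 fit together, a minimal glue sketch (assuming both modules above are importable as shown) could look like this:

# Hypothetical glue code combining the scraper and the parser.
import asyncio

from core.scraper import AsyncWebScraper
from core.parser import QuoteParser


async def scrape_quotes(pages: int = 3):
    """Fetch the first few pages of quotes.toscrape.com and parse them."""
    urls = [f'https://quotes.toscrape.com/page/{i}/' for i in range(1, pages + 1)]
    parser = QuoteParser()

    async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
        html_pages = await scraper.fetch_many(urls)

    quotes = []
    for html in html_pages:
        if html:  # fetch() returns None when all retries failed
            quotes.extend(parser.parse_quotes(html))
    return quotes


if __name__ == "__main__":
    all_quotes = asyncio.run(scrape_quotes())
    print(f"Parsed {len(all_quotes)} quotes")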

Step 4: Caching System

# utils/cache.py
import json
import hashlib
import time
from typing import Any, Optional, Callable
from pathlib import Path
import functools
import logging

logger = logging.getLogger(__name__)


class FileCache:
    """Simple file-based cache with TTL."""

    def __init__(self, cache_dir: str = '.cache', default_ttl: int = 3600):
        """
        Initialize cache.

        Args:
            cache_dir: Directory for cache files
            default_ttl: Default time-to-live in seconds
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.default_ttl = default_ttl

    def _get_cache_key(self, key: str) -> str:
        """Generate cache key hash."""
        return hashlib.md5(key.encode()).hexdigest()

    def _get_cache_path(self, key: str) -> Path:
        """Get cache file path."""
        cache_key = self._get_cache_key(key)
        return self.cache_dir / f"{cache_key}.json"

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache."""
        cache_path = self._get_cache_path(key)

        if not cache_path.exists():
            return None

        try:
            with open(cache_path, 'r') as f:
                data = json.load(f)

            # Check if expired
            if time.time() > data['expires_at']:
                logger.debug(f"Cache expired for key: {key}")
                cache_path.unlink()
                return None

            logger.debug(f"Cache hit for key: {key}")
            return data['value']

        except Exception as e:
            logger.error(f"Error reading cache: {e}")
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Set value in cache."""
        cache_path = self._get_cache_path(key)
        ttl = ttl or self.default_ttl

        try:
            data = {
                'value': value,
                'cached_at': time.time(),
                'expires_at': time.time() + ttl
            }

            with open(cache_path, 'w') as f:
                json.dump(data, f)

            logger.debug(f"Cached key: {key} (TTL: {ttl}s)")

        except Exception as e:
            logger.error(f"Error writing cache: {e}")

    def clear(self):
        """Clear all cache."""
        for cache_file in self.cache_dir.glob('*.json'):
            cache_file.unlink()
        logger.info("Cache cleared")


def cached(ttl: int = 3600, cache_dir: str = '.cache'):
    """
    Decorator for caching function results.

    Args:
        ttl: Time-to-live in seconds
        cache_dir: Cache directory
    """
    cache = FileCache(cache_dir, ttl)

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key from function name and arguments
            key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

            # Try to get from cache
            result = cache.get(key)
            if result is not None:
                return result

            # Call function and cache result
            result = func(*args, **kwargs)
            cache.set(key, result, ttl)

            return result

        return wrapper
    return decorator


# Async version
def async_cached(ttl: int = 3600, cache_dir: str = '.cache'):
    """Async version of cached decorator."""
    cache = FileCache(cache_dir, ttl)

    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

            result = cache.get(key)
            if result is not None:
                return result

            result = await func(*args, **kwargs)
            cache.set(key, result, ttl)

            return result

        return wrapper
    return decorator
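A short usage sketch for the two decorators above; the decorated functions here are made up purely for illustration:

# Hypothetical usage of the caching decorators from utils/cache.py.
import asyncio

from utils.cache import cached, async_cached


@cached(ttl=600)
def expensive_lookup(term: str) -> dict:
    # Imagine a slow computation or API call here.
    return {'term': term, 'score': len(term)}


@async_cached(ttl=600)
async def fetch_page_cached(url: str) -> str:
    # Imagine an aiohttp request here.
    await asyncio.sleep(0.1)
    return f"<html>{url}</html>"


print(expensive_lookup('python'))   # computed, then written to .cache/
print(expensive_lookup('python'))   # served from the file cache
print(asyncio.run(fetch_page_cached('https://example.com')))

Note that FileCache stores entries with json.dump, so cached return values must be JSON-serializable (dicts, lists, strings, numbers).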

Step 5: Custom Decorators

# utils/decorators.py
import time
import functools
import logging
from typing import Callable, Any
import asyncio

logger = logging.getLogger(__name__)


def timing(func: Callable) -> Callable:
    """Decorator to measure function execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper


def async_timing(func: Callable) -> Callable:
    """Async version of timing decorator."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper


def retry(max_attempts: int = 3, delay: float = 1.0,
          backoff: float = 2.0, exceptions: tuple = (Exception,)):
    """
    Retry decorator with exponential backoff.

    Args:
        max_attempts: Maximum retry attempts
        delay: Initial delay between retries
        backoff: Backoff multiplier
        exceptions: Tuple of exceptions to catch
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
                        raise

                    logger.warning(f"{func.__name__} attempt {attempt + 1} failed: {e}")
                    logger.info(f"Retrying in {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator


def async_retry(max_attempts: int = 3, delay: float = 1.0,
                backoff: float = 2.0, exceptions: tuple = (Exception,)):
    """Async version of retry decorator."""
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
                        raise

                    logger.warning(f"{func.__name__} attempt {attempt + 1} failed: {e}")
                    logger.info(f"Retrying in {current_delay}s...")
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator


def validate_args(**validators):
    """
    Decorator to validate function arguments.

    Example:
        @validate_args(url=lambda x: x.startswith('http'),
                       count=lambda x: x > 0)
        def fetch(url: str, count: int):
            pass
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Get function signature
            import inspect
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            # Validate arguments
            for param_name, validator in validators.items():
                if param_name in bound.arguments:
                    value = bound.arguments[param_name]
                    if not validator(value):
                        raise ValueError(
                            f"Validation failed for parameter '{param_name}' "
                            f"with value '{value}'"
                        )

            return func(*args, **kwargs)

        return wrapper
    return decorator
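A brief sketch of how these decorators compose on a single function (the function itself is hypothetical):

# Hypothetical example combining the decorators from utils/decorators.py.
from utils.decorators import retry, timing, validate_args


@timing
@retry(max_attempts=3, delay=0.5, backoff=2.0, exceptions=(ConnectionError,))
@validate_args(url=lambda x: x.startswith('http'), count=lambda x: x > 0)
def download(url: str, count: int = 1) -> str:
    # Real code would perform the request here.
    return f"downloaded {url} x{count}"


print(download('https://quotes.toscrape.com', count=2))
# download('ftp://bad-scheme') would raise ValueError from validate_args

Decorator order matters: timing wraps everything, so it reports the total time including any retries, while validate_args runs innermost so invalid arguments fail fast instead of being retried.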

Step 6: Logging Configuration

# utils/logger.py
import logging
import sys
from pathlib import Path


def setup_logging(log_level: str = 'INFO', log_file: str = None):
    """
    Setup logging configuration.

    Args:
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        log_file: Optional log file path
    """
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper()))

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # File handler (optional)
    if log_file:
        log_path = Path(log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.FileHandler(log_path)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

    return root_logger
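In main.py the logging setup would typically run once, before any scraping starts; a minimal sketch under that assumption:

# Hypothetical entry-point snippet showing setup_logging in use.
import logging

from utils.logger import setup_logging

setup_logging(log_level='DEBUG', log_file='logs/scraper.log')
logger = logging.getLogger(__name__)

logger.info("Scraper starting")       # goes to stdout and logs/scraper.log
logger.debug("Verbose details here")  # visible because log_level='DEBUG'

Because the handlers are attached to the root logger, the module-level logging.getLogger(__name__) calls in core/ and utils/ inherit this configuration automatically.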

Part 2 will cover:

  • Database models and repository (a tiny preview sketch follows this list)
  • Data validation
  • Main application
  • Testing
  • CLI interface
  • Performance monitoring
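As a preview of the repository idea only (Part 2 defines the real models and data-access layer), a minimal sqlite3 sketch might look like the following; the table schema and method names are assumptions:

# Hypothetical preview of database/repository.py; Part 2 provides the real version.
import sqlite3
from typing import Dict, List


class QuoteRepository:
    """Minimal data-access layer over SQLite (sketch only)."""

    def __init__(self, db_path: str = 'scraper.db'):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS quotes ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "text TEXT NOT NULL, author TEXT, tags TEXT)"
        )
        self.conn.commit()

    def save_many(self, quotes: List[Dict]) -> int:
        """Insert parsed quotes; tags are stored as a comma-joined string."""
        rows = [(q['text'], q['author'], ','.join(q.get('tags', []))) for q in quotes]
        self.conn.executemany(
            "INSERT INTO quotes (text, author, tags) VALUES (?, ?, ?)", rows
        )
        self.conn.commit()
        return len(rows)

    def close(self):
        self.conn.close()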

Next lesson: Lesson 21.2: Complete Web Scraper Implementation 🚀