Lesson 21: Python Advanced - Practices & Projects (Part 3)

Project 2: Real-time Data Processing Pipeline 📊

Description: Build a data pipeline that processes streaming data using generators, transformations, and aggregations.

Functional requirements:

  • Read data from multiple sources (CSV, JSON, API)
  • Data transformations với generators
  • Filter, map, reduce operations
  • Aggregation and statistics
  • Write to multiple outputs (DB, file, API)
  • Memory-efficient streaming (a quick sketch follows this list)
  • Error handling and logging
  • Performance monitoring
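
To make the memory-efficiency requirement concrete before diving in: because every stage below is a generator, records flow through the pipeline one at a time, and the full dataset never sits in memory. A minimal standalone sketch (not part of the project code):

# Minimal generator-chaining demo: constant memory regardless of input size
def numbers(n):
    for i in range(n):
        yield {'value': i}  # one record at a time, nothing buffered

def double(records):
    for r in records:
        yield {'value': r['value'] * 2}

def only_multiples_of_four(records):
    return (r for r in records if r['value'] % 4 == 0)

# Processes 10 million records without ever holding them all at once
total = sum(r['value'] for r in only_multiples_of_four(double(numbers(10_000_000))))
print(total)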

Concepts used:

  • Generators & Iterators
  • Functional programming
  • Context managers
  • Decorators
  • Type hints
  • Error handling
  • Logging

Implementation guide:

Step 1: Data Sources

# pipeline/sources.py
import csv
import json
from typing import Iterator, Dict, Any, Optional
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class DataSource:
    """Base class for data sources."""

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read data and yield records."""
        raise NotImplementedError


class CSVSource(DataSource):
    """Read data from CSV file."""

    def __init__(self, filepath: str, encoding: str = 'utf-8'):
        self.filepath = Path(filepath)
        self.encoding = encoding

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read CSV file line by line."""
        try:
            with open(self.filepath, 'r', encoding=self.encoding) as f:
                reader = csv.DictReader(f)
                for row in reader:
                    yield dict(row)
        except Exception as e:
            logger.error(f"Error reading CSV {self.filepath}: {e}")
            raise


class JSONLSource(DataSource):
    """Read data from JSON Lines file."""

    def __init__(self, filepath: str, encoding: str = 'utf-8'):
        self.filepath = Path(filepath)
        self.encoding = encoding

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read JSONL file line by line."""
        try:
            with open(self.filepath, 'r', encoding=self.encoding) as f:
                for line in f:
                    if line.strip():
                        yield json.loads(line)
        except Exception as e:
            logger.error(f"Error reading JSONL {self.filepath}: {e}")
            raise


class APISource(DataSource):
    """Read data from paginated API."""

    def __init__(self, base_url: str, auth_token: Optional[str] = None,
                 params: Optional[Dict] = None):
        import requests
        self.base_url = base_url
        self.session = requests.Session()

        if auth_token:
            self.session.headers['Authorization'] = f'Bearer {auth_token}'

        self.params = params or {}

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read data from paginated API."""
        page = 1

        while True:
            try:
                params = {**self.params, 'page': page}
                response = self.session.get(self.base_url, params=params)
                response.raise_for_status()

                data = response.json()
                items = data.get('items', [])

                if not items:
                    break

                for item in items:
                    yield item

                page += 1

            except Exception as e:
                logger.error(f"Error reading from API: {e}")
                break


class GeneratorSource(DataSource):
    """Source from generator function."""

    def __init__(self, generator_func, *args, **kwargs):
        self.generator_func = generator_func
        self.args = args
        self.kwargs = kwargs

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read from generator."""
        yield from self.generator_func(*self.args, **self.kwargs)
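
A short usage sketch for the sources (the file path is a placeholder): each source yields records lazily, so you can inspect the head of a large file without reading all of it.

# Usage sketch - 'data/sales.csv' is a placeholder path
from itertools import islice
from pipeline.sources import CSVSource

source = CSVSource('data/sales.csv')

# Peek at the first 3 records; the rest of the file is never read
for record in islice(source.read(), 3):
    print(record)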

Step 2: Data Transformations

# pipeline/transformations.py
from typing import Iterator, Dict, Any, Callable, List
import re
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


def transform_map(data: Iterator[Dict], func: Callable) -> Iterator[Dict]:
    """Apply function to each record."""
    for record in data:
        try:
            yield func(record)
        except Exception as e:
            logger.error(f"Error in map transformation: {e}")


def transform_filter(data: Iterator[Dict], predicate: Callable) -> Iterator[Dict]:
    """Filter records based on predicate."""
    for record in data:
        try:
            if predicate(record):
                yield record
        except Exception as e:
            logger.error(f"Error in filter transformation: {e}")


def transform_select(data: Iterator[Dict], fields: List[str]) -> Iterator[Dict]:
    """Select specific fields from records."""
    for record in data:
        yield {field: record.get(field) for field in fields}


def transform_rename(data: Iterator[Dict], mapping: Dict[str, str]) -> Iterator[Dict]:
    """Rename fields in records."""
    for record in data:
        yield {
            mapping.get(key, key): value
            for key, value in record.items()
        }


def transform_add_field(data: Iterator[Dict], field: str,
                        func: Callable) -> Iterator[Dict]:
    """Add computed field to records."""
    for record in data:
        new_record = record.copy()
        try:
            new_record[field] = func(record)
        except Exception as e:
            logger.error(f"Error computing field {field}: {e}")
            new_record[field] = None
        yield new_record


def transform_clean_text(data: Iterator[Dict],
                         fields: List[str]) -> Iterator[Dict]:
    """Clean text fields."""
    for record in data:
        new_record = record.copy()
        for field in fields:
            if field in new_record and isinstance(new_record[field], str):
                # Remove extra whitespace
                new_record[field] = ' '.join(new_record[field].split())
                # Remove special characters (optional)
                # new_record[field] = re.sub(r'[^\w\s]', '', new_record[field])
        yield new_record


def transform_parse_date(data: Iterator[Dict], field: str,
                         format: str = '%Y-%m-%d') -> Iterator[Dict]:
    """Parse date field."""
    for record in data:
        new_record = record.copy()
        try:
            if field in new_record and new_record[field]:
                new_record[field] = datetime.strptime(
                    new_record[field], format
                )
        except Exception as e:
            logger.error(f"Error parsing date in field {field}: {e}")
            new_record[field] = None
        yield new_record


def transform_normalize(data: Iterator[Dict], field: str,
                        min_val: float = 0, max_val: float = 1) -> Iterator[Dict]:
    """Normalize numeric field to range."""
    # First pass: collect all values
    records = list(data)
    values = [r.get(field, 0) for r in records if isinstance(r.get(field), (int, float))]

    if not values:
        yield from records
        return

    data_min = min(values)
    data_max = max(values)
    range_val = data_max - data_min

    # Second pass: normalize
    for record in records:
        new_record = record.copy()
        if field in new_record and isinstance(new_record[field], (int, float)):
            if range_val > 0:
                normalized = (new_record[field] - data_min) / range_val
                new_record[field] = min_val + normalized * (max_val - min_val)
        yield new_record


class Pipeline:
    """Data processing pipeline."""

    def __init__(self, source: 'DataSource'):
        self.source = source
        self.transformations = []

    def map(self, func: Callable) -> 'Pipeline':
        """Add map transformation."""
        self.transformations.append(
            lambda data: transform_map(data, func)
        )
        return self

    def filter(self, predicate: Callable) -> 'Pipeline':
        """Add filter transformation."""
        self.transformations.append(
            lambda data: transform_filter(data, predicate)
        )
        return self

    def select(self, fields: List[str]) -> 'Pipeline':
        """Add select transformation."""
        self.transformations.append(
            lambda data: transform_select(data, fields)
        )
        return self

    def rename(self, mapping: Dict[str, str]) -> 'Pipeline':
        """Add rename transformation."""
        self.transformations.append(
            lambda data: transform_rename(data, mapping)
        )
        return self

    def add_field(self, field: str, func: Callable) -> 'Pipeline':
        """Add computed field."""
        self.transformations.append(
            lambda data: transform_add_field(data, field, func)
        )
        return self

    def clean_text(self, fields: List[str]) -> 'Pipeline':
        """Add text cleaning."""
        self.transformations.append(
            lambda data: transform_clean_text(data, fields)
        )
        return self

    def parse_date(self, field: str, format: str = '%Y-%m-%d') -> 'Pipeline':
        """Add date parsing."""
        self.transformations.append(
            lambda data: transform_parse_date(data, field, format)
        )
        return self

    def execute(self) -> Iterator[Dict[str, Any]]:
        """Execute pipeline."""
        data = self.source.read()

        # Apply all transformations
        for transformation in self.transformations:
            data = transformation(data)

        yield from data

    def collect(self) -> List[Dict[str, Any]]:
        """Collect all results into list."""
        return list(self.execute())
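
Note that the pipeline is lazy: nothing is read or transformed until you iterate over execute() (collect() simply drains execute() into a list). A small sketch using GeneratorSource, so it runs without any input file:

# Lazy-pipeline sketch - GeneratorSource means no input file is needed
from pipeline.sources import GeneratorSource
from pipeline.transformations import Pipeline

def fake_records():
    for i in range(5):
        yield {'name': f'  item {i}  ', 'price': str(i * 10)}

pipeline = (
    Pipeline(GeneratorSource(fake_records))
    .clean_text(['name'])                                 # '  item 1  ' -> 'item 1'
    .add_field('price_num', lambda r: float(r['price']))
    .filter(lambda r: r['price_num'] > 0)
)

for record in pipeline.execute():  # records are produced one at a time
    print(record)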

Step 3: Aggregations

# pipeline/aggregations.py
from typing import Iterator, Dict, Any, Callable, List
from collections import defaultdict
import statistics


class Aggregator:
    """Aggregate data from pipeline."""

    @staticmethod
    def count(data: Iterator[Dict]) -> int:
        """Count records."""
        return sum(1 for _ in data)

    @staticmethod
    def sum_field(data: Iterator[Dict], field: str) -> float:
        """Sum numeric field."""
        return sum(
            record.get(field, 0)
            for record in data
            if isinstance(record.get(field), (int, float))
        )

    @staticmethod
    def avg_field(data: Iterator[Dict], field: str) -> float:
        """Average of numeric field."""
        values = [
            record.get(field, 0)
            for record in data
            if isinstance(record.get(field), (int, float))
        ]
        return statistics.mean(values) if values else 0

    @staticmethod
    def min_field(data: Iterator[Dict], field: str) -> Any:
        """Minimum value of field."""
        values = [
            record.get(field)
            for record in data
            if record.get(field) is not None
        ]
        return min(values) if values else None

    @staticmethod
    def max_field(data: Iterator[Dict], field: str) -> Any:
        """Maximum value of field."""
        values = [
            record.get(field)
            for record in data
            if record.get(field) is not None
        ]
        return max(values) if values else None

    @staticmethod
    def group_by(data: Iterator[Dict], key_field: str,
                 agg_field: str, agg_func: str = 'count') -> Dict[Any, Any]:
        """Group by key and aggregate."""
        groups = defaultdict(list)

        for record in data:
            key = record.get(key_field)
            if key is not None:
                groups[key].append(record.get(agg_field))

        # Apply aggregation
        result = {}
        for key, values in groups.items():
            if agg_func == 'count':
                result[key] = len(values)
            elif agg_func == 'sum':
                result[key] = sum(v for v in values if isinstance(v, (int, float)))
            elif agg_func == 'avg':
                numeric_values = [v for v in values if isinstance(v, (int, float))]
                result[key] = statistics.mean(numeric_values) if numeric_values else 0
            elif agg_func == 'min':
                result[key] = min(v for v in values if v is not None)
            elif agg_func == 'max':
                result[key] = max(v for v in values if v is not None)

        return result

    @staticmethod
    def statistics(data: Iterator[Dict], field: str) -> Dict[str, float]:
        """Calculate statistics for field."""
        values = [
            record.get(field, 0)
            for record in data
            if isinstance(record.get(field), (int, float))
        ]

        if not values:
            return {}

        return {
            'count': len(values),
            'sum': sum(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'min': min(values),
            'max': max(values),
            'stdev': statistics.stdev(values) if len(values) > 1 else 0
        }
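
One caveat worth making explicit: every Aggregator method fully consumes the iterator it receives, so two aggregations cannot share the same generator. Either collect into a list first (as Step 5 does) or fork the stream with itertools.tee:

# Running several aggregations over a one-shot stream with itertools.tee
from itertools import tee
from pipeline.aggregations import Aggregator

records = ({'amount': i} for i in range(1, 101))  # a one-shot generator

s1, s2, s3 = tee(records, 3)
print(Aggregator.count(s1))                # 100
print(Aggregator.sum_field(s2, 'amount'))  # 5050
print(Aggregator.avg_field(s3, 'amount'))  # 50.5

Keep in mind that tee buffers records internally when its forks are consumed at different rates, so for very large streams collecting once into a list can be just as reasonable.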

Step 4: Data Sinks (Outputs)

# pipeline/sinks.py
import csv
import json
from typing import Iterator, Dict, Any, List, Optional
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class DataSink:
    """Base class for data sinks."""

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write data."""
        raise NotImplementedError


class CSVSink(DataSink):
    """Write data to CSV file."""

    def __init__(self, filepath: str, fieldnames: Optional[List[str]] = None):
        self.filepath = Path(filepath)
        self.fieldnames = fieldnames

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write to CSV file."""
        records = list(data)

        if not records:
            logger.warning("No data to write")
            return

        fieldnames = self.fieldnames or list(records[0].keys())

        try:
            with open(self.filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(records)

            logger.info(f"Wrote {len(records)} records to {self.filepath}")

        except Exception as e:
            logger.error(f"Error writing CSV: {e}")
            raise


class JSONLSink(DataSink):
    """Write data to JSON Lines file."""

    def __init__(self, filepath: str):
        self.filepath = Path(filepath)

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write to JSONL file."""
        count = 0

        try:
            with open(self.filepath, 'w', encoding='utf-8') as f:
                for record in data:
                    # default=str serializes non-JSON types (e.g. parsed datetimes)
                    f.write(json.dumps(record, ensure_ascii=False, default=str) + '\n')
                    count += 1

            logger.info(f"Wrote {count} records to {self.filepath}")

        except Exception as e:
            logger.error(f"Error writing JSONL: {e}")
            raise


class DatabaseSink(DataSink):
    """Write data to database."""

    def __init__(self, db_manager, repository_class):
        self.db_manager = db_manager
        self.repository_class = repository_class

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write to database."""
        count = 0

        try:
            with self.db_manager as session:
                repo = self.repository_class(session)

                for record in data:
                    repo.create(**record)
                    count += 1

            logger.info(f"Wrote {count} records to database")

        except Exception as e:
            logger.error(f"Error writing to database: {e}")
            raise


class ConsoleSink(DataSink):
    """Write data to console."""

    def __init__(self, max_records: int = 10):
        self.max_records = max_records

    def write(self, data: Iterator[Dict[str, Any]]):
        """Print to console."""
        count = 0

        for record in data:
            if count < self.max_records:
                # default=str serializes non-JSON types (e.g. parsed datetimes)
                print(json.dumps(record, indent=2, ensure_ascii=False, default=str))
            count += 1

        if count > self.max_records:
            print(f"\n... and {count - self.max_records} more records")

        print(f"\nTotal: {count} records")
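
Sinks plug directly onto a pipeline's execute() iterator. A minimal wiring sketch (paths are placeholders):

# Wiring a pipeline into sinks - paths are placeholders
from pipeline.sources import CSVSource
from pipeline.transformations import Pipeline
from pipeline.sinks import ConsoleSink, JSONLSink

pipeline = Pipeline(CSVSource('data/sales.csv')).clean_text(['product_name'])

# Quick preview on the console...
ConsoleSink(max_records=5).write(pipeline.execute())

# ...then a fresh execution for the file sink, since each execute() re-reads the source
JSONLSink('output/preview.jsonl').write(pipeline.execute())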

Step 5: Complete Pipeline Example

# examples/sales_pipeline.py"""Example: Sales Data Pipeline Input: CSV file with sales dataProcessing: Clean, transform, aggregateOutput: Statistics and reports""" from pipeline.sources import CSVSourcefrom pipeline.transformations import Pipelinefrom pipeline.aggregations import Aggregatorfrom pipeline.sinks import CSVSink, JSONLSink, ConsoleSink def run_sales_pipeline():    """Process sales data."""        # Create pipeline    pipeline = Pipeline(CSVSource('data/sales.csv'))        # Add transformations    pipeline = (        pipeline        .filter(lambda r: float(r.get('amount', 0)) > 0)  # Valid sales only        .clean_text(['product_name', 'customer_name'])        .parse_date('sale_date', '%Y-%m-%d')        .add_field('total', lambda r: float(r['amount']) * int(r['quantity']))        .rename({'customer_name': 'customer', 'product_name': 'product'})        .select(['sale_date', 'customer', 'product', 'quantity', 'amount', 'total'])    )        # Execute and collect    results = list(pipeline.execute())        print(f"Processed {len(results)} sales records")        # Aggregations    print("\n=== Sales Statistics ===")        # Total sales    total_revenue = sum(r['total'] for r in results)    print(f"Total Revenue: ${total_revenue:,.2f}")        # Group by product    product_sales = Aggregator.group_by(        iter(results), 'product', 'total', 'sum'    )    print("\nSales by Product:")    for product, sales in sorted(product_sales.items(),                                  key=lambda x: x[1], reverse=True):        print(f"  {product}: ${sales:,.2f}")        # Group by customer    customer_sales = Aggregator.group_by(        iter(results), 'customer', 'total', 'sum'    )    print("\nTop Customers:")    for customer, sales in sorted(customer_sales.items(),                                   key=lambda x: x[1], reverse=True)[:5]:        print(f"  {customer}: ${sales:,.2f}")        # Amount statistics    amount_stats = Aggregator.statistics(iter(results), 'total')    print("\nSales Amount Statistics:")    for key, value in amount_stats.items():        print(f"  {key}: ${value:,.2f}")        # Write outputs    CSVSink('output/sales_processed.csv').write(iter(results))    JSONLSink('output/sales_processed.jsonl').write(iter(results))        print("\n✅ Pipeline completed!") if __name__ == '__main__':    run_sales_pipeline()
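
To try the example without real data, you can first generate a small hypothetical data/sales.csv (the column names match what the pipeline expects; the values are made up):

# make_sample_data.py - hypothetical sample data for testing the pipeline
import csv
import random
from pathlib import Path

Path('data').mkdir(exist_ok=True)
Path('output').mkdir(exist_ok=True)

products = ['Laptop', 'Mouse', 'Keyboard', 'Monitor']
customers = ['Alice', 'Bob', 'Carol', 'Dave']

with open('data/sales.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=[
        'sale_date', 'customer_name', 'product_name', 'quantity', 'amount'
    ])
    writer.writeheader()
    for _ in range(100):
        writer.writerow({
            'sale_date': f'2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}',
            'customer_name': random.choice(customers),
            'product_name': random.choice(products),
            'quantity': random.randint(1, 5),
            'amount': round(random.uniform(10, 500), 2),
        })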

Project 3: RESTful API Client Library 🌐

Description: Build a client library for REST APIs with retries, caching, and rate limiting.

Functional requirements:

  • HTTP methods (GET, POST, PUT, DELETE, PATCH)
  • Authentication (Bearer, Basic, API Key)
  • Automatic retry with exponential backoff
  • Response caching với TTL
  • Rate limiting
  • Request/response interceptors
  • Pagination support
  • Error handling
  • Logging

Implementation guide:

Step 1: Core Client

# api_client/client.py
import requests
from typing import Dict, Any, Optional, Callable, List
from datetime import datetime, timedelta
import time
import hashlib
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


class HTTPClient:
    """HTTP client with retry and caching."""

    def __init__(self, base_url: str, timeout: int = 30,
                 max_retries: int = 3, cache_dir: str = '.cache'):
        """
        Initialize HTTP client.

        Args:
            base_url: Base URL for API
            timeout: Request timeout in seconds
            max_retries: Maximum retry attempts
            cache_dir: Directory for response cache
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.max_retries = max_retries
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

        self.session = requests.Session()
        self.request_interceptors: List[Callable] = []
        self.response_interceptors: List[Callable] = []

    def add_request_interceptor(self, func: Callable):
        """Add request interceptor."""
        self.request_interceptors.append(func)

    def add_response_interceptor(self, func: Callable):
        """Add response interceptor."""
        self.response_interceptors.append(func)

    def _get_cache_key(self, url: str, params: Optional[Dict] = None) -> str:
        """Generate cache key."""
        cache_data = f"{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(cache_data.encode()).hexdigest()

    def _get_from_cache(self, key: str, ttl: int) -> Optional[Dict]:
        """Get response from cache."""
        cache_file = self.cache_dir / f"{key}.json"

        if not cache_file.exists():
            return None

        try:
            with open(cache_file, 'r') as f:
                data = json.load(f)

            # Check expiration
            cached_at = datetime.fromisoformat(data['cached_at'])
            if datetime.now() - cached_at > timedelta(seconds=ttl):
                cache_file.unlink()
                return None

            logger.debug(f"Cache hit: {key}")
            return data['response']

        except Exception as e:
            logger.error(f"Cache read error: {e}")
            return None

    def _save_to_cache(self, key: str, response: Dict):
        """Save response to cache."""
        cache_file = self.cache_dir / f"{key}.json"

        try:
            data = {
                'cached_at': datetime.now().isoformat(),
                'response': response
            }

            with open(cache_file, 'w') as f:
                json.dump(data, f)

            logger.debug(f"Cached response: {key}")

        except Exception as e:
            logger.error(f"Cache write error: {e}")

    def request(self, method: str, endpoint: str, use_cache: bool = False,
                cache_ttl: int = 3600, **kwargs) -> requests.Response:
        """
        Make HTTP request with retry and caching.

        Args:
            method: HTTP method
            endpoint: API endpoint
            use_cache: Enable response caching
            cache_ttl: Cache time-to-live in seconds
            **kwargs: Additional request parameters

        Returns:
            Response object
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Check cache for GET requests
        if use_cache and method.upper() == 'GET':
            cache_key = self._get_cache_key(url, kwargs.get('params'))
            cached = self._get_from_cache(cache_key, cache_ttl)

            if cached:
                # Create mock response
                response = requests.Response()
                response._content = json.dumps(cached).encode()
                response.status_code = 200
                return response

        # Apply request interceptors
        for interceptor in self.request_interceptors:
            kwargs = interceptor(method, url, kwargs)

        # Retry logic
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                logger.info(f"{method} {url} (attempt {attempt + 1})")

                response = self.session.request(
                    method, url, timeout=self.timeout, **kwargs
                )
                response.raise_for_status()

                # Apply response interceptors
                for interceptor in self.response_interceptors:
                    response = interceptor(response)

                # Cache GET responses
                if use_cache and method.upper() == 'GET':
                    cache_key = self._get_cache_key(url, kwargs.get('params'))
                    self._save_to_cache(cache_key, response.json())

                return response

            except requests.exceptions.RequestException as e:
                last_exception = e
                logger.warning(f"Request failed (attempt {attempt + 1}): {e}")

                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.info(f"Retrying in {wait_time}s...")
                    time.sleep(wait_time)

        # All retries failed
        logger.error(f"Request failed after {self.max_retries} attempts")
        raise last_exception

    def get(self, endpoint: str, **kwargs) -> requests.Response:
        """GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> requests.Response:
        """POST request."""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs) -> requests.Response:
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)

    def patch(self, endpoint: str, **kwargs) -> requests.Response:
        """PATCH request."""
        return self.request('PATCH', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)
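
The interceptor hooks deserve a quick usage sketch (the base URL is a placeholder). Per the code above, a request interceptor receives (method, url, kwargs) and must return the kwargs dict; a response interceptor receives and returns the response:

# Interceptor usage sketch - base URL is a placeholder
from api_client.client import HTTPClient

client = HTTPClient('https://api.example.com')

def add_api_version(method, url, kwargs):
    # Inject a header into every outgoing request
    kwargs.setdefault('headers', {})['X-API-Version'] = '2024-01'
    return kwargs

def log_status(response):
    print(f"{response.url} -> {response.status_code}")
    return response

client.add_request_interceptor(add_api_version)
client.add_response_interceptor(log_status)

# Cached for 10 minutes; note that cache hits bypass both interceptors
users = client.get('users', use_cache=True, cache_ttl=600).json()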

Authentication, rate limiting, and the self-study exercises continue in a separate file...


Next: Lesson 21.4: The Complete API Client and Self-Study Exercises 🚀