Lesson 21: Python Advanced - Practices & Projects (Part 3)
Project 2: Real-time Data Processing Pipeline 📊
Description: Build a data pipeline that processes streaming data with generators, transformations, and aggregations.
Functional requirements:
- Read data from multiple sources (CSV, JSON, API)
- Data transformations with generators
- Filter, map, reduce operations
- Aggregation and statistics
- Write to multiple outputs (DB, file, API)
- Memory-efficient streaming (see the generator-chaining sketch after this list)
- Error handling and logging
- Performance monitoring
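The key to memory-efficient streaming is chaining generators so each stage pulls records lazily from the one before it. A minimal, standalone sketch (all names here are illustrative, not part of the project code):

```python
# Minimal sketch of generator chaining; all names are illustrative.
from typing import Dict, Iterator


def read_records() -> Iterator[Dict]:
    """Pretend source: yields records one at a time instead of building a list."""
    for i in range(1_000_000):
        yield {"id": i, "amount": i * 0.5}


def only_large(records: Iterator[Dict]) -> Iterator[Dict]:
    """Filter stage: passes matching records through lazily."""
    return (r for r in records if r["amount"] > 100)


def add_tax(records: Iterator[Dict]) -> Iterator[Dict]:
    """Map stage: computes a new field per record."""
    for r in records:
        yield {**r, "amount_with_tax": round(r["amount"] * 1.1, 2)}


# Nothing is read or computed until the final consumer iterates.
stream = add_tax(only_large(read_records()))
print(next(stream))  # only the first qualifying record is ever materialized
```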
Concepts used:
- Generators & Iterators
- Functional programming
- Context managers
- Decorators (see the timing-decorator sketch after this list)
- Type hints
- Error handling
- Logging
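The performance-monitoring requirement is not covered by the steps below; here is a minimal sketch of one way to do it, combining decorators with the logging module (the `monitor` name and log format are assumptions):

```python
# Hypothetical timing decorator for the performance-monitoring requirement.
import functools
import logging
import time

logger = logging.getLogger(__name__)


def monitor(func):
    """Log how long the wrapped function takes to run."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logger.info("%s took %.3fs", func.__name__, elapsed)
    return wrapper


@monitor
def run_example():
    """Stand-in for a pipeline run."""
    time.sleep(0.1)
```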
Implementation steps:
Step 1: Data Sources
```python
# pipeline/sources.py
import csv
import json
from typing import Iterator, Dict, Any, Optional
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class DataSource:
    """Base class for data sources."""

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read data and yield records."""
        raise NotImplementedError


class CSVSource(DataSource):
    """Read data from CSV file."""

    def __init__(self, filepath: str, encoding: str = 'utf-8'):
        self.filepath = Path(filepath)
        self.encoding = encoding

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read CSV file line by line."""
        try:
            with open(self.filepath, 'r', encoding=self.encoding) as f:
                reader = csv.DictReader(f)
                for row in reader:
                    yield dict(row)
        except Exception as e:
            logger.error(f"Error reading CSV {self.filepath}: {e}")
            raise


class JSONLSource(DataSource):
    """Read data from JSON Lines file."""

    def __init__(self, filepath: str, encoding: str = 'utf-8'):
        self.filepath = Path(filepath)
        self.encoding = encoding

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read JSONL file line by line."""
        try:
            with open(self.filepath, 'r', encoding=self.encoding) as f:
                for line in f:
                    if line.strip():
                        yield json.loads(line)
        except Exception as e:
            logger.error(f"Error reading JSONL {self.filepath}: {e}")
            raise


class APISource(DataSource):
    """Read data from paginated API."""

    def __init__(self, base_url: str, auth_token: Optional[str] = None,
                 params: Optional[Dict] = None):
        import requests

        self.base_url = base_url
        self.session = requests.Session()
        if auth_token:
            self.session.headers['Authorization'] = f'Bearer {auth_token}'
        self.params = params or {}

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read data from paginated API."""
        page = 1
        while True:
            try:
                params = {**self.params, 'page': page}
                response = self.session.get(self.base_url, params=params)
                response.raise_for_status()

                data = response.json()
                items = data.get('items', [])

                if not items:
                    break

                for item in items:
                    yield item

                page += 1
            except Exception as e:
                logger.error(f"Error reading from API: {e}")
                break


class GeneratorSource(DataSource):
    """Source from generator function."""

    def __init__(self, generator_func, *args, **kwargs):
        self.generator_func = generator_func
        self.args = args
        self.kwargs = kwargs

    def read(self) -> Iterator[Dict[str, Any]]:
        """Read from generator."""
        yield from self.generator_func(*self.args, **self.kwargs)
```
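A quick way to exercise a source before building the full pipeline (the file path is an assumption): read() returns a lazy iterator, so records are pulled from disk only as you loop.

```python
# Hypothetical usage of CSVSource; the file path is an assumption.
from pipeline.sources import CSVSource

source = CSVSource('data/sales.csv')
for i, record in enumerate(source.read()):
    print(record)   # each record is a plain dict of column name -> string
    if i == 2:      # stop early; the rest of the file is never read
        break
```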
Step 2: Data Transformations
```python
# pipeline/transformations.py
from typing import Iterator, Dict, Any, Callable, List
import re
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


def transform_map(data: Iterator[Dict], func: Callable) -> Iterator[Dict]:
    """Apply function to each record."""
    for record in data:
        try:
            yield func(record)
        except Exception as e:
            logger.error(f"Error in map transformation: {e}")


def transform_filter(data: Iterator[Dict], predicate: Callable) -> Iterator[Dict]:
    """Filter records based on predicate."""
    for record in data:
        try:
            if predicate(record):
                yield record
        except Exception as e:
            logger.error(f"Error in filter transformation: {e}")


def transform_select(data: Iterator[Dict], fields: List[str]) -> Iterator[Dict]:
    """Select specific fields from records."""
    for record in data:
        yield {field: record.get(field) for field in fields}


def transform_rename(data: Iterator[Dict], mapping: Dict[str, str]) -> Iterator[Dict]:
    """Rename fields in records."""
    for record in data:
        yield {
            mapping.get(key, key): value
            for key, value in record.items()
        }


def transform_add_field(data: Iterator[Dict], field: str, func: Callable) -> Iterator[Dict]:
    """Add computed field to records."""
    for record in data:
        new_record = record.copy()
        try:
            new_record[field] = func(record)
        except Exception as e:
            logger.error(f"Error computing field {field}: {e}")
            new_record[field] = None
        yield new_record


def transform_clean_text(data: Iterator[Dict], fields: List[str]) -> Iterator[Dict]:
    """Clean text fields."""
    for record in data:
        new_record = record.copy()
        for field in fields:
            if field in new_record and isinstance(new_record[field], str):
                # Remove extra whitespace
                new_record[field] = ' '.join(new_record[field].split())
                # Remove special characters (optional)
                # new_record[field] = re.sub(r'[^\w\s]', '', new_record[field])
        yield new_record


def transform_parse_date(data: Iterator[Dict], field: str,
                         format: str = '%Y-%m-%d') -> Iterator[Dict]:
    """Parse date field."""
    for record in data:
        new_record = record.copy()
        try:
            if field in new_record and new_record[field]:
                new_record[field] = datetime.strptime(
                    new_record[field], format
                )
        except Exception as e:
            logger.error(f"Error parsing date in field {field}: {e}")
            new_record[field] = None
        yield new_record


def transform_normalize(data: Iterator[Dict], field: str,
                        min_val: float = 0, max_val: float = 1) -> Iterator[Dict]:
    """Normalize numeric field to range."""
    # First pass: collect all values
    records = list(data)
    values = [
        r.get(field, 0) for r in records
        if isinstance(r.get(field), (int, float))
    ]

    if not values:
        yield from records
        return

    data_min = min(values)
    data_max = max(values)
    range_val = data_max - data_min

    # Second pass: normalize
    for record in records:
        new_record = record.copy()
        if field in new_record and isinstance(new_record[field], (int, float)):
            if range_val > 0:
                normalized = (new_record[field] - data_min) / range_val
                new_record[field] = min_val + normalized * (max_val - min_val)
        yield new_record


class Pipeline:
    """Data processing pipeline."""

    def __init__(self, source: 'DataSource'):
        self.source = source
        self.transformations = []

    def map(self, func: Callable) -> 'Pipeline':
        """Add map transformation."""
        self.transformations.append(
            lambda data: transform_map(data, func)
        )
        return self

    def filter(self, predicate: Callable) -> 'Pipeline':
        """Add filter transformation."""
        self.transformations.append(
            lambda data: transform_filter(data, predicate)
        )
        return self

    def select(self, fields: List[str]) -> 'Pipeline':
        """Add select transformation."""
        self.transformations.append(
            lambda data: transform_select(data, fields)
        )
        return self

    def rename(self, mapping: Dict[str, str]) -> 'Pipeline':
        """Add rename transformation."""
        self.transformations.append(
            lambda data: transform_rename(data, mapping)
        )
        return self

    def add_field(self, field: str, func: Callable) -> 'Pipeline':
        """Add computed field."""
        self.transformations.append(
            lambda data: transform_add_field(data, field, func)
        )
        return self

    def clean_text(self, fields: List[str]) -> 'Pipeline':
        """Add text cleaning."""
        self.transformations.append(
            lambda data: transform_clean_text(data, fields)
        )
        return self

    def parse_date(self, field: str, format: str = '%Y-%m-%d') -> 'Pipeline':
        """Add date parsing."""
        self.transformations.append(
            lambda data: transform_parse_date(data, field, format)
        )
        return self

    def execute(self) -> Iterator[Dict[str, Any]]:
        """Execute pipeline."""
        data = self.source.read()

        # Apply all transformations
        for transformation in self.transformations:
            data = transformation(data)

        yield from data

    def collect(self) -> List[Dict[str, Any]]:
        """Collect all results into list."""
        return list(self.execute())
```
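A small usage sketch of the Pipeline class above (field names and file path are assumptions): because execute() yields records, consumers can start working before the whole source has been read.

```python
# Hypothetical pipeline usage; field names and file path are assumptions.
from pipeline.sources import CSVSource
from pipeline.transformations import Pipeline

pipeline = (
    Pipeline(CSVSource('data/sales.csv'))
    .filter(lambda r: r.get('amount'))                        # drop rows without an amount
    .add_field('amount_float', lambda r: float(r['amount']))  # computed field
    .select(['sale_date', 'amount_float'])
)

for record in pipeline.execute():  # lazy: records stream through one at a time
    print(record)
    break                          # nothing beyond the first record is processed
```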
Step 3: Aggregations
```python
# pipeline/aggregations.py
from typing import Iterator, Dict, Any, Callable, List
from collections import defaultdict
import statistics


class Aggregator:
    """Aggregate data from pipeline."""

    @staticmethod
    def count(data: Iterator[Dict]) -> int:
        """Count records."""
        return sum(1 for _ in data)

    @staticmethod
    def sum_field(data: Iterator[Dict], field: str) -> float:
        """Sum numeric field."""
        return sum(
            record.get(field, 0) for record in data
            if isinstance(record.get(field), (int, float))
        )

    @staticmethod
    def avg_field(data: Iterator[Dict], field: str) -> float:
        """Average of numeric field."""
        values = [
            record.get(field, 0) for record in data
            if isinstance(record.get(field), (int, float))
        ]
        return statistics.mean(values) if values else 0

    @staticmethod
    def min_field(data: Iterator[Dict], field: str) -> Any:
        """Minimum value of field."""
        values = [
            record.get(field) for record in data
            if record.get(field) is not None
        ]
        return min(values) if values else None

    @staticmethod
    def max_field(data: Iterator[Dict], field: str) -> Any:
        """Maximum value of field."""
        values = [
            record.get(field) for record in data
            if record.get(field) is not None
        ]
        return max(values) if values else None

    @staticmethod
    def group_by(data: Iterator[Dict], key_field: str,
                 agg_field: str, agg_func: str = 'count') -> Dict[Any, Any]:
        """Group by key and aggregate."""
        groups = defaultdict(list)

        for record in data:
            key = record.get(key_field)
            if key is not None:
                groups[key].append(record.get(agg_field))

        # Apply aggregation
        result = {}
        for key, values in groups.items():
            if agg_func == 'count':
                result[key] = len(values)
            elif agg_func == 'sum':
                result[key] = sum(v for v in values if isinstance(v, (int, float)))
            elif agg_func == 'avg':
                numeric_values = [v for v in values if isinstance(v, (int, float))]
                result[key] = statistics.mean(numeric_values) if numeric_values else 0
            elif agg_func == 'min':
                result[key] = min(v for v in values if v is not None)
            elif agg_func == 'max':
                result[key] = max(v for v in values if v is not None)

        return result

    @staticmethod
    def statistics(data: Iterator[Dict], field: str) -> Dict[str, float]:
        """Calculate statistics for field."""
        values = [
            record.get(field, 0) for record in data
            if isinstance(record.get(field), (int, float))
        ]

        if not values:
            return {}

        return {
            'count': len(values),
            'sum': sum(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'min': min(values),
            'max': max(values),
            'stdev': statistics.stdev(values) if len(values) > 1 else 0
        }
```
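Note that every Aggregator method consumes the iterator it is given, so a single generator from Pipeline.execute() can feed only one aggregation; the Step 5 example works around this by collecting results into a list and re-wrapping it with iter(). A small sketch with in-memory records (field names and values are illustrative):

```python
# Hypothetical aggregation usage; field names and values are illustrative.
from pipeline.aggregations import Aggregator

records = [
    {'product': 'A', 'total': 10.0},
    {'product': 'A', 'total': 5.0},
    {'product': 'B', 'total': 7.5},
]

# Each call gets a fresh iterator, because an exhausted iterator yields nothing.
print(Aggregator.count(iter(records)))                                # 3
print(Aggregator.sum_field(iter(records), 'total'))                   # 22.5
print(Aggregator.group_by(iter(records), 'product', 'total', 'sum'))  # {'A': 15.0, 'B': 7.5}
print(Aggregator.statistics(iter(records), 'total'))                  # count, sum, mean, median, ...
```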
Step 4: Data Sinks (Outputs)
```python
# pipeline/sinks.py
import csv
import json
from typing import Iterator, Dict, Any, List
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class DataSink:
    """Base class for data sinks."""

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write data."""
        raise NotImplementedError


class CSVSink(DataSink):
    """Write data to CSV file."""

    def __init__(self, filepath: str, fieldnames: List[str] = None):
        self.filepath = Path(filepath)
        self.fieldnames = fieldnames

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write to CSV file."""
        records = list(data)

        if not records:
            logger.warning("No data to write")
            return

        fieldnames = self.fieldnames or list(records[0].keys())

        try:
            with open(self.filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(records)

            logger.info(f"Wrote {len(records)} records to {self.filepath}")
        except Exception as e:
            logger.error(f"Error writing CSV: {e}")
            raise


class JSONLSink(DataSink):
    """Write data to JSON Lines file."""

    def __init__(self, filepath: str):
        self.filepath = Path(filepath)

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write to JSONL file."""
        count = 0

        try:
            with open(self.filepath, 'w', encoding='utf-8') as f:
                for record in data:
                    # default=str handles values that are not JSON-serializable,
                    # e.g. datetime objects produced by transform_parse_date
                    f.write(json.dumps(record, ensure_ascii=False, default=str) + '\n')
                    count += 1

            logger.info(f"Wrote {count} records to {self.filepath}")
        except Exception as e:
            logger.error(f"Error writing JSONL: {e}")
            raise


class DatabaseSink(DataSink):
    """Write data to database."""

    def __init__(self, db_manager, repository_class):
        self.db_manager = db_manager
        self.repository_class = repository_class

    def write(self, data: Iterator[Dict[str, Any]]):
        """Write to database."""
        count = 0

        try:
            with self.db_manager as session:
                repo = self.repository_class(session)
                for record in data:
                    repo.create(**record)
                    count += 1

            logger.info(f"Wrote {count} records to database")
        except Exception as e:
            logger.error(f"Error writing to database: {e}")
            raise


class ConsoleSink(DataSink):
    """Write data to console."""

    def __init__(self, max_records: int = 10):
        self.max_records = max_records

    def write(self, data: Iterator[Dict[str, Any]]):
        """Print to console."""
        count = 0
        for record in data:
            if count < self.max_records:
                print(json.dumps(record, indent=2, ensure_ascii=False))
            count += 1

        if count > self.max_records:
            print(f"\n... and {count - self.max_records} more records")

        print(f"\nTotal: {count} records")
```
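Wiring a pipeline into a sink is just passing the iterator along. A minimal sketch (paths are assumptions; note that the sinks open files directly, so the output directory must already exist):

```python
# Hypothetical sink usage; file paths are assumptions.
from pathlib import Path

from pipeline.sources import CSVSource
from pipeline.transformations import Pipeline
from pipeline.sinks import ConsoleSink, CSVSink

Path('output').mkdir(exist_ok=True)  # sinks open files directly and do not create directories

pipeline = Pipeline(CSVSource('data/sales.csv')).clean_text(['product_name'])

ConsoleSink(max_records=5).write(pipeline.execute())     # preview a few records
CSVSink('output/cleaned.csv').write(pipeline.execute())  # execute() re-reads the source, so this is a fresh pass
```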
Step 5: Complete Pipeline Example
```python
# examples/sales_pipeline.py
"""
Example: Sales Data Pipeline

Input: CSV file with sales data
Processing: Clean, transform, aggregate
Output: Statistics and reports
"""

from pipeline.sources import CSVSource
from pipeline.transformations import Pipeline
from pipeline.aggregations import Aggregator
from pipeline.sinks import CSVSink, JSONLSink, ConsoleSink


def run_sales_pipeline():
    """Process sales data."""
    # Create pipeline
    pipeline = Pipeline(CSVSource('data/sales.csv'))

    # Add transformations
    pipeline = (
        pipeline
        .filter(lambda r: float(r.get('amount', 0)) > 0)  # Valid sales only
        .clean_text(['product_name', 'customer_name'])
        .parse_date('sale_date', '%Y-%m-%d')
        .add_field('total', lambda r: float(r['amount']) * int(r['quantity']))
        .rename({'customer_name': 'customer', 'product_name': 'product'})
        .select(['sale_date', 'customer', 'product', 'quantity', 'amount', 'total'])
    )

    # Execute and collect
    results = list(pipeline.execute())
    print(f"Processed {len(results)} sales records")

    # Aggregations
    print("\n=== Sales Statistics ===")

    # Total sales
    total_revenue = sum(r['total'] for r in results)
    print(f"Total Revenue: ${total_revenue:,.2f}")

    # Group by product
    product_sales = Aggregator.group_by(
        iter(results), 'product', 'total', 'sum'
    )
    print("\nSales by Product:")
    for product, sales in sorted(product_sales.items(), key=lambda x: x[1], reverse=True):
        print(f"  {product}: ${sales:,.2f}")

    # Group by customer
    customer_sales = Aggregator.group_by(
        iter(results), 'customer', 'total', 'sum'
    )
    print("\nTop Customers:")
    for customer, sales in sorted(customer_sales.items(), key=lambda x: x[1], reverse=True)[:5]:
        print(f"  {customer}: ${sales:,.2f}")

    # Amount statistics
    amount_stats = Aggregator.statistics(iter(results), 'total')
    print("\nSales Amount Statistics:")
    for key, value in amount_stats.items():
        print(f"  {key}: ${value:,.2f}")

    # Write outputs
    CSVSink('output/sales_processed.csv').write(iter(results))
    JSONLSink('output/sales_processed.jsonl').write(iter(results))

    print("\n✅ Pipeline completed!")


if __name__ == '__main__':
    run_sales_pipeline()
```
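To actually run the example you need data/sales.csv and an output/ directory. A hedged setup sketch with made-up sample rows (the column names match the pipeline above; the values are purely illustrative):

```python
# Hypothetical setup script: creates data/sales.csv with illustrative rows plus the output/ directory.
import csv
from pathlib import Path

Path('data').mkdir(exist_ok=True)
Path('output').mkdir(exist_ok=True)

sample_rows = [
    {'sale_date': '2024-01-05', 'customer_name': '  Alice Tran ', 'product_name': 'Widget  ',
     'quantity': '2', 'amount': '19.99'},
    {'sale_date': '2024-01-06', 'customer_name': 'Bao Nguyen', 'product_name': 'Gadget',
     'quantity': '1', 'amount': '49.50'},
    {'sale_date': '2024-01-07', 'customer_name': 'Alice   Tran', 'product_name': 'Widget',
     'quantity': '3', 'amount': '19.99'},
]

with open('data/sales.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=list(sample_rows[0].keys()))
    writer.writeheader()
    writer.writerows(sample_rows)
```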
Project 3: RESTful API Client Library 🌐
Description: Build a client library for REST APIs with retry, caching, and rate limiting.
Functional requirements:
- HTTP methods (GET, POST, PUT, DELETE, PATCH)
- Authentication (Bearer, Basic, API Key)
- Automatic retry with exponential backoff (see the backoff sketch after this list)
- Response caching with TTL
- Rate limiting
- Request/response interceptors
- Pagination support
- Error handling
- Logging
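The retry requirement below uses exponential backoff, waiting 2 ** attempt seconds between attempts. A tiny standalone illustration of that schedule (the helper function is hypothetical):

```python
# Illustration of the exponential backoff schedule used by the client below (wait = 2 ** attempt).
def backoff_schedule(max_retries: int = 3) -> list:
    """Seconds waited after each failed attempt (no wait after the final one)."""
    return [2 ** attempt for attempt in range(max_retries - 1)]


print(backoff_schedule(3))  # [1, 2]  -> wait 1s after the 1st failure, 2s after the 2nd
print(backoff_schedule(5))  # [1, 2, 4, 8]
```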
Implementation steps:
Step 1: Core Client
```python
# api_client/client.py
import requests
from typing import Dict, Any, Optional, Callable, List
from datetime import datetime, timedelta
import time
import hashlib
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


class HTTPClient:
    """HTTP client with retry and caching."""

    def __init__(self, base_url: str, timeout: int = 30,
                 max_retries: int = 3, cache_dir: str = '.cache'):
        """
        Initialize HTTP client.

        Args:
            base_url: Base URL for API
            timeout: Request timeout in seconds
            max_retries: Maximum retry attempts
            cache_dir: Directory for response cache
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.max_retries = max_retries
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

        self.session = requests.Session()
        self.request_interceptors: List[Callable] = []
        self.response_interceptors: List[Callable] = []

    def add_request_interceptor(self, func: Callable):
        """Add request interceptor."""
        self.request_interceptors.append(func)

    def add_response_interceptor(self, func: Callable):
        """Add response interceptor."""
        self.response_interceptors.append(func)

    def _get_cache_key(self, url: str, params: Dict = None) -> str:
        """Generate cache key."""
        cache_data = f"{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(cache_data.encode()).hexdigest()

    def _get_from_cache(self, key: str, ttl: int) -> Optional[Dict]:
        """Get response from cache."""
        cache_file = self.cache_dir / f"{key}.json"

        if not cache_file.exists():
            return None

        try:
            with open(cache_file, 'r') as f:
                data = json.load(f)

            # Check expiration
            cached_at = datetime.fromisoformat(data['cached_at'])
            if datetime.now() - cached_at > timedelta(seconds=ttl):
                cache_file.unlink()
                return None

            logger.debug(f"Cache hit: {key}")
            return data['response']
        except Exception as e:
            logger.error(f"Cache read error: {e}")
            return None

    def _save_to_cache(self, key: str, response: Dict):
        """Save response to cache."""
        cache_file = self.cache_dir / f"{key}.json"

        try:
            data = {
                'cached_at': datetime.now().isoformat(),
                'response': response
            }
            with open(cache_file, 'w') as f:
                json.dump(data, f)

            logger.debug(f"Cached response: {key}")
        except Exception as e:
            logger.error(f"Cache write error: {e}")

    def request(self, method: str, endpoint: str,
                use_cache: bool = False, cache_ttl: int = 3600,
                **kwargs) -> requests.Response:
        """
        Make HTTP request with retry and caching.

        Args:
            method: HTTP method
            endpoint: API endpoint
            use_cache: Enable response caching
            cache_ttl: Cache time-to-live in seconds
            **kwargs: Additional request parameters

        Returns:
            Response object
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Check cache for GET requests
        if use_cache and method.upper() == 'GET':
            cache_key = self._get_cache_key(url, kwargs.get('params'))
            cached = self._get_from_cache(cache_key, cache_ttl)
            if cached:
                # Create mock response
                response = requests.Response()
                response._content = json.dumps(cached).encode()
                response.status_code = 200
                return response

        # Apply request interceptors
        for interceptor in self.request_interceptors:
            kwargs = interceptor(method, url, kwargs)

        # Retry logic
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                logger.info(f"{method} {url} (attempt {attempt + 1})")

                response = self.session.request(
                    method, url, timeout=self.timeout, **kwargs
                )
                response.raise_for_status()

                # Apply response interceptors
                for interceptor in self.response_interceptors:
                    response = interceptor(response)

                # Cache GET responses
                if use_cache and method.upper() == 'GET':
                    cache_key = self._get_cache_key(url, kwargs.get('params'))
                    self._save_to_cache(cache_key, response.json())

                return response

            except requests.exceptions.RequestException as e:
                last_exception = e
                logger.warning(f"Request failed (attempt {attempt + 1}): {e}")

                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.info(f"Retrying in {wait_time}s...")
                    time.sleep(wait_time)

        # All retries failed
        logger.error(f"Request failed after {self.max_retries} attempts")
        raise last_exception

    def get(self, endpoint: str, **kwargs) -> requests.Response:
        """GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> requests.Response:
        """POST request."""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs) -> requests.Response:
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)

    def patch(self, endpoint: str, **kwargs) -> requests.Response:
        """PATCH request."""
        return self.request('PATCH', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)
```
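A hedged usage sketch for the client above (the base URL, endpoint, and token are placeholders): a request interceptor injects an auth header, and a GET response is cached for five minutes.

```python
# Hypothetical usage of HTTPClient; URL, endpoint, and token are placeholders.
from api_client.client import HTTPClient

client = HTTPClient('https://api.example.com/v1', timeout=10, max_retries=3)


def add_auth(method, url, kwargs):
    """Request interceptor: inject a bearer token into every outgoing request."""
    headers = kwargs.setdefault('headers', {})
    headers['Authorization'] = 'Bearer <your-token>'
    return kwargs


client.add_request_interceptor(add_auth)

# Cached for 300 seconds: a second identical GET within the TTL is served from the .cache/ directory.
response = client.get('users', params={'page': 1}, use_cache=True, cache_ttl=300)
print(response.status_code, response.json())
```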
Authentication, Rate Limiting, and the self-practice exercises continue in a separate file...
Next lesson: Lesson 21.4: Complete API Client and Self-Practice Exercises 🚀