Bài 20: Working với APIs (Phần 2)
Sessions Management
Advanced Session Configuration
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class APIClient:
    """HTTP API client built on a pre-configured ``requests.Session``.

    The session carries default JSON headers and an adapter that retries
    selected HTTP methods on selected status codes. Instances can be used
    as context managers; the session is closed on exit.
    """

    def __init__(self, base_url: str, timeout: int = 10):
        """Store the base URL (trailing slash removed) and build the session.

        Args:
            base_url: Root URL every endpoint is appended to.
            timeout: Default per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create a session with default headers, retries and pooling."""
        session = requests.Session()

        # Default headers sent with every request made through the session.
        session.headers.update({
            'User-Agent': 'APIClient/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })

        # Retry strategy. ``allowed_methods`` replaces ``method_whitelist``,
        # which was deprecated in urllib3 1.26 and removed in 2.0.
        # POST is deliberately not in the list.
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],
            backoff_factor=1
        )

        # One adapter (retry + connection pool settings) for both schemes.
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=20
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def request(self, method: str, endpoint: str, **kwargs):
        """Send a request through the session and return the response.

        Args:
            method: HTTP method name.
            endpoint: Path relative to ``base_url``; a leading slash is optional.
            **kwargs: Passed to ``Session.request``. ``timeout`` defaults to
                the client's timeout when not supplied.

        Raises:
            requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Apply the client-wide timeout only when the caller gave none.
        kwargs.setdefault('timeout', self.timeout)

        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response

    def get(self, endpoint: str, **kwargs):
        """GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs):
        """POST request."""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs):
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs):
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)

    def close(self):
        """Close the underlying session."""
        self.session.close()

    def __enter__(self):
        """Context manager entry: return the client itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the session."""
        self.close()


# Usage
with APIClient('https://api.example.com') as client:
    response = client.get('/users')
    users = response.json()
    print(f"Users: {len(users)}")
Session with Authentication
import requests
from typing import Optional


class AuthenticatedClient:
    """API client whose session carries an authentication header.

    An API key takes precedence: when ``api_key`` is given it is sent as
    ``X-API-Key`` and ``bearer_token`` is ignored; otherwise a bearer token,
    if given, is sent in the ``Authorization`` header.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None,
                 bearer_token: Optional[str] = None):
        """Create the session and install the matching auth header."""
        session = requests.Session()

        # Pick exactly one authentication scheme.
        if api_key:
            session.headers.update({'X-API-Key': api_key})
        elif bearer_token:
            session.headers.update({'Authorization': f'Bearer {bearer_token}'})

        self.base_url = base_url.rstrip('/')
        self.session = session

    def refresh_token(self, new_token: str):
        """Replace the bearer token used for subsequent requests."""
        self.session.headers.update({'Authorization': f'Bearer {new_token}'})

    def get(self, endpoint: str, **kwargs):
        """Send an authenticated GET to ``endpoint`` and return the response."""
        path = endpoint.lstrip('/')
        return self.session.get(f"{self.base_url}/{path}", **kwargs)


# Usage
client = AuthenticatedClient(
    base_url='https://api.example.com',
    bearer_token='your_token_here'
)

# Make authenticated requests
response = client.get('/profile')
print(response.json())

# Refresh token if needed
client.refresh_token('new_token_here')
Async Requests với aiohttp
Installation
pip install aiohttp
Basic Async Requests
import asyncio
import aiohttp


async def fetch_user(session: aiohttp.ClientSession, user_id: int):
    """Return the decoded JSON body for the user with the given id."""
    endpoint = f'https://jsonplaceholder.typicode.com/users/{user_id}'
    async with session.get(endpoint) as resp:
        payload = await resp.json()
    return payload


async def main():
    """Fetch one user, then five users concurrently, printing their names."""
    async with aiohttp.ClientSession() as session:
        # Single request
        first = await fetch_user(session, 1)
        print(f"User: {first['name']}")

        # Concurrent requests: gather schedules all coroutines at once
        users = await asyncio.gather(
            *(fetch_user(session, uid) for uid in range(1, 6))
        )

        print(f"\nFetched {len(users)} users:")
        for person in users:
            print(f"- {person['name']}")


# Run
asyncio.run(main())
Async API Client
import asyncio
import aiohttp
from typing import List, Dict, Any


class AsyncAPIClient:
    """Async API client backed by an ``aiohttp.ClientSession``.

    The session is created on ``__aenter__`` and closed on ``__aexit__``,
    so the client is meant to be used with ``async with``.
    """

    def __init__(self, base_url: str, timeout: int = 10):
        """Record the base URL and total timeout; no session is opened yet."""
        self.session = None
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.base_url = base_url.rstrip('/')

    async def __aenter__(self):
        """Open the session and return the client."""
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session if one was opened."""
        if not self.session:
            return
        await self.session.close()

    async def request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send a request and return the decoded JSON body.

        Raises the aiohttp response error produced by ``raise_for_status``
        when the status indicates failure.
        """
        target = self.base_url + '/' + endpoint.lstrip('/')
        async with self.session.request(method, target, **kwargs) as resp:
            resp.raise_for_status()
            body = await resp.json()
        return body

    async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Async GET request."""
        return await self.request('GET', endpoint, **kwargs)

    async def post(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Async POST request."""
        return await self.request('POST', endpoint, **kwargs)

    async def get_many(self, endpoints: List[str]) -> List[Dict[str, Any]]:
        """GET every endpoint concurrently; results keep the input order."""
        pending = [self.get(path) for path in endpoints]
        results = await asyncio.gather(*pending)
        return results


# Usage
async def main():
    async with AsyncAPIClient('https://jsonplaceholder.typicode.com') as client:
        # Single request
        first = await client.get('/users/1')
        print(f"User: {first['name']}")

        # Multiple concurrent requests
        paths = [f'/users/{i}' for i in range(1, 6)]
        users = await client.get_many(paths)

        print(f"\nFetched {len(users)} users concurrently:")
        for person in users:
            print(f"- {person['name']}")


asyncio.run(main())
Rate-Limited Async Client
import asyncio
import aiohttp
from typing import Dict, Any
from datetime import datetime, timedelta


class AsyncRateLimitedClient:
    """Async client that spaces requests to respect a per-second rate.

    Two mechanisms cooperate: a semaphore caps how many requests are in
    flight at once, and a lock-protected timestamp keeps consecutive
    request starts at least ``min_interval`` seconds apart.
    """

    def __init__(self, base_url: str, requests_per_second: int = 5):
        """Configure the limiter; the HTTP session is opened in ``__aenter__``.

        Args:
            base_url: Root URL every endpoint is appended to.
            requests_per_second: Maximum request starts per second.

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            # Zero would divide by zero below and create a semaphore that
            # never admits anyone.
            raise ValueError("requests_per_second must be positive")

        self.base_url = base_url.rstrip('/')
        self.session = None
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = None
        self.semaphore = asyncio.Semaphore(requests_per_second)
        # Serializes the read-sleep-write cycle on ``last_request_time``.
        # Without it, coroutines admitted together by the semaphore all read
        # the same timestamp, sleep the same amount and fire as a burst.
        self._rate_lock = asyncio.Lock()

    async def __aenter__(self):
        """Open the aiohttp session and return the client."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session if one was opened."""
        if self.session:
            await self.session.close()

    async def _wait_if_needed(self):
        """Sleep until ``min_interval`` has passed since the previous request."""
        async with self._rate_lock:
            if self.last_request_time is not None:
                elapsed = datetime.now() - self.last_request_time
                wait_time = self.min_interval - elapsed.total_seconds()
                if wait_time > 0:
                    await asyncio.sleep(wait_time)

            # Stamp while still holding the lock so the next waiter measures
            # from this request's start.
            self.last_request_time = datetime.now()

    async def request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send a rate-limited request and return the decoded JSON body."""
        async with self.semaphore:
            await self._wait_if_needed()

            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            async with self.session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()

    async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Rate-limited GET request."""
        return await self.request('GET', endpoint, **kwargs)


# Usage
async def main():
    async with AsyncRateLimitedClient(
        'https://jsonplaceholder.typicode.com',
        requests_per_second=2
    ) as client:
        # These requests will be rate-limited
        tasks = [client.get(f'/users/{i}') for i in range(1, 11)]

        print("Fetching 10 users (rate limited to 2 req/s)...")
        start = datetime.now()

        users = await asyncio.gather(*tasks)

        elapsed = (datetime.now() - start).total_seconds()
        print(f"Fetched {len(users)} users in {elapsed:.1f}s")


asyncio.run(main())
Webhooks
Simple Webhook Server
import hashlib
import hmac

from flask import Flask, request, jsonify

app = Flask(__name__)

# Secret for webhook verification
WEBHOOK_SECRET = 'your_webhook_secret_here'


def verify_signature(payload: bytes, signature: str) -> bool:
    """Return True when ``signature`` is the HMAC-SHA256 hex digest of ``payload``.

    The digest is keyed with ``WEBHOOK_SECRET`` and compared in constant time.
    """
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str,
    # which a caller-controlled header could otherwise trigger.
    return hmac.compare_digest(expected.encode(), signature.encode())


@app.route('/webhook', methods=['POST'])
def webhook_handler():
    """Verify, parse and dispatch an incoming webhook.

    Returns:
        401 when the signature does not match, 400 when the body is not a
        JSON object, otherwise 200 after the event has been dispatched.
    """
    # Get signature from header
    signature = request.headers.get('X-Webhook-Signature', '')

    # Verify signature against the raw request body
    if not verify_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 401

    # Parse defensively: a correctly signed body can still be missing,
    # malformed, or a JSON value other than an object.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid JSON payload'}), 400

    event_type = data.get('event')

    print(f"Received webhook: {event_type}")
    print(f"Data: {data}")

    # Handle different event types
    if event_type == 'user.created':
        handle_user_created(data)
    elif event_type == 'order.placed':
        handle_order_placed(data)
    else:
        print(f"Unknown event: {event_type}")

    return jsonify({'status': 'success'}), 200


def handle_user_created(data):
    """Handle user created event."""
    user_id = data.get('user_id')
    print(f"Processing new user: {user_id}")
    # Send welcome email, create profile, etc.


def handle_order_placed(data):
    """Handle order placed event."""
    order_id = data.get('order_id')
    print(f"Processing new order: {order_id}")
    # Process payment, send confirmation, etc.


if __name__ == '__main__':
    # NOTE: debug=True enables the interactive debugger; use it for local
    # development only, never on a publicly reachable server.
    app.run(port=5000, debug=True)
Webhook Client (Sender)
import requests
import hmac
import hashlib
import json


class WebhookClient:
    """Client that sends HMAC-signed JSON webhooks to a fixed URL."""

    def __init__(self, webhook_url: str, secret: str):
        """Remember the destination URL and the shared signing secret."""
        self.webhook_url = webhook_url
        self.secret = secret

    def _generate_signature(self, payload: str) -> str:
        """Return the HMAC-SHA256 hex digest of ``payload`` keyed by the secret."""
        return hmac.new(
            self.secret.encode(),
            payload.encode(),
            hashlib.sha256
        ).hexdigest()

    def send_webhook(self, event_type: str, data: dict) -> bool:
        """Send one signed webhook.

        Args:
            event_type: Value stored under the ``'event'`` key of the payload.
            data: Additional payload fields.

        Returns:
            True when the receiver answered with a success status, False on
            any ``requests`` error (connection, timeout, HTTP error status).
        """
        payload = {'event': event_type, **data}
        # An 'event' key inside ``data`` must not override the declared
        # event type; re-assigning keeps the key in first position.
        payload['event'] = event_type

        # Serialize once: the exact string that is signed is the one sent.
        payload_str = json.dumps(payload)
        signature = self._generate_signature(payload_str)

        headers = {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        }

        try:
            response = requests.post(
                self.webhook_url,
                data=payload_str,
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
            print(f"Webhook sent successfully: {event_type}")
            return True
        except requests.exceptions.RequestException as e:
            print(f"Failed to send webhook: {e}")
            return False


# Usage
client = WebhookClient(
    webhook_url='http://localhost:5000/webhook',
    secret='your_webhook_secret_here'
)

# Send user created event
client.send_webhook('user.created', {
    'user_id': 123,
    'username': 'john_doe',
    'email': 'john_doe@example.com'
})

# Send order placed event
client.send_webhook('order.placed', {
    'order_id': '12345',
    'total': 99.99,
    'items': 3
})
Webhook Retry Logic
import requests
import time
from typing import Dict, Any


class ReliableWebhookClient:
    """Webhook client that retries failed deliveries with exponential backoff."""

    def __init__(self, webhook_url: str, max_retries: int = 3):
        """Remember the destination URL and the maximum number of attempts."""
        self.webhook_url = webhook_url
        self.max_retries = max_retries

    def send_webhook(self, data: Dict[str, Any]) -> bool:
        """POST ``data`` as JSON, retrying on any ``requests`` error.

        The delay between attempts starts at 1 second and doubles each time.

        Returns:
            True as soon as one attempt succeeds; False when every attempt
            failed or when ``max_retries`` allows no attempt at all.
        """
        delay = 1

        for attempt in range(1, self.max_retries + 1):
            try:
                response = requests.post(
                    self.webhook_url,
                    json=data,
                    timeout=10
                )
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                if attempt >= self.max_retries:
                    print(f"Failed to send webhook after {self.max_retries} attempts")
                    return False

                print(f"Attempt {attempt} failed: {e}")
                print(f"Retrying in {delay}s...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff
            else:
                print(f"Webhook sent successfully on attempt {attempt}")
                return True

        # Reached only when max_retries <= 0: nothing was attempted, so the
        # delivery did not happen. Return a real bool rather than None.
        return False


# Usage
client = ReliableWebhookClient('http://localhost:5000/webhook')

client.send_webhook({
    'event': 'payment.completed',
    'payment_id': 'pay_123',
    'amount': 100.00
})
API Testing
Testing with responses Library
import requests
import responses
import pytest

# Install: pip install responses pytest


class GitHubClient:
    """Minimal GitHub API client used as the subject of the tests below."""

    BASE_URL = 'https://api.github.com'

    def __init__(self, token: str):
        """Create a session that sends ``token`` in the Authorization header."""
        self.token = token
        self.session = requests.Session()
        self.session.headers.update({'Authorization': f'token {token}'})

    def get_user(self, username: str):
        """Return the JSON profile of ``username``; raise on an error status."""
        url = f'{self.BASE_URL}/users/{username}'
        reply = self.session.get(url)
        reply.raise_for_status()
        return reply.json()

    def create_repo(self, name: str, private: bool = False):
        """Create a repository and return the JSON the API answers with."""
        body = {'name': name, 'private': private}
        reply = self.session.post(f'{self.BASE_URL}/user/repos', json=body)
        reply.raise_for_status()
        return reply.json()


# Tests
@responses.activate
def test_get_user():
    """A mocked 200 response is decoded and returned as-is."""
    # Register the canned reply for the exact URL the client will call
    profile = {
        'login': 'octocat',
        'id': 1,
        'name': 'The Octocat',
        'followers': 1000
    }
    responses.add(
        responses.GET,
        'https://api.github.com/users/octocat',
        json=profile,
        status=200
    )

    # Exercise the client
    gh = GitHubClient(token='test_token')
    user = gh.get_user('octocat')

    assert user['login'] == 'octocat'
    assert user['name'] == 'The Octocat'
    assert user['followers'] == 1000


@responses.activate
def test_get_user_not_found():
    """A mocked 404 surfaces as ``HTTPError``."""
    responses.add(
        responses.GET,
        'https://api.github.com/users/nonexistent',
        status=404
    )

    gh = GitHubClient(token='test_token')

    with pytest.raises(requests.exceptions.HTTPError):
        gh.get_user('nonexistent')


@responses.activate
def test_create_repo():
    """A mocked 201 response for repository creation is returned as JSON."""
    created = {
        'id': 123,
        'name': 'my-repo',
        'private': False,
        'html_url': 'https://github.com/user/my-repo'
    }
    responses.add(
        responses.POST,
        'https://api.github.com/user/repos',
        json=created,
        status=201
    )

    gh = GitHubClient(token='test_token')
    repo = gh.create_repo('my-repo')

    assert repo['name'] == 'my-repo'
    assert repo['private'] is False
    assert 'html_url' in repo


# Run tests
if __name__ == '__main__':
    pytest.main([__file__, '-v'])
Testing with unittest.mock
import unittest
from unittest.mock import patch, Mock
import requests


class APIClient:
    """Small API client used as the subject of the tests below."""

    def __init__(self, base_url: str):
        """Remember the base URL endpoints are appended to."""
        self.base_url = base_url

    def get_data(self, endpoint: str):
        """GET ``endpoint`` and return its JSON body; raise on an error status."""
        reply = requests.get(f"{self.base_url}/{endpoint}")
        reply.raise_for_status()
        return reply.json()


class TestAPIClient(unittest.TestCase):
    """Tests for ``APIClient`` with ``requests.get`` patched out."""

    @patch('requests.get')
    def test_get_data_success(self, mock_get):
        """The JSON of a successful response is returned to the caller."""
        # Build the fake response the patched requests.get will hand back
        fake_response = Mock(status_code=200)
        fake_response.json.return_value = {'id': 1, 'name': 'Test'}
        mock_get.return_value = fake_response

        # Exercise the client
        api = APIClient('https://api.example.com')
        data = api.get_data('users/1')

        # Check the result and the URL that was requested
        self.assertEqual(data['id'], 1)
        self.assertEqual(data['name'], 'Test')
        mock_get.assert_called_once_with('https://api.example.com/users/1')

    @patch('requests.get')
    def test_get_data_error(self, mock_get):
        """A connection failure propagates out of ``get_data``."""
        # Make the patched requests.get raise instead of returning
        mock_get.side_effect = requests.exceptions.ConnectionError()

        api = APIClient('https://api.example.com')

        with self.assertRaises(requests.exceptions.ConnectionError):
            api.get_data('users/1')


if __name__ == '__main__':
    unittest.main()
2 Ứng Dụng Thực Tế
1. API Gateway với Load Balancing
import requests
import random
from typing import List, Dict, Any
from datetime import datetime, timedelta


class APIGateway:
    """API gateway with health checks and error-rate-weighted load balancing.

    Per-backend statistics (``requests``, ``errors``, ``avg_response_time``)
    are updated on every routed request and drive backend selection.
    """

    def __init__(self, backends: List[str]):
        """Register the backends; all start healthy with empty statistics."""
        self.backends = backends
        self.backend_health = {url: True for url in backends}
        self.backend_stats = {
            url: {'requests': 0, 'errors': 0, 'avg_response_time': 0}
            for url in backends
        }
        self.session = requests.Session()

    def check_health(self, backend: str) -> bool:
        """Return True when ``GET {backend}/health`` answers 200 within 2s."""
        try:
            response = self.session.get(
                f"{backend}/health",
                timeout=2
            )
            return response.status_code == 200
        except requests.exceptions.RequestException:
            # Only network/HTTP failures mean "unhealthy"; a bare except
            # would also swallow KeyboardInterrupt and programming errors.
            return False

    def update_health_status(self):
        """Re-probe every backend and record the result."""
        for backend in self.backends:
            self.backend_health[backend] = self.check_health(backend)

    def get_healthy_backends(self) -> List[str]:
        """Return the backends currently marked healthy, in registration order."""
        return [
            backend for backend in self.backends
            if self.backend_health[backend]
        ]

    def select_backend(self) -> str:
        """Pick a healthy backend by weighted random choice.

        Each backend's weight is the inverse of its error rate, so backends
        with fewer errors are chosen more often.

        Raises:
            RuntimeError: If no backend is currently healthy.
        """
        healthy = self.get_healthy_backends()

        if not healthy:
            raise RuntimeError("No healthy backends available")

        # Weight by inverse of error rate
        weights = []
        for backend in healthy:
            stats = self.backend_stats[backend]
            error_rate = stats['errors'] / max(stats['requests'], 1)
            weight = 1.0 / (error_rate + 0.1)  # Avoid division by zero
            weights.append(weight)

        # Weighted random selection
        return random.choices(healthy, weights=weights)[0]

    def request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Route a request to a selected backend and update its statistics.

        Raises:
            RuntimeError: If no backend is healthy.
            requests.exceptions.RequestException: Re-raised after the error
                has been recorded; the backend is marked unhealthy once more
                than half of its requests have failed.
        """
        backend = self.select_backend()
        url = f"{backend}/{endpoint.lstrip('/')}"

        start_time = datetime.now()

        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()

            # Update stats
            elapsed = (datetime.now() - start_time).total_seconds()
            stats = self.backend_stats[backend]
            stats['requests'] += 1
            # Average over successful requests only. 'requests' also counts
            # failures, which contribute no timing sample, so dividing by it
            # would drag the average down after every error.
            successes = stats['requests'] - stats['errors']
            stats['avg_response_time'] += (
                (elapsed - stats['avg_response_time']) / successes
            )

            return response

        except requests.exceptions.RequestException:
            # Update error stats
            stats = self.backend_stats[backend]
            stats['errors'] += 1
            stats['requests'] += 1

            # Mark backend as unhealthy if too many errors
            error_rate = stats['errors'] / stats['requests']
            if error_rate > 0.5:
                self.backend_health[backend] = False

            raise

    def get_stats(self) -> Dict[str, Any]:
        """Return the backend list, health map and per-backend statistics."""
        return {
            'backends': self.backends,
            'health': self.backend_health,
            'stats': self.backend_stats
        }


# Usage
gateway = APIGateway([
    'http://backend1.example.com',
    'http://backend2.example.com',
    'http://backend3.example.com'
])

# Check health
gateway.update_health_status()

# Route requests
try:
    response = gateway.request('GET', '/api/users')
    print(response.json())
except Exception as e:
    print(f"Request failed: {e}")

# View stats
print(gateway.get_stats())
2. API Monitoring và Analytics
import requests
import time
from typing import Dict, List, Any
from datetime import datetime
from collections import defaultdict


class APIMonitor:
    """Collect per-request metrics and aggregate them per endpoint."""

    def __init__(self):
        """Start with an empty request log and lazily created endpoint stats."""
        self.requests_log = []
        self.endpoint_stats = defaultdict(lambda: {
            'count': 0,
            'errors': 0,
            'total_time': 0,
            'min_time': float('inf'),
            'max_time': 0
        })

    def track_request(self, endpoint: str, method: str,
                      status_code: int, response_time: float,
                      error: bool = False):
        """Record one request in the log and fold it into the endpoint stats.

        Args:
            endpoint: Endpoint path as passed by the caller.
            method: HTTP method name.
            status_code: Response status.
            response_time: Duration in seconds.
            error: Whether the request counted as failed.
        """
        # Log request
        self.requests_log.append({
            'endpoint': endpoint,
            'method': method,
            'status_code': status_code,
            'response_time': response_time,
            'error': error,
            'timestamp': datetime.now()
        })

        # Update stats; endpoints are keyed as "<METHOD> <endpoint>"
        key = f"{method} {endpoint}"
        stats = self.endpoint_stats[key]
        stats['count'] += 1
        if error:
            stats['errors'] += 1
        stats['total_time'] += response_time
        stats['min_time'] = min(stats['min_time'], response_time)
        stats['max_time'] = max(stats['max_time'], response_time)

    def get_stats(self) -> Dict[str, Any]:
        """Return overall totals plus a formatted summary per endpoint."""
        total_requests = len(self.requests_log)
        total_errors = sum(1 for r in self.requests_log if r['error'])

        endpoint_summary = {}
        for key, stats in self.endpoint_stats.items():
            # count >= 1 here: entries only exist after track_request ran
            avg_time = stats['total_time'] / stats['count']
            error_rate = (stats['errors'] / stats['count']) * 100

            endpoint_summary[key] = {
                'requests': stats['count'],
                'errors': stats['errors'],
                'error_rate': f"{error_rate:.1f}%",
                'avg_response_time': f"{avg_time:.3f}s",
                'min_response_time': f"{stats['min_time']:.3f}s",
                'max_response_time': f"{stats['max_time']:.3f}s"
            }

        return {
            'total_requests': total_requests,
            'total_errors': total_errors,
            'error_rate': f"{(total_errors / total_requests * 100):.1f}%" if total_requests > 0 else "0%",
            'endpoints': endpoint_summary
        }

    def get_slow_requests(self, threshold: float = 1.0) -> List[Dict]:
        """Return the logged requests that took longer than ``threshold`` seconds."""
        return [
            r for r in self.requests_log
            if r['response_time'] > threshold
        ]

    def print_report(self):
        """Print overall, per-endpoint and slow-request statistics."""
        stats = self.get_stats()

        print("\n" + "="*60)
        print("API MONITORING REPORT")
        print("="*60)

        print("\nOverall Statistics:")
        print(f" Total Requests: {stats['total_requests']}")
        print(f" Total Errors: {stats['total_errors']}")
        print(f" Error Rate: {stats['error_rate']}")

        print("\nEndpoint Statistics:")
        for endpoint, data in stats['endpoints'].items():
            print(f"\n {endpoint}:")
            print(f" Requests: {data['requests']}")
            print(f" Errors: {data['errors']} ({data['error_rate']})")
            print(f" Avg Response: {data['avg_response_time']}")
            print(f" Min Response: {data['min_response_time']}")
            print(f" Max Response: {data['max_response_time']}")

        # Slow requests (default threshold of get_slow_requests); show at most 5
        slow = self.get_slow_requests()
        if slow:
            print(f"\nSlow Requests (>1s): {len(slow)}")
            for req in slow[:5]:
                print(f" - {req['method']} {req['endpoint']}: {req['response_time']:.3f}s")


class MonitoredAPIClient:
    """API client that reports every request to an ``APIMonitor``."""

    def __init__(self, base_url: str, monitor: APIMonitor):
        """Remember the base URL and monitor, and open a session."""
        self.base_url = base_url.rstrip('/')
        self.monitor = monitor
        self.session = requests.Session()

    def request(self, method: str, endpoint: str, **kwargs):
        """Send a request and record its outcome, whether it succeeds or fails.

        Raises:
            requests.exceptions.RequestException: Re-raised after being
                recorded with ``error=True``.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        start_time = time.time()
        error = False
        status_code = None

        try:
            response = self.session.request(method, url, **kwargs)
            status_code = response.status_code
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            error = True
            # Compare against None explicitly: a Response is falsy for every
            # 4xx/5xx status, so a plain truth test would discard the status
            # code of exactly the responses that end up here.
            failed_response = getattr(e, 'response', None)
            if failed_response is not None:
                status_code = failed_response.status_code
            else:
                status_code = 0
            raise

        finally:
            # Runs on success and failure alike, so every request is tracked.
            response_time = time.time() - start_time
            self.monitor.track_request(
                endpoint=endpoint,
                method=method,
                status_code=status_code or 0,
                response_time=response_time,
                error=error
            )

    def get(self, endpoint: str, **kwargs):
        """Monitored GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs):
        """Monitored POST request."""
        return self.request('POST', endpoint, **kwargs)


# Usage
monitor = APIMonitor()
client = MonitoredAPIClient('https://jsonplaceholder.typicode.com', monitor)

# Make requests
print("Making API requests...")
for i in range(1, 11):
    try:
        response = client.get(f'/users/{i}')
        print(f"✓ User {i}: {response.json()['name']}")
    except Exception as e:
        print(f"✗ User {i}: Error")

# Print monitoring report
monitor.print_report()
Best Practices Summary
1. Connection Pooling
# ✅ Use session for connection reuse
# The with-block closes the session (and its pooled connections) when done;
# the timeout keeps a stalled server from blocking the loop forever.
with requests.Session() as session:
    for _ in range(100):
        session.get('https://api.example.com/data', timeout=(3, 10))
2. Timeout Strategy
# ✅ Always set timeout
# The tuple form sets the two phases separately: (connect, read)
timeouts = (3, 10)
response = requests.get('https://api.example.com/data', timeout=timeouts)
3. Error Handling
# ✅ Handle specific exceptions
try:
    response = requests.get(url, timeout=5)
    response.raise_for_status()
except requests.exceptions.Timeout:
    # Handle timeout
    pass
except requests.exceptions.HTTPError as http_err:
    # Handle HTTP error: the failed response is attached to the exception
    failed_status = http_err.response.status_code
    print(f"Status: {failed_status}")
4. Rate Limiting
# ✅ Implement rate limiting
import time
from datetime import datetime, timedelta

last_request = None
min_interval = 1.0  # 1 second between requests

# Sleep for whatever is left of the interval since the previous request
if last_request is not None:
    since_last = (datetime.now() - last_request).total_seconds()
    remaining = min_interval - since_last
    if remaining > 0:
        time.sleep(remaining)

response = requests.get(url)
last_request = datetime.now()
5. Retry with Backoff
# ✅ Exponential backoff
import time

max_retries = 3
delay = 1

for attempt in range(max_retries):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        break
    except requests.exceptions.RequestException:
        if attempt == max_retries - 1:
            # Out of retries: surface the last error instead of falling
            # through with a missing or failed response.
            raise
        time.sleep(delay)
        delay *= 2  # Exponential backoff
Bài Tập Thực Hành
Bài 1: Async API Client
Tạo async client fetch data từ multiple endpoints đồng thời.
Bài 2: Webhook Server
Implement webhook server với signature verification và event processing.
Bài 3: API Testing Suite
Viết test suite cho API client sử dụng responses library.
Bài 4: API Gateway
Implement API gateway với load balancing và health checks.
Bài 5: Monitoring Dashboard
Tạo monitoring system track API performance và errors.
Tóm Tắt
Trong Part 2 chúng ta đã học:
- ✅ Sessions Management - Advanced configuration, authentication
- ✅ Async Requests - aiohttp, concurrent requests, rate limiting
- ✅ Webhooks - Server, client, signature verification, retry logic
- ✅ API Testing - responses library, unittest.mock
- ✅ Real Applications - API Gateway, monitoring system
- ✅ Best Practices - Connection pooling, timeouts, error handling, rate limiting
Bạn đã hoàn thành Module 02: Python Advanced! 🎉
Những kiến thức từ module này sẽ giúp bạn:
- ✅ Viết Python code chuyên nghiệp
- ✅ Xử lý async operations
- ✅ Làm việc với APIs
- ✅ Testing và debugging
- ✅ Performance optimization
- ✅ Best practices và design patterns
Chúc mừng bạn đã hoàn thành khóa học Python Advanced! 🚀🎓
Bài tiếp theo: Bài 21: Practices & Projects 🚀