Bài 20: Working với APIs (Phần 2)

Sessions Management

Advanced Session Configuration

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class APIClient:
    """API client that sends every request through one configured session.

    The session carries default JSON headers, a pooled ``HTTPAdapter`` and an
    automatic retry policy. The client is also a context manager that closes
    the session on exit.
    """

    # Methods that are retried automatically. POST is not in the list, so a
    # failed POST is never replayed by the retry policy.
    RETRY_METHODS = ["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]

    def __init__(self, base_url: str, timeout: int = 10):
        """Store the base URL (without trailing slash) and build the session.

        Args:
            base_url: Root URL that every endpoint is appended to.
            timeout: Default timeout in seconds for requests that do not pass
                their own ``timeout``.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = self._create_session()

    def _create_retry(self) -> Retry:
        """Build the retry policy, working with both old and new urllib3."""
        options = dict(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1,
        )
        try:
            # ``allowed_methods`` is the current keyword; ``method_whitelist``
            # was removed in urllib3 2.0 and raises TypeError there.
            return Retry(allowed_methods=self.RETRY_METHODS, **options)
        except TypeError:
            # Older urllib3 releases only know the previous keyword name.
            return Retry(method_whitelist=self.RETRY_METHODS, **options)

    def _create_session(self) -> requests.Session:
        """Create the session with default headers, retries and pooling."""
        session = requests.Session()

        # Set default headers
        session.headers.update({
            'User-Agent': 'APIClient/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })

        # Mount one adapter (retry policy + connection pool) for both schemes
        adapter = HTTPAdapter(
            max_retries=self._create_retry(),
            pool_connections=10,
            pool_maxsize=20
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def request(self, method: str, endpoint: str, **kwargs):
        """Send a request through the session.

        Args:
            method: HTTP method name.
            endpoint: Path relative to ``base_url``; a leading slash is optional.
            **kwargs: Passed to ``Session.request``. ``timeout`` defaults to
                the client's timeout when not given.

        Returns:
            The ``requests.Response`` object.

        Raises:
            requests.exceptions.HTTPError: Raised by ``raise_for_status()``
                for an error status.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Set default timeout if not provided
        kwargs.setdefault('timeout', self.timeout)

        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()

        return response

    def get(self, endpoint: str, **kwargs):
        """GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs):
        """POST request."""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs):
        """PUT request."""
        return self.request('PUT', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs):
        """DELETE request."""
        return self.request('DELETE', endpoint, **kwargs)

    def close(self):
        """Close the underlying session."""
        self.session.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the session."""
        self.close()


# Usage
with APIClient('https://api.example.com') as client:
    response = client.get('/users')
    users = response.json()
    print(f"Users: {len(users)}")

Session with Authentication

import requests
from typing import Optional


class AuthenticatedClient:
    """API client that attaches credentials to every request of a session."""

    def __init__(self, base_url: str, api_key: Optional[str] = None,
                 bearer_token: Optional[str] = None, timeout: float = 10):
        """Create the session and install the authentication header.

        Args:
            base_url: Root URL that every endpoint is appended to.
            api_key: Sent as ``X-API-Key``. Takes precedence when both
                credentials are given.
            bearer_token: Sent as ``Authorization: Bearer ...`` when no
                ``api_key`` is given.
            timeout: Default timeout in seconds for requests that do not pass
                their own ``timeout``, so a stalled server cannot block the
                caller forever.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()

        # Set authentication
        if api_key:
            self.session.headers['X-API-Key'] = api_key
        elif bearer_token:
            self.session.headers['Authorization'] = f'Bearer {bearer_token}'

    def refresh_token(self, new_token: str):
        """Replace the bearer token used for subsequent requests."""
        self.session.headers['Authorization'] = f'Bearer {new_token}'

    def get(self, endpoint: str, **kwargs):
        """Send an authenticated GET request and return the response.

        The response status is not checked here; callers decide how to
        handle error statuses.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        kwargs.setdefault('timeout', self.timeout)
        return self.session.get(url, **kwargs)


# Usage
client = AuthenticatedClient(
    base_url='https://api.example.com',
    bearer_token='your_token_here'
)

# Make authenticated requests
response = client.get('/profile')
print(response.json())

# Refresh token if needed
client.refresh_token('new_token_here')

Async Requests với aiohttp

Installation

pip install aiohttp

Basic Async Requests

import asyncio
import aiohttp


async def fetch_user(session: aiohttp.ClientSession, user_id: int):
    """Return the decoded JSON body for one user of the demo API."""
    async with session.get(
        f'https://jsonplaceholder.typicode.com/users/{user_id}'
    ) as reply:
        payload = await reply.json()
    return payload


async def main():
    """Fetch one user, then users 1-5 concurrently, printing their names."""
    async with aiohttp.ClientSession() as session:
        # One request on its own
        first = await fetch_user(session, 1)
        print(f"User: {first['name']}")

        # Five requests scheduled together; gather keeps the input order
        users = await asyncio.gather(
            *(fetch_user(session, uid) for uid in range(1, 6))
        )

        print(f"\nFetched {len(users)} users:")
        for entry in users:
            print(f"- {entry['name']}")


# Run
asyncio.run(main())

Async API Client

import asyncio
import aiohttp
from typing import List, Dict, Any


class AsyncAPIClient:
    """Asynchronous API client backed by an ``aiohttp.ClientSession``.

    The session is created when the ``async with`` block is entered and
    closed when it exits.
    """

    def __init__(self, base_url: str, timeout: int = 10):
        self.base_url = base_url.rstrip('/')
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None

    async def __aenter__(self):
        """Open the session and hand back the client itself."""
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session if one was opened."""
        if not self.session:
            return
        await self.session.close()

    async def request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send a request and return the decoded JSON body.

        ``raise_for_status()`` is called on the response before decoding.
        """
        target = f"{self.base_url}/{endpoint.lstrip('/')}"

        async with self.session.request(method, target, **kwargs) as reply:
            reply.raise_for_status()
            body = await reply.json()
        return body

    async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Async GET request."""
        return await self.request('GET', endpoint, **kwargs)

    async def post(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Async POST request."""
        return await self.request('POST', endpoint, **kwargs)

    async def get_many(self, endpoints: List[str]) -> List[Dict[str, Any]]:
        """GET every endpoint concurrently; results follow the input order."""
        pending = []
        for path in endpoints:
            pending.append(self.get(path))
        return await asyncio.gather(*pending)


# Usage
async def main():
    async with AsyncAPIClient('https://jsonplaceholder.typicode.com') as client:
        # Single request
        first = await client.get('/users/1')
        print(f"User: {first['name']}")

        # Multiple concurrent requests
        paths = [f'/users/{uid}' for uid in range(1, 6)]
        users = await client.get_many(paths)

        print(f"\nFetched {len(users)} users concurrently:")
        for entry in users:
            print(f"- {entry['name']}")


asyncio.run(main())

Rate-Limited Async Client

import asyncio
import time
import aiohttp
from typing import Dict, Any
from datetime import datetime, timedelta


class AsyncRateLimitedClient:
    """Async client that spaces requests at least ``min_interval`` apart.

    Two mechanisms work together:

    * ``semaphore`` caps how many requests are in flight at once.
    * ``_rate_lock`` serializes the "check elapsed time, sleep, stamp" step
      so concurrent coroutines cannot all read the same timestamp and fire
      together.
    """

    def __init__(self, base_url: str, requests_per_second: int = 5):
        """Configure the limiter.

        Args:
            base_url: Root URL that every endpoint is appended to.
            requests_per_second: Maximum request start rate; must be positive.

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")

        self.base_url = base_url.rstrip('/')
        self.session = None
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        # Wall-clock time of the most recent request start (informational).
        self.last_request_time = None
        self.semaphore = asyncio.Semaphore(requests_per_second)
        self._rate_lock = asyncio.Lock()
        # Monotonic timestamp used for the interval arithmetic; unlike
        # datetime.now() it cannot jump when the system clock is adjusted.
        self._last_request_monotonic = None

    async def __aenter__(self):
        """Async context manager entry: open the session."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the session if one was opened."""
        if self.session:
            await self.session.close()

    async def _wait_if_needed(self):
        """Sleep until ``min_interval`` has passed since the previous request.

        The lock is held across the sleep on purpose: the next caller must
        measure its wait from the timestamp this caller is about to write.
        """
        async with self._rate_lock:
            if self._last_request_monotonic is not None:
                elapsed = time.monotonic() - self._last_request_monotonic
                wait_time = self.min_interval - elapsed

                if wait_time > 0:
                    await asyncio.sleep(wait_time)

            self._last_request_monotonic = time.monotonic()
            self.last_request_time = datetime.now()

    async def request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send a rate-limited request and return the decoded JSON body.

        ``raise_for_status()`` is called on the response before decoding.
        """
        async with self.semaphore:
            await self._wait_if_needed()

            url = f"{self.base_url}/{endpoint.lstrip('/')}"

            async with self.session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()

    async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Rate-limited GET request."""
        return await self.request('GET', endpoint, **kwargs)


# Usage
async def main():
    async with AsyncRateLimitedClient(
        'https://jsonplaceholder.typicode.com',
        requests_per_second=2
    ) as client:
        # These requests will be rate-limited
        tasks = [client.get(f'/users/{i}') for i in range(1, 11)]

        print("Fetching 10 users (rate limited to 2 req/s)...")
        start = datetime.now()

        users = await asyncio.gather(*tasks)

        elapsed = (datetime.now() - start).total_seconds()
        print(f"Fetched {len(users)} users in {elapsed:.1f}s")


asyncio.run(main())

Webhooks

Simple Webhook Server

from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)

# Secret for webhook verification
WEBHOOK_SECRET = 'your_webhook_secret_here'


def verify_signature(payload: bytes, signature: str) -> bool:
    """Return True when ``signature`` is the HMAC-SHA256 hex digest of ``payload``.

    Args:
        payload: Raw request body, exactly as received.
        signature: Hex digest taken from the request header.
    """
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a hostile header could supply.
    return hmac.compare_digest(
        expected.encode(),
        signature.encode('utf-8', 'replace')
    )


@app.route('/webhook', methods=['POST'])
def webhook_handler():
    """Verify, parse and dispatch an incoming webhook.

    Returns 401 for a bad signature, 400 when the body is not a JSON object,
    and 200 once the event has been handled (unknown events are only logged).
    """
    # Get signature from header
    signature = request.headers.get('X-Webhook-Signature', '')

    # Verify signature against the raw body bytes
    if not verify_signature(request.get_data(), signature):
        return jsonify({'error': 'Invalid signature'}), 401

    # Parse the body; silent=True yields None instead of raising on bad JSON
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid JSON payload'}), 400

    event_type = data.get('event')

    print(f"Received webhook: {event_type}")
    print(f"Data: {data}")

    # Handle different event types
    if event_type == 'user.created':
        handle_user_created(data)
    elif event_type == 'order.placed':
        handle_order_placed(data)
    else:
        print(f"Unknown event: {event_type}")

    return jsonify({'status': 'success'}), 200


def handle_user_created(data):
    """Handle user created event."""
    user_id = data.get('user_id')
    print(f"Processing new user: {user_id}")
    # Send welcome email, create profile, etc.


def handle_order_placed(data):
    """Handle order placed event."""
    order_id = data.get('order_id')
    print(f"Processing new order: {order_id}")
    # Process payment, send confirmation, etc.


if __name__ == '__main__':
    # debug is off: the interactive debugger allows arbitrary code execution
    # and must never be reachable on an endpoint that receives outside traffic.
    app.run(port=5000, debug=False)

Webhook Client (Sender)

import requests
import hmac
import hashlib
import json


class WebhookClient:
    """Client that sends HMAC-signed webhook notifications."""

    def __init__(self, webhook_url: str, secret: str):
        """Remember the destination URL and the shared signing secret."""
        self.webhook_url = webhook_url
        self.secret = secret

    def _generate_signature(self, payload: str) -> str:
        """Return the HMAC-SHA256 hex digest of ``payload`` under the secret."""
        return hmac.new(
            self.secret.encode(),
            payload.encode(),
            hashlib.sha256
        ).hexdigest()

    def send_webhook(self, event_type: str, data: dict) -> bool:
        """Send one signed webhook.

        Args:
            event_type: Event name, stored under the ``'event'`` key.
            data: Extra fields merged into the payload. An ``'event'`` key in
                ``data`` is ignored so it cannot override ``event_type``.

        Returns:
            True when the receiver answered with a success status, False on
            any ``requests`` error (which is printed, not raised).
        """
        payload = {'event': event_type}
        payload.update(
            (key, value) for key, value in data.items() if key != 'event'
        )

        # The exact string that is signed is also the string that is sent,
        # so the receiver can verify against the raw body bytes.
        payload_str = json.dumps(payload)
        signature = self._generate_signature(payload_str)

        headers = {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        }

        try:
            response = requests.post(
                self.webhook_url,
                data=payload_str,
                headers=headers,
                timeout=10
            )

            response.raise_for_status()
            print(f"Webhook sent successfully: {event_type}")
            return True

        except requests.exceptions.RequestException as e:
            print(f"Failed to send webhook: {e}")
            return False


# Usage
client = WebhookClient(
    webhook_url='http://localhost:5000/webhook',
    secret='your_webhook_secret_here'
)

# Send user created event
client.send_webhook('user.created', {
    'user_id': 123,
    'username': 'john_doe',
    'email': 'john@example.com'
})

# Send order placed event
client.send_webhook('order.placed', {
    'order_id': '12345',
    'total': 99.99,
    'items': 3
})

Webhook Retry Logic

import requests
import time
from typing import Dict, Any


class ReliableWebhookClient:
    """Webhook client that retries failed deliveries with exponential backoff."""

    def __init__(self, webhook_url: str, max_retries: int = 3):
        """Remember the destination and the maximum number of attempts."""
        self.webhook_url = webhook_url
        self.max_retries = max_retries

    def send_webhook(self, data: Dict[str, Any]) -> bool:
        """Send ``data`` as JSON, retrying on any ``requests`` error.

        The delay between attempts starts at 1 second and doubles each time.

        Returns:
            True as soon as one attempt succeeds; False once all attempts
            failed. False is also returned when ``max_retries`` is zero or
            negative, so the result is always a bool.
        """
        delay = 1

        for attempt in range(1, self.max_retries + 1):
            try:
                response = requests.post(
                    self.webhook_url,
                    json=data,
                    timeout=10
                )
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                # Only wait when another attempt is still coming.
                if attempt < self.max_retries:
                    print(f"Attempt {attempt} failed: {e}")
                    print(f"Retrying in {delay}s...")
                    time.sleep(delay)
                    delay *= 2  # Exponential backoff
            else:
                print(f"Webhook sent successfully on attempt {attempt}")
                return True

        print(f"Failed to send webhook after {self.max_retries} attempts")
        return False


# Usage
client = ReliableWebhookClient('http://localhost:5000/webhook')

client.send_webhook({
    'event': 'payment.completed',
    'payment_id': 'pay_123',
    'amount': 100.00
})

API Testing

Testing with responses Library

import requests
import responses
import pytest  # Install: pip install responses pytest


class GitHubClient:
    """Small GitHub API client used as the subject of the tests below."""

    BASE_URL = 'https://api.github.com'

    def __init__(self, token: str):
        self.token = token
        http = requests.Session()
        http.headers['Authorization'] = f'token {token}'
        self.session = http

    def get_user(self, username: str):
        """Return the decoded JSON for a user; raises on an error status."""
        reply = self.session.get(f'{self.BASE_URL}/users/{username}')
        reply.raise_for_status()
        return reply.json()

    def create_repo(self, name: str, private: bool = False):
        """Create a repository and return the decoded JSON of the reply."""
        body = {'name': name, 'private': private}
        reply = self.session.post(f'{self.BASE_URL}/user/repos', json=body)
        reply.raise_for_status()
        return reply.json()


# Tests
@responses.activate
def test_get_user():
    """A mocked 200 reply is decoded and returned unchanged."""
    octocat = {
        'login': 'octocat',
        'id': 1,
        'name': 'The Octocat',
        'followers': 1000
    }
    responses.add(
        responses.GET,
        'https://api.github.com/users/octocat',
        json=octocat,
        status=200
    )

    user = GitHubClient(token='test_token').get_user('octocat')

    assert user['login'] == 'octocat'
    assert user['name'] == 'The Octocat'
    assert user['followers'] == 1000


@responses.activate
def test_get_user_not_found():
    """A mocked 404 reply surfaces as ``HTTPError``."""
    responses.add(
        responses.GET,
        'https://api.github.com/users/nonexistent',
        status=404
    )

    client = GitHubClient(token='test_token')

    with pytest.raises(requests.exceptions.HTTPError):
        client.get_user('nonexistent')


@responses.activate
def test_create_repo():
    """A mocked 201 reply for repository creation is decoded and returned."""
    created = {
        'id': 123,
        'name': 'my-repo',
        'private': False,
        'html_url': 'https://github.com/user/my-repo'
    }
    responses.add(
        responses.POST,
        'https://api.github.com/user/repos',
        json=created,
        status=201
    )

    repo = GitHubClient(token='test_token').create_repo('my-repo')

    assert repo['name'] == 'my-repo'
    assert repo['private'] is False
    assert 'html_url' in repo


# Run tests
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

Testing with unittest.mock

import unittest
from unittest.mock import patch, Mock
import requests


class APIClient:
    """Minimal API client used as the subject of the tests below."""

    def __init__(self, base_url: str):
        self.base_url = base_url

    def get_data(self, endpoint: str):
        """GET ``base_url/endpoint`` and return the decoded JSON body."""
        reply = requests.get(f"{self.base_url}/{endpoint}")
        reply.raise_for_status()
        return reply.json()


class TestAPIClient(unittest.TestCase):
    """Tests for ``APIClient`` with ``requests.get`` patched out."""

    @patch('requests.get')
    def test_get_data_success(self, mock_get):
        """The JSON of a successful reply is returned to the caller."""
        # Fake reply handed back by the patched requests.get
        fake_reply = Mock(status_code=200)
        fake_reply.json.return_value = {'id': 1, 'name': 'Test'}
        mock_get.return_value = fake_reply

        result = APIClient('https://api.example.com').get_data('users/1')

        self.assertEqual(result['id'], 1)
        self.assertEqual(result['name'], 'Test')
        mock_get.assert_called_once_with('https://api.example.com/users/1')

    @patch('requests.get')
    def test_get_data_error(self, mock_get):
        """A connection failure propagates out of ``get_data``."""
        # Every call to the patched requests.get raises
        mock_get.side_effect = requests.exceptions.ConnectionError()

        subject = APIClient('https://api.example.com')

        with self.assertRaises(requests.exceptions.ConnectionError):
            subject.get_data('users/1')


if __name__ == '__main__':
    unittest.main()

2 Ứng Dụng Thực Tế

1. API Gateway với Load Balancing

import requests
import random
from typing import List, Dict, Any
from datetime import datetime, timedelta


class APIGateway:
    """API gateway with weighted load balancing and health checks.

    Per backend it tracks a health flag and counters (``requests``,
    ``errors``, ``avg_response_time``). ``avg_response_time`` is the mean
    over successful requests only.
    """

    def __init__(self, backends: List[str]):
        """Register the backend base URLs; all start out marked healthy."""
        self.backends = backends
        self.backend_health = {url: True for url in backends}
        self.backend_stats = {
            url: {'requests': 0, 'errors': 0, 'avg_response_time': 0}
            for url in backends
        }
        self.session = requests.Session()

    def check_health(self, backend: str) -> bool:
        """Return True when ``GET {backend}/health`` answers 200 within 2s."""
        try:
            response = self.session.get(
                f"{backend}/health",
                timeout=2
            )
            return response.status_code == 200
        except requests.exceptions.RequestException:
            # Network-level failure means unhealthy. Anything else (for
            # example KeyboardInterrupt) is deliberately not swallowed.
            return False

    def update_health_status(self):
        """Re-probe every backend and store the result."""
        for backend in self.backends:
            self.backend_health[backend] = self.check_health(backend)

    def get_healthy_backends(self) -> List[str]:
        """Return the backends currently marked healthy."""
        return [
            backend for backend in self.backends
            if self.backend_health[backend]
        ]

    def select_backend(self) -> str:
        """Pick a healthy backend by weighted random choice.

        A backend's weight is ``1 / (error_rate + 0.1)``, so backends with
        fewer errors are chosen more often.

        Raises:
            RuntimeError: If no backend is marked healthy.
        """
        healthy = self.get_healthy_backends()

        if not healthy:
            raise RuntimeError("No healthy backends available")

        weights = []
        for backend in healthy:
            stats = self.backend_stats[backend]
            error_rate = stats['errors'] / max(stats['requests'], 1)
            # The 0.1 offset keeps the weight finite at a zero error rate.
            weights.append(1.0 / (error_rate + 0.1))

        return random.choices(healthy, weights=weights)[0]

    def request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Route one request to a selected backend and record its outcome.

        Raises:
            RuntimeError: If no backend is healthy.
            requests.exceptions.RequestException: Re-raised after the error
                counters are updated; the backend is marked unhealthy once
                more than half of its requests have failed.
        """
        backend = self.select_backend()
        url = f"{backend}/{endpoint.lstrip('/')}"
        stats = self.backend_stats[backend]

        start_time = datetime.now()

        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
        except requests.exceptions.RequestException:
            # Update error stats
            stats['errors'] += 1
            stats['requests'] += 1

            # Mark backend as unhealthy if too many errors
            if stats['errors'] / stats['requests'] > 0.5:
                self.backend_health[backend] = False

            raise

        # Update stats. The running mean is taken over successes only:
        # failed requests never contribute a duration, so dividing by the
        # total request count would drag the average toward zero.
        elapsed = (datetime.now() - start_time).total_seconds()
        stats['requests'] += 1
        successes = stats['requests'] - stats['errors']
        stats['avg_response_time'] = (
            (stats['avg_response_time'] * (successes - 1) + elapsed)
            / successes
        )

        return response

    def get_stats(self) -> Dict[str, Any]:
        """Return the backend list, health flags and counters."""
        return {
            'backends': self.backends,
            'health': self.backend_health,
            'stats': self.backend_stats
        }


# Usage
gateway = APIGateway([
    'http://backend1.example.com',
    'http://backend2.example.com',
    'http://backend3.example.com'
])

# Check health
gateway.update_health_status()

# Route requests
try:
    response = gateway.request('GET', '/api/users')
    print(response.json())
except Exception as e:
    print(f"Request failed: {e}")

# View stats
print(gateway.get_stats())

2. API Monitoring và Analytics

import requests
import time
from typing import Dict, List, Any
from datetime import datetime
from collections import defaultdict


class APIMonitor:
    """Collect per-request metrics and summarize them per endpoint."""

    def __init__(self):
        # Chronological list of every tracked request.
        self.requests_log = []
        # Aggregates keyed by "<METHOD> <endpoint>".
        self.endpoint_stats = defaultdict(lambda: {
            'count': 0,
            'errors': 0,
            'total_time': 0,
            'min_time': float('inf'),
            'max_time': 0
        })

    def track_request(self, endpoint: str, method: str, status_code: int,
                     response_time: float, error: bool = False):
        """Record one request in the log and in the per-endpoint aggregates.

        Args:
            endpoint: Endpoint path as passed by the caller.
            method: HTTP method name.
            status_code: HTTP status.
            response_time: Duration in seconds.
            error: Whether the request counts as failed.
        """
        # Log request
        self.requests_log.append({
            'endpoint': endpoint,
            'method': method,
            'status_code': status_code,
            'response_time': response_time,
            'error': error,
            'timestamp': datetime.now()
        })

        # Update stats
        key = f"{method} {endpoint}"
        stats = self.endpoint_stats[key]

        stats['count'] += 1
        if error:
            stats['errors'] += 1

        stats['total_time'] += response_time
        stats['min_time'] = min(stats['min_time'], response_time)
        stats['max_time'] = max(stats['max_time'], response_time)

    def get_stats(self) -> Dict[str, Any]:
        """Return overall totals plus a formatted summary per endpoint."""
        total_requests = len(self.requests_log)
        total_errors = sum(1 for r in self.requests_log if r['error'])

        endpoint_summary = {}
        for key, stats in self.endpoint_stats.items():
            # count is at least 1 here: entries are only created by
            # track_request, which increments it.
            avg_time = stats['total_time'] / stats['count']
            error_rate = (stats['errors'] / stats['count']) * 100

            endpoint_summary[key] = {
                'requests': stats['count'],
                'errors': stats['errors'],
                'error_rate': f"{error_rate:.1f}%",
                'avg_response_time': f"{avg_time:.3f}s",
                'min_response_time': f"{stats['min_time']:.3f}s",
                'max_response_time': f"{stats['max_time']:.3f}s"
            }

        return {
            'total_requests': total_requests,
            'total_errors': total_errors,
            'error_rate': f"{(total_errors / total_requests * 100):.1f}%" if total_requests > 0 else "0%",
            'endpoints': endpoint_summary
        }

    def get_slow_requests(self, threshold: float = 1.0) -> List[Dict]:
        """Return logged requests whose duration exceeds ``threshold`` seconds."""
        return [
            r for r in self.requests_log
            if r['response_time'] > threshold
        ]

    def print_report(self):
        """Print the overall, per-endpoint and slow-request summaries."""
        stats = self.get_stats()

        print("\n" + "="*60)
        print("API MONITORING REPORT")
        print("="*60)

        print(f"\nOverall Statistics:")
        print(f"  Total Requests: {stats['total_requests']}")
        print(f"  Total Errors: {stats['total_errors']}")
        print(f"  Error Rate: {stats['error_rate']}")

        print(f"\nEndpoint Statistics:")
        for endpoint, data in stats['endpoints'].items():
            print(f"\n  {endpoint}:")
            print(f"    Requests: {data['requests']}")
            print(f"    Errors: {data['errors']} ({data['error_rate']})")
            print(f"    Avg Response: {data['avg_response_time']}")
            print(f"    Min Response: {data['min_response_time']}")
            print(f"    Max Response: {data['max_response_time']}")

        # Slow requests (default 1s threshold); only the first five are listed
        slow = self.get_slow_requests()
        if slow:
            print(f"\nSlow Requests (>1s): {len(slow)}")
            for req in slow[:5]:
                print(f"  - {req['method']} {req['endpoint']}: {req['response_time']:.3f}s")


class MonitoredAPIClient:
    """API client that reports every request to an ``APIMonitor``."""

    def __init__(self, base_url: str, monitor: APIMonitor):
        self.base_url = base_url.rstrip('/')
        self.monitor = monitor
        self.session = requests.Session()

    def request(self, method: str, endpoint: str, **kwargs):
        """Send a request and record its outcome, success or failure.

        Raises:
            requests.exceptions.RequestException: Re-raised after the failed
                request has been recorded. A status code of 0 is recorded
                when no response was received at all.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        start_time = time.perf_counter()
        error = False
        status_code = None

        try:
            response = self.session.request(method, url, **kwargs)
            status_code = response.status_code
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            error = True
            failed_response = getattr(e, 'response', None)
            # Compare against None explicitly: a Response is falsy for
            # 4xx/5xx statuses, so a plain truth test would discard exactly
            # the error codes that should be recorded.
            if failed_response is not None:
                status_code = failed_response.status_code
            else:
                status_code = 0
            raise

        finally:
            response_time = time.perf_counter() - start_time
            self.monitor.track_request(
                endpoint=endpoint,
                method=method,
                status_code=status_code or 0,
                response_time=response_time,
                error=error
            )

    def get(self, endpoint: str, **kwargs):
        """Monitored GET request."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs):
        """Monitored POST request."""
        return self.request('POST', endpoint, **kwargs)


# Usage
monitor = APIMonitor()
client = MonitoredAPIClient('https://jsonplaceholder.typicode.com', monitor)

# Make requests
print("Making API requests...")
for i in range(1, 11):
    try:
        response = client.get(f'/users/{i}')
        print(f"✓ User {i}: {response.json()['name']}")
    except Exception as e:
        print(f"✗ User {i}: Error")

# Print monitoring report
monitor.print_report()

Best Practices Summary

1. Connection Pooling

# ✅ Use session for connection reuse
# The with-block closes the session (and its pooled connections) when the
# loop is done; the timeout keeps one stalled request from hanging the loop.
with requests.Session() as session:
    for i in range(100):
        session.get('https://api.example.com/data', timeout=10)

2. Timeout Strategy

# ✅ Always set timeout
# The tuple is (connect, read): 3 seconds to connect and 10 seconds to read
# the response.
response = requests.get('https://api.example.com/data', timeout=(3, 10))

3. Error Handling

# ✅ Handle specific exceptions
# Timeout and HTTPError are unrelated exception types, so the order of the
# two handlers does not affect which one runs.
try:
    response = requests.get(url, timeout=5)
    response.raise_for_status()
except requests.exceptions.HTTPError as e:
    # Handle HTTP error
    print(f"Status: {e.response.status_code}")
except requests.exceptions.Timeout:
    # Handle timeout
    pass

4. Rate Limiting

# ✅ Implement rate limiting
import time
from datetime import datetime, timedelta

last_request = None
min_interval = 1.0  # 1 second between requests


def rate_limited_get(url, **kwargs):
    """GET ``url``, waiting first so calls are at least ``min_interval`` apart.

    The timestamp of the previous call is kept in the module-level
    ``last_request``, so the throttle only has an effect when every request
    goes through this function.
    """
    global last_request

    if last_request is not None:
        elapsed = (datetime.now() - last_request).total_seconds()
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)

    try:
        return requests.get(url, **kwargs)
    finally:
        # Stamp even when the request raised: a failed call still counts
        # against the rate limit.
        last_request = datetime.now()


# NOTE(review): like the neighbouring snippets, this assumes `requests` is
# imported and `url` is defined by the surrounding code.
response = rate_limited_get(url)

5. Retry with Backoff

# ✅ Exponential backoff
import time

max_retries = 3
delay = 1

for attempt in range(max_retries):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        break
    except requests.exceptions.RequestException:
        if attempt == max_retries - 1:
            # Out of attempts: re-raise so the failure is visible instead of
            # falling through with `response` missing or holding an error.
            raise
        time.sleep(delay)
        delay *= 2  # Exponential backoff

Bài Tập Thực Hành

Bài 1: Async API Client

Tạo async client fetch data từ multiple endpoints đồng thời.

Bài 2: Webhook Server

Implement webhook server với signature verification và event processing.

Bài 3: API Testing Suite

Viết test suite cho API client sử dụng responses library.

Bài 4: API Gateway

Implement API gateway với load balancing và health checks.

Bài 5: Monitoring Dashboard

Tạo monitoring system track API performance và errors.

Tóm Tắt

Trong Part 2 chúng ta đã học:

  1. Sessions Management - Advanced configuration, authentication
  2. Async Requests - aiohttp, concurrent requests, rate limiting
  3. Webhooks - Server, client, signature verification, retry logic
  4. API Testing - responses library, unittest.mock
  5. Real Applications - API Gateway, monitoring system
  6. Best Practices - Connection pooling, timeouts, error handling, rate limiting, retry with backoff

Bạn đã hoàn thành Module 02: Python Advanced! 🎉

Những kiến thức từ module này sẽ giúp bạn:

  • ✅ Viết Python code chuyên nghiệp
  • ✅ Xử lý async operations
  • ✅ Làm việc với APIs
  • ✅ Testing và debugging
  • ✅ Performance optimization
  • ✅ Best practices và design patterns

Chúc mừng bạn đã hoàn thành khóa học Python Advanced! 🚀🎓


Bài tiếp theo: Bài 21: Practices & Projects 🚀