Lesson 21: Python Advanced - Practices & Projects (Part 2)
Continuing Project 1: Async Web Scraper
Step 7: Database Models
# database/models.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, JSON
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime

Base = declarative_base()


class ScrapedData(Base):
    """Model for scraped data."""
    __tablename__ = 'scraped_data'

    id = Column(Integer, primary_key=True)
    url = Column(String(500), nullable=False, index=True)
    title = Column(String(200))
    content = Column(Text)
    data = Column(JSON)  # Store parsed data as JSON
    scraped_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f"<ScrapedData(id={self.id}, url='{self.url}')>"


class Quote(Base):
    """Model for quotes."""
    __tablename__ = 'quotes'

    id = Column(Integer, primary_key=True)
    text = Column(Text, nullable=False)
    author = Column(String(100), nullable=False, index=True)
    author_url = Column(String(500))
    tags = Column(JSON)  # Store tags as JSON array
    scraped_at = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return f"<Quote(author='{self.author}')>"


class Author(Base):
    """Model for authors."""
    __tablename__ = 'authors'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True, index=True)
    born_date = Column(String(50))
    born_location = Column(String(200))
    description = Column(Text)
    scraped_at = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return f"<Author(name='{self.name}')>"


class DatabaseManager:
    """Database connection manager."""

    def __init__(self, database_url: str = 'sqlite:///scraper.db'):
        """
        Initialize database manager.

        Args:
            database_url: SQLAlchemy database URL
        """
        self.engine = create_engine(database_url, echo=False)
        # expire_on_commit=False keeps loaded attribute values usable after the
        # session commits and closes (e.g. when printing quotes from the CLI).
        self.SessionLocal = sessionmaker(bind=self.engine, expire_on_commit=False)

        # Create tables
        Base.metadata.create_all(self.engine)

    def get_session(self):
        """Get database session."""
        return self.SessionLocal()

    def __enter__(self):
        """Context manager entry."""
        self.session = self.get_session()
        return self.session

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        if exc_type is None:
            self.session.commit()
        else:
            self.session.rollback()
        self.session.close()
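To see what the context-manager behaviour buys us, here is a minimal sketch (not part of the project files, and it assumes only the models above plus a throwaway demo.db SQLite file): a block that exits cleanly is committed, a block that raises is rolled back.

# sketch: exercising DatabaseManager (assumes database/models.py above)
from database.models import DatabaseManager, Quote

db = DatabaseManager('sqlite:///demo.db')  # tables are created on init

with db as session:
    # committed automatically when the block exits without an exception
    session.add(Quote(text="Hello", author="Demo Author", tags=["demo"]))

with db as session:
    print(session.query(Quote).count())  # -> 1

try:
    with db as session:
        session.add(Quote(text="Will be rolled back", author="Demo Author", tags=[]))
        raise RuntimeError("boom")  # triggers rollback in __exit__
except RuntimeError:
    pass

with db as session:
    print(session.query(Quote).count())  # still 1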
Step 8: Repository Pattern
# database/repository.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import datetime, timedelta
import logging

from .models import Quote, Author, ScrapedData

logger = logging.getLogger(__name__)


class QuoteRepository:
    """Repository for Quote operations."""

    def __init__(self, session: Session):
        self.session = session

    def create(self, text: str, author: str, tags: List[str],
               author_url: Optional[str] = None) -> Quote:
        """Create new quote."""
        quote = Quote(
            text=text,
            author=author,
            author_url=author_url,
            tags=tags
        )
        self.session.add(quote)
        self.session.flush()

        logger.info(f"Created quote: {quote.id}")
        return quote

    def get_by_id(self, quote_id: int) -> Optional[Quote]:
        """Get quote by ID."""
        return self.session.query(Quote).filter(Quote.id == quote_id).first()

    def get_by_author(self, author: str) -> List[Quote]:
        """Get all quotes by author."""
        return self.session.query(Quote).filter(
            Quote.author == author
        ).all()

    def search(self, query: str) -> List[Quote]:
        """Search quotes by text."""
        return self.session.query(Quote).filter(
            Quote.text.contains(query)
        ).all()

    def get_by_tag(self, tag: str) -> List[Quote]:
        """Get quotes by tag."""
        return self.session.query(Quote).filter(
            Quote.tags.contains([tag])
        ).all()

    def get_all(self, limit: int = 100, offset: int = 0) -> List[Quote]:
        """Get all quotes with pagination."""
        return self.session.query(Quote).offset(offset).limit(limit).all()

    def count(self) -> int:
        """Count total quotes."""
        return self.session.query(func.count(Quote.id)).scalar()

    def get_statistics(self) -> Dict[str, Any]:
        """Get quote statistics."""
        total = self.count()

        # Count by author
        author_counts = self.session.query(
            Quote.author,
            func.count(Quote.id)
        ).group_by(Quote.author).all()

        # Most quoted authors
        top_authors = sorted(author_counts, key=lambda x: x[1], reverse=True)[:5]

        return {
            'total_quotes': total,
            'unique_authors': len(author_counts),
            'top_authors': [{'author': a, 'count': c} for a, c in top_authors]
        }


class AuthorRepository:
    """Repository for Author operations."""

    def __init__(self, session: Session):
        self.session = session

    def create(self, name: str, born_date: Optional[str] = None,
               born_location: Optional[str] = None,
               description: Optional[str] = None) -> Author:
        """Create new author."""
        author = Author(
            name=name,
            born_date=born_date,
            born_location=born_location,
            description=description
        )
        self.session.add(author)
        self.session.flush()

        logger.info(f"Created author: {author.name}")
        return author

    def get_by_name(self, name: str) -> Optional[Author]:
        """Get author by name."""
        return self.session.query(Author).filter(Author.name == name).first()

    def get_or_create(self, name: str, **kwargs) -> tuple[Author, bool]:
        """Get existing author or create new one."""
        author = self.get_by_name(name)
        if author:
            return author, False

        author = self.create(name, **kwargs)
        return author, True

    def update(self, name: str, **kwargs) -> Optional[Author]:
        """Update author information."""
        author = self.get_by_name(name)
        if not author:
            return None

        for key, value in kwargs.items():
            if hasattr(author, key) and value is not None:
                setattr(author, key, value)

        self.session.flush()
        logger.info(f"Updated author: {name}")
        return author

    def get_all(self) -> List[Author]:
        """Get all authors."""
        return self.session.query(Author).all()
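A short usage sketch of how the repositories are meant to be combined with the DatabaseManager from Step 7 (assumptions: a demo.db SQLite file and the sample data shown here, which are not part of the project):

# sketch: using the repositories with DatabaseManager (Step 7)
from database.models import DatabaseManager
from database.repository import QuoteRepository, AuthorRepository

db = DatabaseManager('sqlite:///demo.db')

with db as session:
    quotes = QuoteRepository(session)
    authors = AuthorRepository(session)

    author, created = authors.get_or_create("Ada Lovelace")
    quotes.create(
        text="That brain of mine is something more than merely mortal.",
        author=author.name,
        tags=["science"],
    )
    # __exit__ commits for us; flush() inside create() only assigns IDs early

with db as session:
    print(QuoteRepository(session).get_statistics())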
Step 9: Data Validation with Pydantic
# core/validator.py
from pydantic import BaseModel, Field, validator
from typing import List, Optional
import logging

logger = logging.getLogger(__name__)


class QuoteSchema(BaseModel):
    """Schema for quote validation."""

    text: str = Field(..., min_length=1, max_length=1000)
    author: str = Field(..., min_length=1, max_length=100)
    author_url: Optional[str] = None
    tags: List[str] = Field(default_factory=list)

    @validator('text')
    def validate_text(cls, v):
        """Validate quote text."""
        if not v.strip():
            raise ValueError("Quote text cannot be empty")
        return v.strip()

    @validator('tags')
    def validate_tags(cls, v):
        """Validate tags."""
        return [tag.strip().lower() for tag in v if tag.strip()]

    class Config:
        json_schema_extra = {
            "example": {
                "text": "The world is a book...",
                "author": "Saint Augustine",
                "tags": ["world", "travel"]
            }
        }


class AuthorSchema(BaseModel):
    """Schema for author validation."""

    name: str = Field(..., min_length=1, max_length=100)
    born_date: Optional[str] = Field(None, max_length=50)
    born_location: Optional[str] = Field(None, max_length=200)
    description: Optional[str] = None

    @validator('name')
    def validate_name(cls, v):
        """Validate author name."""
        if not v.strip():
            raise ValueError("Author name cannot be empty")
        return v.strip()

    class Config:
        json_schema_extra = {
            "example": {
                "name": "Albert Einstein",
                "born_date": "March 14, 1879",
                "born_location": "Ulm, Germany"
            }
        }


class ScraperConfigSchema(BaseModel):
    """Schema for scraper configuration."""

    max_concurrent: int = Field(5, ge=1, le=20)
    rate_limit: int = Field(10, ge=1, le=100)
    rate_per_seconds: float = Field(1.0, gt=0)
    timeout: int = Field(30, ge=5, le=120)
    max_retries: int = Field(3, ge=1, le=10)
    cache_ttl: int = Field(3600, ge=0)

    class Config:
        json_schema_extra = {
            "example": {
                "max_concurrent": 5,
                "rate_limit": 10,
                "timeout": 30
            }
        }


def validate_quote_data(data: dict) -> QuoteSchema:
    """Validate quote data."""
    try:
        return QuoteSchema(**data)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        raise


def validate_author_data(data: dict) -> AuthorSchema:
    """Validate author data."""
    try:
        return AuthorSchema(**data)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        raise
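To see the schemas in action, here is a small standalone sketch (the sample dictionaries are made up for illustration): a messy but valid quote is normalized, and a blank quote is rejected.

# sketch: validating scraped dictionaries with the schemas above
from core.validator import validate_quote_data
from pydantic import ValidationError

raw = {
    "text": "  Simplicity is the soul of efficiency.  ",
    "author": "Austin Freeman",
    "tags": ["Simplicity ", "EFFICIENCY"],
}
quote = validate_quote_data(raw)
print(quote.text)   # stripped: "Simplicity is the soul of efficiency."
print(quote.tags)   # normalized: ['simplicity', 'efficiency']

try:
    validate_quote_data({"text": "   ", "author": "Nobody"})
except ValidationError as e:
    print(f"Rejected: {len(e.errors())} error(s)")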
Step 10: Main Application
# main.py
import asyncio
import logging
from typing import List, Dict
from pathlib import Path

from core.scraper import AsyncWebScraper
from core.parser import QuoteParser
from core.validator import validate_quote_data, validate_author_data
from database.models import DatabaseManager
from database.repository import QuoteRepository, AuthorRepository
from utils.logger import setup_logging
from utils.cache import FileCache
from utils.decorators import async_timing

logger = logging.getLogger(__name__)


class QuoteScraperApp:
    """Main application for quote scraping."""

    def __init__(self, db_url: str = 'sqlite:///quotes.db',
                 cache_dir: str = '.cache'):
        """
        Initialize application.

        Args:
            db_url: Database URL
            cache_dir: Cache directory
        """
        self.db_manager = DatabaseManager(db_url)
        self.cache = FileCache(cache_dir)
        self.parser = QuoteParser()
        self.base_url = 'https://quotes.toscrape.com'

    @async_timing
    async def scrape_quotes(self, max_pages: int = 10):
        """
        Scrape quotes from multiple pages.

        Args:
            max_pages: Maximum number of pages to scrape
        """
        urls = [f'{self.base_url}/page/{i}/' for i in range(1, max_pages + 1)]

        logger.info(f"Starting scraper for {len(urls)} pages...")

        async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
            # Fetch all pages
            results = await scraper.fetch_many(urls)

            # Parse and store quotes
            total_quotes = 0
            total_authors = 0

            with self.db_manager as session:
                quote_repo = QuoteRepository(session)
                author_repo = AuthorRepository(session)

                for html in results:
                    if html:
                        quotes = self.parser.parse_quotes(html)

                        for quote_data in quotes:
                            try:
                                # Validate data
                                validated = validate_quote_data(quote_data)

                                # Create/get author
                                author, created = author_repo.get_or_create(
                                    validated.author
                                )
                                if created:
                                    total_authors += 1

                                # Create quote
                                quote_repo.create(
                                    text=validated.text,
                                    author=validated.author,
                                    tags=validated.tags,
                                    author_url=validated.author_url
                                )
                                total_quotes += 1

                            except Exception as e:
                                logger.error(f"Error processing quote: {e}")

            # Print statistics
            stats = scraper.get_stats()
            logger.info(f"Scraping completed!")
            logger.info(f"Total quotes: {total_quotes}")
            logger.info(f"New authors: {total_authors}")
            logger.info(f"Success rate: {stats['success_rate']:.1f}%")
            logger.info(f"Avg response time: {stats['avg_response_time']:.2f}s")

    @async_timing
    async def scrape_author_details(self, author_name: str):
        """
        Scrape detailed author information.

        Args:
            author_name: Author name to scrape
        """
        with self.db_manager as session:
            author_repo = AuthorRepository(session)
            author = author_repo.get_by_name(author_name)

            if not author or not author.description:
                # Scrape author page
                author_url = f"{self.base_url}/author/{author_name.replace(' ', '-')}"

                async with AsyncWebScraper() as scraper:
                    html = await scraper.fetch(author_url)

                    if html:
                        author_data = self.parser.parse_author(html)

                        if author_data:
                            # Validate and update
                            validated = validate_author_data(author_data)
                            author_repo.update(
                                author_name,
                                born_date=validated.born_date,
                                born_location=validated.born_location,
                                description=validated.description
                            )
                            logger.info(f"Updated author: {author_name}")

    def get_statistics(self) -> Dict:
        """Get application statistics."""
        with self.db_manager as session:
            quote_repo = QuoteRepository(session)
            return quote_repo.get_statistics()

    def search_quotes(self, query: str) -> List:
        """Search quotes."""
        with self.db_manager as session:
            quote_repo = QuoteRepository(session)
            return quote_repo.search(query)

    def export_to_json(self, filename: str = 'quotes_export.json'):
        """Export quotes to JSON file."""
        import json

        with self.db_manager as session:
            quote_repo = QuoteRepository(session)
            quotes = quote_repo.get_all(limit=1000)

            data = [
                {
                    'text': q.text,
                    'author': q.author,
                    'tags': q.tags,
                    'scraped_at': q.scraped_at.isoformat()
                }
                for q in quotes
            ]

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logger.info(f"Exported {len(data)} quotes to {filename}")


async def main():
    """Main entry point."""
    # Setup logging
    setup_logging('INFO', 'logs/scraper.log')

    # Create app
    app = QuoteScraperApp()

    # Scrape quotes
    await app.scrape_quotes(max_pages=5)

    # Get statistics
    stats = app.get_statistics()

    print("\n" + "="*60)
    print("STATISTICS")
    print("="*60)
    print(f"Total quotes: {stats['total_quotes']}")
    print(f"Unique authors: {stats['unique_authors']}")
    print("\nTop Authors:")
    for author_data in stats['top_authors']:
        print(f"  - {author_data['author']}: {author_data['count']} quotes")

    # Export data
    app.export_to_json('quotes_export.json')

    print("\n✅ Done!")


if __name__ == "__main__":
    asyncio.run(main())
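Besides the main() entry point, the app object can also be driven directly from your own scripts. A brief sketch (it assumes quotes were already scraped into quotes.db, and the author name and search term are just examples):

# sketch: driving QuoteScraperApp directly (assumes quotes.db already populated)
import asyncio
from main import QuoteScraperApp

app = QuoteScraperApp()

# Enrich one author record with details from the author page
asyncio.run(app.scrape_author_details("Albert Einstein"))

# Full-text search over stored quotes
for quote in app.search_quotes("life")[:3]:
    print(f"{quote.author}: {quote.text[:60]}...")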
Step 11: CLI Interface
# cli.py
import asyncio
import click
from pathlib import Path

from main import QuoteScraperApp
from utils.logger import setup_logging


@click.group()
@click.option('--db', default='sqlite:///quotes.db', help='Database URL')
@click.option('--log-level', default='INFO', help='Log level')
@click.pass_context
def cli(ctx, db, log_level):
    """Quote Scraper CLI."""
    setup_logging(log_level)
    ctx.obj = QuoteScraperApp(db_url=db)


@cli.command()
@click.option('--pages', default=10, help='Number of pages to scrape')
@click.pass_obj
def scrape(app, pages):
    """Scrape quotes from website."""
    click.echo(f"Scraping {pages} pages...")
    asyncio.run(app.scrape_quotes(max_pages=pages))
    click.echo("✅ Scraping completed!")


@cli.command()
@click.argument('query')
@click.pass_obj
def search(app, query):
    """Search quotes."""
    quotes = app.search_quotes(query)

    click.echo(f"\nFound {len(quotes)} quotes:")
    for quote in quotes[:10]:
        click.echo(f"\n'{quote.text}'")
        click.echo(f"  — {quote.author}")
        click.echo(f"  Tags: {', '.join(quote.tags)}")


@cli.command()
@click.pass_obj
def stats(app):
    """Show statistics."""
    stats = app.get_statistics()

    click.echo("\n" + "="*60)
    click.echo("STATISTICS")
    click.echo("="*60)
    click.echo(f"Total quotes: {stats['total_quotes']}")
    click.echo(f"Unique authors: {stats['unique_authors']}")
    click.echo("\nTop Authors:")
    for author_data in stats['top_authors']:
        click.echo(f"  - {author_data['author']}: {author_data['count']} quotes")


@cli.command()
@click.option('--output', default='quotes_export.json', help='Output filename')
@click.pass_obj
def export(app, output):
    """Export quotes to JSON."""
    app.export_to_json(output)
    click.echo(f"✅ Exported to {output}")


if __name__ == '__main__':
    cli()
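One convenient way to try the commands without leaving Python is click's built-in test runner. A minimal sketch, assuming cli.py above is importable from the project root:

# sketch: invoking the CLI in-process with click's CliRunner
from click.testing import CliRunner
from cli import cli

runner = CliRunner()

# Global options go before the subcommand, exactly as on the command line
result = runner.invoke(cli, ['--db', 'sqlite:///quotes.db', 'stats'])
print(result.exit_code)
print(result.output)

result = runner.invoke(cli, ['search', 'life'])
print(result.output)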
Step 12: Testing Suite
# tests/test_scraper.py
import pytest
import asyncio
from unittest.mock import Mock, patch, AsyncMock

from core.scraper import AsyncWebScraper, RateLimiter


@pytest.fixture
def rate_limiter():
    """Create rate limiter instance."""
    return RateLimiter(rate=5, per=1.0)


@pytest.mark.asyncio
async def test_rate_limiter_acquire(rate_limiter):
    """Test rate limiter acquire."""
    # Should allow first request immediately
    await rate_limiter.acquire()
    assert rate_limiter.allowance < 5


@pytest.mark.asyncio
async def test_scraper_initialization():
    """Test scraper initialization."""
    scraper = AsyncWebScraper(max_concurrent=3, rate_limit=5)

    assert scraper.max_concurrent == 3
    assert scraper.rate_limiter.rate == 5


@pytest.mark.asyncio
async def test_scraper_fetch_success():
    """Test successful fetch."""
    async with AsyncWebScraper() as scraper:
        # Mock response
        with patch('aiohttp.ClientSession.get') as mock_get:
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.text = AsyncMock(return_value='<html>Test</html>')
            mock_response.__aenter__.return_value = mock_response
            mock_get.return_value = mock_response

            result = await scraper.fetch('http://example.com')

            assert result == '<html>Test</html>'
            assert scraper.stats['successful'] == 1


@pytest.mark.asyncio
async def test_scraper_fetch_retry():
    """Test fetch with retry."""
    async with AsyncWebScraper() as scraper:
        with patch('aiohttp.ClientSession.get') as mock_get:
            # Simulate failure then success
            mock_response_fail = AsyncMock()
            mock_response_fail.raise_for_status.side_effect = Exception("Error")

            mock_response_success = AsyncMock()
            mock_response_success.status = 200
            mock_response_success.text = AsyncMock(return_value='Success')

            mock_get.side_effect = [
                mock_response_fail,
                mock_response_success
            ]

            # Should retry and succeed
            result = await scraper.fetch('http://example.com', max_retries=2)

            assert result is not None


@pytest.mark.asyncio
async def test_scraper_fetch_many():
    """Test fetching multiple URLs."""
    urls = ['http://example.com/1', 'http://example.com/2']

    async with AsyncWebScraper() as scraper:
        with patch.object(scraper, 'fetch', return_value='<html>Test</html>'):
            results = await scraper.fetch_many(urls)

            assert len(results) == 2
            assert all(r == '<html>Test</html>' for r in results)


def test_scraper_stats():
    """Test statistics calculation."""
    scraper = AsyncWebScraper()
    scraper.stats = {
        'total_requests': 10,
        'successful': 8,
        'failed': 2,
        'total_time': 20.0
    }

    stats = scraper.get_stats()

    assert stats['success_rate'] == 80.0
    assert stats['avg_response_time'] == 2.5
# tests/test_parser.py
import pytest

from core.parser import QuoteParser, DataParser


@pytest.fixture
def parser():
    """Create parser instance."""
    return QuoteParser()


@pytest.fixture
def sample_html():
    """Sample HTML for testing."""
    return '''
    <div class="quote">
        <span class="text">"Test quote"</span>
        <span class="author">Test Author</span>
        <div class="tags">
            <a class="tag">tag1</a>
            <a class="tag">tag2</a>
        </div>
    </div>
    '''


def test_parse_quotes(parser, sample_html):
    """Test quote parsing."""
    quotes = parser.parse_quotes(sample_html)

    assert len(quotes) == 1
    assert quotes[0]['text'] == '"Test quote"'
    assert quotes[0]['author'] == 'Test Author'
    assert 'tag1' in quotes[0]['tags']


def test_extract_email():
    """Test email extraction."""
    # placeholder address used purely as test data
    text = "Contact me at user@example.com for more info"
    email = DataParser.extract_email(text)
    assert email == 'user@example.com'


def test_extract_phone():
    """Test phone extraction."""
    text = "Call us at (123) 456-7890"
    phone = DataParser.extract_phone(text)
    assert phone == '(123) 456-7890'


def test_extract_price():
    """Test price extraction."""
    text = "Price: $1,234.56"
    price = DataParser.extract_price(text)
    assert price == 1234.56


def test_extract_date():
    """Test date extraction."""
    text = "Published on 2024-01-15"
    date = DataParser.extract_date(text)
    assert date == '2024-01-15'
# tests/test_repository.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from database.models import Base, Quote, Author
from database.repository import QuoteRepository, AuthorRepository


@pytest.fixture
def session():
    """Create test database session."""
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    yield session

    session.close()


@pytest.fixture
def quote_repo(session):
    """Create quote repository."""
    return QuoteRepository(session)


@pytest.fixture
def author_repo(session):
    """Create author repository."""
    return AuthorRepository(session)


def test_create_quote(quote_repo, session):
    """Test quote creation."""
    quote = quote_repo.create(
        text="Test quote",
        author="Test Author",
        tags=['test', 'python']
    )
    session.commit()

    assert quote.id is not None
    assert quote.text == "Test quote"
    assert quote.author == "Test Author"
    assert 'test' in quote.tags


def test_get_quote_by_author(quote_repo, session):
    """Test getting quotes by author."""
    quote_repo.create("Quote 1", "Author A", [])
    quote_repo.create("Quote 2", "Author A", [])
    quote_repo.create("Quote 3", "Author B", [])
    session.commit()

    quotes = quote_repo.get_by_author("Author A")
    assert len(quotes) == 2


def test_search_quotes(quote_repo, session):
    """Test quote search."""
    quote_repo.create("Python is awesome", "Author", [])
    quote_repo.create("Java is great", "Author", [])
    session.commit()

    results = quote_repo.search("Python")
    assert len(results) == 1
    assert "Python" in results[0].text


def test_create_author(author_repo, session):
    """Test author creation."""
    author = author_repo.create(
        name="Test Author",
        born_date="2000-01-01",
        born_location="Test City"
    )
    session.commit()

    assert author.id is not None
    assert author.name == "Test Author"


def test_get_or_create_author(author_repo, session):
    """Test get or create author."""
    # First call creates
    author1, created1 = author_repo.get_or_create("New Author")
    session.commit()
    assert created1 is True

    # Second call gets existing
    author2, created2 = author_repo.get_or_create("New Author")
    assert created2 is False
    assert author1.id == author2.id


def test_quote_statistics(quote_repo, session):
    """Test statistics calculation."""
    quote_repo.create("Q1", "Author A", [])
    quote_repo.create("Q2", "Author A", [])
    quote_repo.create("Q3", "Author B", [])
    session.commit()

    stats = quote_repo.get_statistics()

    assert stats['total_quotes'] == 3
    assert stats['unique_authors'] == 2
    assert len(stats['top_authors']) > 0
Step 13: Configuration File
# config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional


class Settings(BaseSettings):
    """Application settings."""

    model_config = SettingsConfigDict(env_file='.env', case_sensitive=True)

    # Database (pydantic-settings reads environment variables by field name,
    # so this is overridden by a DATABASE_URL variable or .env entry)
    DATABASE_URL: str = Field(default='sqlite:///quotes.db')

    # Scraper
    MAX_CONCURRENT: int = Field(default=5, ge=1, le=20)
    RATE_LIMIT: int = Field(default=10, ge=1, le=100)
    RATE_PER_SECONDS: float = Field(default=1.0, gt=0)
    TIMEOUT: int = Field(default=30, ge=5, le=120)
    MAX_RETRIES: int = Field(default=3, ge=1, le=10)

    # Cache
    CACHE_DIR: str = Field(default='.cache')
    CACHE_TTL: int = Field(default=3600, ge=0)

    # Logging
    LOG_LEVEL: str = Field(default='INFO')
    LOG_FILE: Optional[str] = Field(default='logs/scraper.log')


# Create settings instance
settings = Settings()
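A quick sketch of how the settings might be overridden from the environment (because case_sensitive=True, the variable names must match the field names exactly; the values below are just examples):

# sketch: environment variables override the defaults defined in Settings
import os

os.environ["MAX_CONCURRENT"] = "8"                 # set before Settings() is built
os.environ["DATABASE_URL"] = "sqlite:///override.db"

from config import Settings

settings = Settings()
print(settings.MAX_CONCURRENT)   # -> 8
print(settings.DATABASE_URL)     # -> sqlite:///override.db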
Step 14: Requirements and Setup
# requirements.txt
# Core
aiohttp>=3.9.0
beautifulsoup4>=4.12.0
sqlalchemy>=2.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0

# CLI
click>=8.1.0

# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0

# Utils
python-dotenv>=1.0.0
# setup.py
from setuptools import setup, find_packages

setup(
    name='async-web-scraper',
    version='1.0.0',
    packages=find_packages(),
    install_requires=[
        'aiohttp>=3.9.0',
        'beautifulsoup4>=4.12.0',
        'sqlalchemy>=2.0.0',
        'pydantic>=2.0.0',
        'pydantic-settings>=2.0.0',
        'click>=8.1.0',
    ],
    entry_points={
        'console_scripts': [
            'scraper=cli:cli',
        ],
    },
    python_requires='>=3.9',
)
Part 3 will continue with:
- Project 2: Real-time Data Processing Pipeline
- Project 3: RESTful API Client Library
- Self-study exercises with detailed guidance
Up next: Lesson 21.3: More Advanced Projects 🚀