Lesson 21: Python Advanced - Practices & Projects (Part 2)

Continuing Project 1: Async Web Scraper

Step 7: Database Models

# database/models.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, JSON
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime

Base = declarative_base()

class ScrapedData(Base):
    """Model for scraped data."""

    __tablename__ = 'scraped_data'

    id = Column(Integer, primary_key=True)
    url = Column(String(500), nullable=False, index=True)
    title = Column(String(200))
    content = Column(Text)
    data = Column(JSON)  # Store parsed data as JSON
    scraped_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f"<ScrapedData(id={self.id}, url='{self.url}')>"

class Quote(Base):
    """Model for quotes."""

    __tablename__ = 'quotes'

    id = Column(Integer, primary_key=True)
    text = Column(Text, nullable=False)
    author = Column(String(100), nullable=False, index=True)
    author_url = Column(String(500))
    tags = Column(JSON)  # Store tags as JSON array
    scraped_at = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return f"<Quote(author='{self.author}')>"

class Author(Base):
    """Model for authors."""

    __tablename__ = 'authors'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True, index=True)
    born_date = Column(String(50))
    born_location = Column(String(200))
    description = Column(Text)
    scraped_at = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return f"<Author(name='{self.name}')>"

class DatabaseManager:
    """Database connection manager."""

    def __init__(self, database_url: str = 'sqlite:///scraper.db'):
        """
        Initialize database manager.

        Args:
            database_url: SQLAlchemy database URL
        """
        self.engine = create_engine(database_url, echo=False)
        # expire_on_commit=False so objects returned from an already-closed
        # session keep their loaded attribute values (used by search/stats).
        self.SessionLocal = sessionmaker(bind=self.engine, expire_on_commit=False)

        # Create tables
        Base.metadata.create_all(self.engine)

    def get_session(self):
        """Get database session."""
        return self.SessionLocal()

    def __enter__(self):
        """Context manager entry."""
        self.session = self.get_session()
        return self.session

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        if exc_type is None:
            self.session.commit()
        else:
            self.session.rollback()
        self.session.close()
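
A quick usage sketch of DatabaseManager as a context manager (using only the models defined above): __exit__ commits on success and rolls back on any exception, so calling code never has to commit manually.

# example_models_usage.py (illustrative only)
from database.models import DatabaseManager, Quote

db = DatabaseManager('sqlite:///quotes.db')

# Commit happens automatically when the block exits without an exception;
# any exception triggers a rollback before the session is closed.
with db as session:
    session.add(Quote(text="An example quote",
                      author="Example Author",
                      tags=["example"]))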

Step 8: Repository Pattern

# database/repository.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import func
import logging

from .models import Quote, Author, ScrapedData

logger = logging.getLogger(__name__)

class QuoteRepository:
    """Repository for Quote operations."""

    def __init__(self, session: Session):
        self.session = session

    def create(self, text: str, author: str, tags: List[str],
               author_url: Optional[str] = None) -> Quote:
        """Create new quote."""
        quote = Quote(
            text=text,
            author=author,
            author_url=author_url,
            tags=tags
        )
        self.session.add(quote)
        self.session.flush()
        logger.info(f"Created quote: {quote.id}")
        return quote

    def get_by_id(self, quote_id: int) -> Optional[Quote]:
        """Get quote by ID."""
        return self.session.query(Quote).filter(Quote.id == quote_id).first()

    def get_by_author(self, author: str) -> List[Quote]:
        """Get all quotes by author."""
        return self.session.query(Quote).filter(
            Quote.author == author
        ).all()

    def search(self, query: str) -> List[Quote]:
        """Search quotes by text."""
        return self.session.query(Quote).filter(
            Quote.text.contains(query)
        ).all()

    def get_by_tag(self, tag: str) -> List[Quote]:
        """Get quotes by tag."""
        return self.session.query(Quote).filter(
            Quote.tags.contains([tag])
        ).all()

    def get_all(self, limit: int = 100, offset: int = 0) -> List[Quote]:
        """Get all quotes with pagination."""
        return self.session.query(Quote).offset(offset).limit(limit).all()

    def count(self) -> int:
        """Count total quotes."""
        return self.session.query(func.count(Quote.id)).scalar()

    def get_statistics(self) -> Dict[str, Any]:
        """Get quote statistics."""
        total = self.count()

        # Count by author
        author_counts = self.session.query(
            Quote.author, func.count(Quote.id)
        ).group_by(Quote.author).all()

        # Most quoted authors
        top_authors = sorted(author_counts, key=lambda x: x[1], reverse=True)[:5]

        return {
            'total_quotes': total,
            'unique_authors': len(author_counts),
            'top_authors': [{'author': a, 'count': c} for a, c in top_authors]
        }

class AuthorRepository:
    """Repository for Author operations."""

    def __init__(self, session: Session):
        self.session = session

    def create(self, name: str, born_date: Optional[str] = None,
               born_location: Optional[str] = None,
               description: Optional[str] = None) -> Author:
        """Create new author."""
        author = Author(
            name=name,
            born_date=born_date,
            born_location=born_location,
            description=description
        )
        self.session.add(author)
        self.session.flush()
        logger.info(f"Created author: {author.name}")
        return author

    def get_by_name(self, name: str) -> Optional[Author]:
        """Get author by name."""
        return self.session.query(Author).filter(Author.name == name).first()

    def get_or_create(self, name: str, **kwargs) -> tuple[Author, bool]:
        """Get existing author or create new one."""
        author = self.get_by_name(name)
        if author:
            return author, False

        author = self.create(name, **kwargs)
        return author, True

    def update(self, name: str, **kwargs) -> Optional[Author]:
        """Update author information."""
        author = self.get_by_name(name)
        if not author:
            return None

        for key, value in kwargs.items():
            if hasattr(author, key) and value is not None:
                setattr(author, key, value)

        self.session.flush()
        logger.info(f"Updated author: {name}")
        return author

    def get_all(self) -> List[Author]:
        """Get all authors."""
        return self.session.query(Author).all()
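
Putting the two layers together, here is a short, illustrative sketch (using only the names defined above) of how the repositories are used inside one managed session:

# example_repository_usage.py (illustrative only)
from database.models import DatabaseManager
from database.repository import QuoteRepository, AuthorRepository

db = DatabaseManager('sqlite:///quotes.db')

with db as session:
    quotes = QuoteRepository(session)
    authors = AuthorRepository(session)

    # get_or_create avoids duplicate authors across runs
    author, created = authors.get_or_create("Example Author")
    quotes.create(text="An example quote", author=author.name, tags=["example"])

    print(quotes.get_statistics())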

Step 9: Data Validation with Pydantic

# core/validator.py
import logging

from pydantic import BaseModel, Field, validator
from typing import List, Optional

logger = logging.getLogger(__name__)

class QuoteSchema(BaseModel):
    """Schema for quote validation."""

    text: str = Field(..., min_length=1, max_length=1000)
    author: str = Field(..., min_length=1, max_length=100)
    author_url: Optional[str] = None
    tags: List[str] = Field(default_factory=list)

    @validator('text')
    def validate_text(cls, v):
        """Validate quote text."""
        if not v.strip():
            raise ValueError("Quote text cannot be empty")
        return v.strip()

    @validator('tags')
    def validate_tags(cls, v):
        """Validate tags."""
        return [tag.strip().lower() for tag in v if tag.strip()]

    class Config:
        json_schema_extra = {
            "example": {
                "text": "The world is a book...",
                "author": "Saint Augustine",
                "tags": ["world", "travel"]
            }
        }

class AuthorSchema(BaseModel):
    """Schema for author validation."""

    name: str = Field(..., min_length=1, max_length=100)
    born_date: Optional[str] = Field(None, max_length=50)
    born_location: Optional[str] = Field(None, max_length=200)
    description: Optional[str] = None

    @validator('name')
    def validate_name(cls, v):
        """Validate author name."""
        if not v.strip():
            raise ValueError("Author name cannot be empty")
        return v.strip()

    class Config:
        json_schema_extra = {
            "example": {
                "name": "Albert Einstein",
                "born_date": "March 14, 1879",
                "born_location": "Ulm, Germany"
            }
        }

class ScraperConfigSchema(BaseModel):
    """Schema for scraper configuration."""

    max_concurrent: int = Field(5, ge=1, le=20)
    rate_limit: int = Field(10, ge=1, le=100)
    rate_per_seconds: float = Field(1.0, gt=0)
    timeout: int = Field(30, ge=5, le=120)
    max_retries: int = Field(3, ge=1, le=10)
    cache_ttl: int = Field(3600, ge=0)

    class Config:
        json_schema_extra = {
            "example": {
                "max_concurrent": 5,
                "rate_limit": 10,
                "timeout": 30
            }
        }

def validate_quote_data(data: dict) -> QuoteSchema:
    """Validate quote data."""
    try:
        return QuoteSchema(**data)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        raise

def validate_author_data(data: dict) -> AuthorSchema:
    """Validate author data."""
    try:
        return AuthorSchema(**data)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        raise
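
A minimal sketch of how the schemas behave in practice: invalid input raises pydantic's ValidationError, which the helper functions log and re-raise, so the caller decides whether to skip the record or abort.

# example_validation.py (illustrative only)
from pydantic import ValidationError
from core.validator import validate_quote_data

raw = {"text": "  An example quote  ", "author": "Example Author", "tags": [" Life ", ""]}

try:
    quote = validate_quote_data(raw)
    print(quote.text)   # "An example quote" - whitespace stripped by the validator
    print(quote.tags)   # ["life"] - empty tags dropped, the rest lowercased
except ValidationError as e:
    print(f"Invalid quote data: {e}")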

Step 10: Main Application

# main.py
import asyncio
import logging
from typing import List, Dict

from core.scraper import AsyncWebScraper
from core.parser import QuoteParser
from core.validator import validate_quote_data, validate_author_data
from database.models import DatabaseManager
from database.repository import QuoteRepository, AuthorRepository
from utils.logger import setup_logging
from utils.cache import FileCache
from utils.decorators import async_timing

logger = logging.getLogger(__name__)

class QuoteScraperApp:
    """Main application for quote scraping."""

    def __init__(self, db_url: str = 'sqlite:///quotes.db',
                 cache_dir: str = '.cache'):
        """
        Initialize application.

        Args:
            db_url: Database URL
            cache_dir: Cache directory
        """
        self.db_manager = DatabaseManager(db_url)
        self.cache = FileCache(cache_dir)
        self.parser = QuoteParser()
        self.base_url = 'https://quotes.toscrape.com'

    @async_timing
    async def scrape_quotes(self, max_pages: int = 10):
        """
        Scrape quotes from multiple pages.

        Args:
            max_pages: Maximum number of pages to scrape
        """
        urls = [f'{self.base_url}/page/{i}/' for i in range(1, max_pages + 1)]

        logger.info(f"Starting scraper for {len(urls)} pages...")

        async with AsyncWebScraper(max_concurrent=3, rate_limit=5) as scraper:
            # Fetch all pages
            results = await scraper.fetch_many(urls)

            # Parse and store quotes
            total_quotes = 0
            total_authors = 0

            with self.db_manager as session:
                quote_repo = QuoteRepository(session)
                author_repo = AuthorRepository(session)

                for html in results:
                    if html:
                        quotes = self.parser.parse_quotes(html)

                        for quote_data in quotes:
                            try:
                                # Validate data
                                validated = validate_quote_data(quote_data)

                                # Create/get author
                                author, created = author_repo.get_or_create(
                                    validated.author
                                )
                                if created:
                                    total_authors += 1

                                # Create quote
                                quote_repo.create(
                                    text=validated.text,
                                    author=validated.author,
                                    tags=validated.tags,
                                    author_url=validated.author_url
                                )
                                total_quotes += 1

                            except Exception as e:
                                logger.error(f"Error processing quote: {e}")

            # Print statistics
            stats = scraper.get_stats()
            logger.info("Scraping completed!")
            logger.info(f"Total quotes: {total_quotes}")
            logger.info(f"New authors: {total_authors}")
            logger.info(f"Success rate: {stats['success_rate']:.1f}%")
            logger.info(f"Avg response time: {stats['avg_response_time']:.2f}s")

    @async_timing
    async def scrape_author_details(self, author_name: str):
        """
        Scrape detailed author information.

        Args:
            author_name: Author name to scrape
        """
        with self.db_manager as session:
            author_repo = AuthorRepository(session)
            author = author_repo.get_by_name(author_name)

            if not author or not author.description:
                # Scrape author page
                author_url = f"{self.base_url}/author/{author_name.replace(' ', '-')}"

                async with AsyncWebScraper() as scraper:
                    html = await scraper.fetch(author_url)

                    if html:
                        author_data = self.parser.parse_author(html)

                        if author_data:
                            # Validate and update
                            validated = validate_author_data(author_data)
                            author_repo.update(
                                author_name,
                                born_date=validated.born_date,
                                born_location=validated.born_location,
                                description=validated.description
                            )
                            logger.info(f"Updated author: {author_name}")

    def get_statistics(self) -> Dict:
        """Get application statistics."""
        with self.db_manager as session:
            quote_repo = QuoteRepository(session)
            return quote_repo.get_statistics()

    def search_quotes(self, query: str) -> List:
        """Search quotes."""
        with self.db_manager as session:
            quote_repo = QuoteRepository(session)
            return quote_repo.search(query)

    def export_to_json(self, filename: str = 'quotes_export.json'):
        """Export quotes to JSON file."""
        import json

        with self.db_manager as session:
            quote_repo = QuoteRepository(session)
            quotes = quote_repo.get_all(limit=1000)

            data = [
                {
                    'text': q.text,
                    'author': q.author,
                    'tags': q.tags,
                    'scraped_at': q.scraped_at.isoformat()
                }
                for q in quotes
            ]

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

            logger.info(f"Exported {len(data)} quotes to {filename}")

async def main():
    """Main entry point."""
    # Setup logging
    setup_logging('INFO', 'logs/scraper.log')

    # Create app
    app = QuoteScraperApp()

    # Scrape quotes
    await app.scrape_quotes(max_pages=5)

    # Get statistics
    stats = app.get_statistics()
    print("\n" + "="*60)
    print("STATISTICS")
    print("="*60)
    print(f"Total quotes: {stats['total_quotes']}")
    print(f"Unique authors: {stats['unique_authors']}")
    print("\nTop Authors:")
    for author_data in stats['top_authors']:
        print(f"  - {author_data['author']}: {author_data['count']} quotes")

    # Export data
    app.export_to_json('quotes_export.json')

    print("\n✅ Done!")

if __name__ == "__main__":
    asyncio.run(main())
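
main.py decorates its coroutines with @async_timing from utils.decorators, which was built in Part 1 and is not repeated here. For readers jumping straight into Part 2, a minimal sketch of such a decorator could look like the following (an assumption for illustration, not necessarily the Part 1 implementation):

# utils/decorators.py (minimal sketch - the Part 1 version may differ)
import functools
import logging
import time

logger = logging.getLogger(__name__)

def async_timing(func):
    """Log how long an awaited coroutine took to run."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logger.info(f"{func.__name__} took {elapsed:.2f}s")
    return wrapper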

Step 11: CLI Interface

# cli.py
import asyncio
import click

from main import QuoteScraperApp
from utils.logger import setup_logging

@click.group()
@click.option('--db', default='sqlite:///quotes.db', help='Database URL')
@click.option('--log-level', default='INFO', help='Log level')
@click.pass_context
def cli(ctx, db, log_level):
    """Quote Scraper CLI."""
    setup_logging(log_level)
    ctx.obj = QuoteScraperApp(db_url=db)

@cli.command()
@click.option('--pages', default=10, help='Number of pages to scrape')
@click.pass_obj
def scrape(app, pages):
    """Scrape quotes from website."""
    click.echo(f"Scraping {pages} pages...")
    asyncio.run(app.scrape_quotes(max_pages=pages))
    click.echo("✅ Scraping completed!")

@cli.command()
@click.argument('query')
@click.pass_obj
def search(app, query):
    """Search quotes."""
    quotes = app.search_quotes(query)

    click.echo(f"\nFound {len(quotes)} quotes:")
    for quote in quotes[:10]:
        click.echo(f"\n'{quote.text}'")
        click.echo(f"  — {quote.author}")
        click.echo(f"  Tags: {', '.join(quote.tags)}")

@cli.command()
@click.pass_obj
def stats(app):
    """Show statistics."""
    stats = app.get_statistics()

    click.echo("\n" + "="*60)
    click.echo("STATISTICS")
    click.echo("="*60)
    click.echo(f"Total quotes: {stats['total_quotes']}")
    click.echo(f"Unique authors: {stats['unique_authors']}")
    click.echo("\nTop Authors:")
    for author_data in stats['top_authors']:
        click.echo(f"  - {author_data['author']}: {author_data['count']} quotes")

@cli.command()
@click.option('--output', default='quotes_export.json', help='Output filename')
@click.pass_obj
def export(app, output):
    """Export quotes to JSON."""
    app.export_to_json(output)
    click.echo(f"✅ Exported to {output}")

if __name__ == '__main__':
    cli()
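
Click commands can also be exercised without a shell by using click.testing.CliRunner, which is handy for smoke-testing the CLI wiring. A small, illustrative sketch (it assumes the default quotes.db is reachable from the working directory):

# example_cli_check.py (illustrative only)
from click.testing import CliRunner

from cli import cli

runner = CliRunner()
result = runner.invoke(cli, ['stats'])

print(result.exit_code)   # 0 on success
print(result.output)      # the same text the `stats` command prints to the terminal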

Step 12: Testing Suite

# tests/test_scraper.py
import pytest
from unittest.mock import patch, AsyncMock

from core.scraper import AsyncWebScraper, RateLimiter

@pytest.fixture
def rate_limiter():
    """Create rate limiter instance."""
    return RateLimiter(rate=5, per=1.0)

@pytest.mark.asyncio
async def test_rate_limiter_acquire(rate_limiter):
    """Test rate limiter acquire."""
    # Should allow first request immediately
    await rate_limiter.acquire()
    assert rate_limiter.allowance < 5

@pytest.mark.asyncio
async def test_scraper_initialization():
    """Test scraper initialization."""
    scraper = AsyncWebScraper(max_concurrent=3, rate_limit=5)

    assert scraper.max_concurrent == 3
    assert scraper.rate_limiter.rate == 5

@pytest.mark.asyncio
async def test_scraper_fetch_success():
    """Test successful fetch."""
    async with AsyncWebScraper() as scraper:
        # Mock response
        with patch('aiohttp.ClientSession.get') as mock_get:
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.text = AsyncMock(return_value='<html>Test</html>')
            mock_response.__aenter__.return_value = mock_response
            mock_get.return_value = mock_response

            result = await scraper.fetch('http://example.com')

            assert result == '<html>Test</html>'
            assert scraper.stats['successful'] == 1

@pytest.mark.asyncio
async def test_scraper_fetch_retry():
    """Test fetch with retry."""
    async with AsyncWebScraper() as scraper:
        with patch('aiohttp.ClientSession.get') as mock_get:
            # Simulate failure then success
            mock_response_fail = AsyncMock()
            mock_response_fail.raise_for_status.side_effect = Exception("Error")

            mock_response_success = AsyncMock()
            mock_response_success.status = 200
            mock_response_success.text = AsyncMock(return_value='Success')

            mock_get.side_effect = [
                mock_response_fail,
                mock_response_success
            ]

            # Should retry and succeed
            result = await scraper.fetch('http://example.com', max_retries=2)
            assert result is not None

@pytest.mark.asyncio
async def test_scraper_fetch_many():
    """Test fetching multiple URLs."""
    urls = ['http://example.com/1', 'http://example.com/2']

    async with AsyncWebScraper() as scraper:
        with patch.object(scraper, 'fetch', return_value='<html>Test</html>'):
            results = await scraper.fetch_many(urls)

            assert len(results) == 2
            assert all(r == '<html>Test</html>' for r in results)

def test_scraper_stats():
    """Test statistics calculation."""
    scraper = AsyncWebScraper()
    scraper.stats = {
        'total_requests': 10,
        'successful': 8,
        'failed': 2,
        'total_time': 20.0
    }

    stats = scraper.get_stats()

    assert stats['success_rate'] == 80.0
    assert stats['avg_response_time'] == 2.5
# tests/test_parser.py
import pytest

from core.parser import QuoteParser, DataParser

@pytest.fixture
def parser():
    """Create parser instance."""
    return QuoteParser()

@pytest.fixture
def sample_html():
    """Sample HTML for testing."""
    return '''
    <div class="quote">
        <span class="text">"Test quote"</span>
        <span class="author">Test Author</span>
        <div class="tags">
            <a class="tag">tag1</a>
            <a class="tag">tag2</a>
        </div>
    </div>
    '''

def test_parse_quotes(parser, sample_html):
    """Test quote parsing."""
    quotes = parser.parse_quotes(sample_html)

    assert len(quotes) == 1
    assert quotes[0]['text'] == '"Test quote"'
    assert quotes[0]['author'] == 'Test Author'
    assert 'tag1' in quotes[0]['tags']

def test_extract_email():
    """Test email extraction."""
    text = "Contact me at [email protected] for more info"
    email = DataParser.extract_email(text)

    assert email == '[email protected]'

def test_extract_phone():
    """Test phone extraction."""
    text = "Call us at (123) 456-7890"
    phone = DataParser.extract_phone(text)

    assert phone == '(123) 456-7890'

def test_extract_price():
    """Test price extraction."""
    text = "Price: $1,234.56"
    price = DataParser.extract_price(text)

    assert price == 1234.56

def test_extract_date():
    """Test date extraction."""
    text = "Published on 2024-01-15"
    date = DataParser.extract_date(text)

    assert date == '2024-01-15'
# tests/test_repository.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from database.models import Base, Quote, Author
from database.repository import QuoteRepository, AuthorRepository

@pytest.fixture
def session():
    """Create test database session."""
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    yield session

    session.close()

@pytest.fixture
def quote_repo(session):
    """Create quote repository."""
    return QuoteRepository(session)

@pytest.fixture
def author_repo(session):
    """Create author repository."""
    return AuthorRepository(session)

def test_create_quote(quote_repo, session):
    """Test quote creation."""
    quote = quote_repo.create(
        text="Test quote",
        author="Test Author",
        tags=['test', 'python']
    )

    session.commit()

    assert quote.id is not None
    assert quote.text == "Test quote"
    assert quote.author == "Test Author"
    assert 'test' in quote.tags

def test_get_quote_by_author(quote_repo, session):
    """Test getting quotes by author."""
    quote_repo.create("Quote 1", "Author A", [])
    quote_repo.create("Quote 2", "Author A", [])
    quote_repo.create("Quote 3", "Author B", [])
    session.commit()

    quotes = quote_repo.get_by_author("Author A")

    assert len(quotes) == 2

def test_search_quotes(quote_repo, session):
    """Test quote search."""
    quote_repo.create("Python is awesome", "Author", [])
    quote_repo.create("Java is great", "Author", [])
    session.commit()

    results = quote_repo.search("Python")

    assert len(results) == 1
    assert "Python" in results[0].text

def test_create_author(author_repo, session):
    """Test author creation."""
    author = author_repo.create(
        name="Test Author",
        born_date="2000-01-01",
        born_location="Test City"
    )

    session.commit()

    assert author.id is not None
    assert author.name == "Test Author"

def test_get_or_create_author(author_repo, session):
    """Test get or create author."""
    # First call creates
    author1, created1 = author_repo.get_or_create("New Author")
    session.commit()

    assert created1 is True

    # Second call gets existing
    author2, created2 = author_repo.get_or_create("New Author")

    assert created2 is False
    assert author1.id == author2.id

def test_quote_statistics(quote_repo, session):
    """Test statistics calculation."""
    quote_repo.create("Q1", "Author A", [])
    quote_repo.create("Q2", "Author A", [])
    quote_repo.create("Q3", "Author B", [])
    session.commit()

    stats = quote_repo.get_statistics()

    assert stats['total_quotes'] == 3
    assert stats['unique_authors'] == 2
    assert len(stats['top_authors']) > 0
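
Note that test_scraper_stats pins down the contract of AsyncWebScraper.get_stats() from Part 1: success_rate is a percentage of total_requests, and avg_response_time is total_time divided by the number of successful requests. A sketch consistent with those assertions (the Part 1 implementation may differ in details) could look like this:

# core/scraper.py excerpt (sketch consistent with the tests, not the Part 1 original)
def get_stats(self) -> dict:
    """Derive aggregate metrics from the raw counters kept in self.stats."""
    total = self.stats['total_requests']
    successful = self.stats['successful']
    return {
        **self.stats,
        'success_rate': (successful / total * 100) if total else 0.0,
        'avg_response_time': (self.stats['total_time'] / successful) if successful else 0.0,
    }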

Step 13: Configuration File

# config.py
from typing import Optional

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Application settings."""

    # In Pydantic v2, BaseSettings lives in the pydantic-settings package,
    # and the inner Config class is replaced by model_config.
    model_config = SettingsConfigDict(env_file='.env', case_sensitive=True)

    # Database (field name doubles as the environment variable name)
    DATABASE_URL: str = Field(default='sqlite:///quotes.db')

    # Scraper
    MAX_CONCURRENT: int = Field(default=5, ge=1, le=20)
    RATE_LIMIT: int = Field(default=10, ge=1, le=100)
    RATE_PER_SECONDS: float = Field(default=1.0, gt=0)
    TIMEOUT: int = Field(default=30, ge=5, le=120)
    MAX_RETRIES: int = Field(default=3, ge=1, le=10)

    # Cache
    CACHE_DIR: str = Field(default='.cache')
    CACHE_TTL: int = Field(default=3600, ge=0)

    # Logging
    LOG_LEVEL: str = Field(default='INFO')
    LOG_FILE: Optional[str] = Field(default='logs/scraper.log')

# Create settings instance
settings = Settings()
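
A short sketch of how settings would be wired into the application. It assumes the Part 1 scraper accepts the same max_concurrent and rate_limit keyword arguments used in main.py; values come from defaults, a .env file, or environment variables.

# example_settings_usage.py (illustrative only)
import asyncio

from config import settings
from core.scraper import AsyncWebScraper

async def run():
    # Each field can be overridden, e.g. MAX_CONCURRENT=10 in the environment.
    async with AsyncWebScraper(
        max_concurrent=settings.MAX_CONCURRENT,
        rate_limit=settings.RATE_LIMIT,
    ) as scraper:
        html = await scraper.fetch('https://quotes.toscrape.com/')
        print("fetched" if html else "fetch failed")

asyncio.run(run())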

Step 14: Requirements and Setup

# requirements.txt

# Core
aiohttp>=3.9.0
beautifulsoup4>=4.12.0
sqlalchemy>=2.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0

# CLI
click>=8.1.0

# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0

# Utils
python-dotenv>=1.0.0
# setup.py
from setuptools import setup, find_packages

setup(
    name='async-web-scraper',
    version='1.0.0',
    packages=find_packages(),
    install_requires=[
        'aiohttp>=3.9.0',
        'beautifulsoup4>=4.12.0',
        'sqlalchemy>=2.0.0',
        'pydantic>=2.0.0',
        'pydantic-settings>=2.0.0',
        'click>=8.1.0',
    ],
    entry_points={
        'console_scripts': [
            'scraper=cli:cli',
        ],
    },
    python_requires='>=3.9',
)

Part 3 will cover:

  • Project 2: Real-time Data Processing Pipeline
  • Project 3: RESTful API Client Library
  • Hands-on exercises with detailed guidance

Next lesson: Lesson 21.3: More Advanced Projects 🚀