Bài 13: File I/O - Làm Việc Với Files (Phần 2)

Mục Tiêu Bài Học

Sau khi hoàn thành bài này, bạn sẽ:

  • ✅ Làm việc với CSV files
  • ✅ Read/write JSON files
  • ✅ Handle binary files
  • ✅ Sử dụng os module cho file operations
  • ✅ Làm việc với pathlib

CSV Files

CSV (Comma-Separated Values) là format phổ biến cho tabular data.

Reading CSV Files

import csv

# Basic CSV reading
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)

    for row in reader:
        print(row)  # ['column1', 'column2', 'column3']

# Example CSV file (users.csv):
# name,email,age
# Alice,[email protected],25
# Bob,[email protected],30
# Charlie,[email protected],35

# Read with header
with open('users.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)

    header = next(reader)  # Skip header
    print(f"Columns: {header}")

    for row in reader:
        name, email, age = row
        print(f"{name}: {email} ({age})")

CSV DictReader

import csv

# Read as dictionaries (easier to work with)
with open('users.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)

    for row in reader:
        print(row)
        # {'name': 'Alice', 'email': '[email protected]', 'age': '25'}
        print(f"Name: {row['name']}")
        print(f"Email: {row['email']}")
        print(f"Age: {row['age']}")

# Process all rows
with open('users.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    users = list(reader)

    # Filter users (note: DictReader yields all values as strings)
    adults = [u for u in users if int(u['age']) >= 18]
    print(f"Adults: {len(adults)}")

Writing CSV Files

import csv

# Basic CSV writing
data = [
    ['name', 'email', 'age'],
    ['Alice', '[email protected]', 25],
    ['Bob', '[email protected]', 30],
    ['Charlie', '[email protected]', 35]
]

# newline='' is required so csv handles line endings itself
with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)

    for row in data:
        writer.writerow(row)

    # Or write all at once
    # writer.writerows(data)

# CSV DictWriter
users = [
    {'name': 'Alice', 'email': '[email protected]', 'age': 25},
    {'name': 'Bob', 'email': '[email protected]', 'age': 30},
    {'name': 'Charlie', 'email': '[email protected]', 'age': 35}
]

with open('users.csv', 'w', encoding='utf-8', newline='') as file:
    fieldnames = ['name', 'email', 'age']
    writer = csv.DictWriter(file, fieldnames=fieldnames)

    writer.writeheader()  # Write header row
    writer.writerows(users)  # Write data rows

CSV with Different Delimiters

import csv

# Tab-separated values (TSV)
with open('data.tsv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, delimiter='\t')
    for row in reader:
        print(row)

# Semicolon-separated (common in Europe)
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, delimiter=';')
    for row in reader:
        print(row)

# Custom delimiter
with open('data.txt', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, delimiter='|')
    for row in reader:
        print(row)

# Writing with custom delimiter
with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file, delimiter=';')
    writer.writerow(['col1', 'col2', 'col3'])
    writer.writerow(['val1', 'val2', 'val3'])

JSON Files

JSON (JavaScript Object Notation) là format phổ biến cho structured data.

Reading JSON Files

import json

# Read JSON file
with open('data.json', 'r', encoding='utf-8') as file:
    data = json.load(file)
    print(data)
    print(type(data))  # dict or list

# Example JSON file (user.json):
# {
#   "name": "Alice",
#   "email": "[email protected]",
#   "age": 25,
#   "hobbies": ["reading", "coding"]
# }

with open('user.json', 'r', encoding='utf-8') as file:
    user = json.load(file)

    print(f"Name: {user['name']}")
    print(f"Email: {user['email']}")
    print(f"Hobbies: {', '.join(user['hobbies'])}")

# JSON with array (users.json):
# [
#   {"name": "Alice", "age": 25},
#   {"name": "Bob", "age": 30}
# ]

with open('users.json', 'r', encoding='utf-8') as file:
    users = json.load(file)

    for user in users:
        print(f"{user['name']}: {user['age']}")

Writing JSON Files

import json

# Write JSON file
user = {
    "name": "Alice",
    "email": "[email protected]",
    "age": 25,
    "hobbies": ["reading", "coding"],
    "active": True
}

with open('user.json', 'w', encoding='utf-8') as file:
    json.dump(user, file)

# Pretty print with indent
with open('user.json', 'w', encoding='utf-8') as file:
    json.dump(user, file, indent=2)

# Output:
# {
#   "name": "Alice",
#   "email": "[email protected]",
#   "age": 25,
#   ...
# }

# Ensure ASCII False for Unicode (Vietnamese)
with open('user.json', 'w', encoding='utf-8') as file:
    json.dump(user, file, indent=2, ensure_ascii=False)

# Write array
users = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "age": 35}
]

with open('users.json', 'w', encoding='utf-8') as file:
    json.dump(users, file, indent=2, ensure_ascii=False)

JSON Strings

import json

# Python object → JSON string
user = {"name": "Alice", "age": 25}
json_string = json.dumps(user)
print(json_string)  # '{"name": "Alice", "age": 25}'

# Pretty print
json_string = json.dumps(user, indent=2)
print(json_string)
# {
#   "name": "Alice",
#   "age": 25
# }

# JSON string → Python object
json_string = '{"name": "Bob", "age": 30}'
user = json.loads(json_string)
print(user)  # {'name': 'Bob', 'age': 30}
print(type(user))  # <class 'dict'>

# Handle nested structures
data = {
    "users": [
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30}
    ],
    "total": 2
}

json_string = json.dumps(data, indent=2)
print(json_string)

JSON Error Handling

import json

# Handle invalid JSON
try:
    with open('data.json', 'r', encoding='utf-8') as file:
        data = json.load(file)
except FileNotFoundError:
    print("File not found")
except json.JSONDecodeError as e:
    print(f"Invalid JSON: {e}")


# Validate JSON string
def is_valid_json(json_string):
    """Check if string is valid JSON."""
    try:
        json.loads(json_string)
        return True
    except json.JSONDecodeError:
        return False


print(is_valid_json('{"name": "Alice"}'))  # True
print(is_valid_json('{invalid}'))  # False

Binary Files

Binary files chứa non-text data (images, audio, video, etc.).

Reading Binary Files

# Read binary file
with open('image.jpg', 'rb') as file:
    data = file.read()
    print(f"File size: {len(data)} bytes")
    print(f"First 10 bytes: {data[:10]}")


# Read binary in chunks (memory efficient)
def read_binary_chunks(filename, chunk_size=1024):
    """Read binary file in chunks.

    Args:
        filename (str): Path to the binary file.
        chunk_size (int): Bytes per chunk (default 1024).

    Yields:
        bytes: Successive chunks until EOF.
    """
    with open(filename, 'rb') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk


# Usage (NOTE: process_chunk must be defined by the caller)
for chunk in read_binary_chunks('large_file.bin'):
    process_chunk(chunk)

Writing Binary Files

# Write binary file
data = b'\x00\x01\x02\x03\x04\x05'  # Bytes

with open('output.bin', 'wb') as file:
    file.write(data)

# Copy binary file
with open('source.jpg', 'rb') as src:
    with open('destination.jpg', 'wb') as dst:
        dst.write(src.read())

# Copy in chunks (better for large files)
with open('source.jpg', 'rb') as src:
    with open('destination.jpg', 'wb') as dst:
        while True:
            chunk = src.read(8192)  # 8KB chunks
            if not chunk:
                break
            dst.write(chunk)

Bytes vs Strings

# String to bytes
text = "Hello, World!"
data = text.encode('utf-8')  # b'Hello, World!'
print(type(data))  # <class 'bytes'>

# Bytes to string
data = b'Hello, World!'
text = data.decode('utf-8')  # 'Hello, World!'
print(type(text))  # <class 'str'>

# Vietnamese text
text = "Xin chào Việt Nam"
data = text.encode('utf-8')
print(data)

# Decode back
text = data.decode('utf-8')
print(text)  # Xin chào Việt Nam

File System Operations - os Module

File and Directory Info

import os

# Check if exists
print(os.path.exists('file.txt'))  # True/False

# Check if file
print(os.path.isfile('file.txt'))  # True/False

# Check if directory
print(os.path.isdir('folder'))  # True/False

# Get file size (raises FileNotFoundError if the file is missing)
size = os.path.getsize('file.txt')
print(f"Size: {size} bytes")

# Get absolute path
abs_path = os.path.abspath('file.txt')
print(abs_path)

# Get file name from path
filename = os.path.basename('/path/to/file.txt')
print(filename)  # file.txt

# Get directory from path
dirname = os.path.dirname('/path/to/file.txt')
print(dirname)  # /path/to

# Split path
path, filename = os.path.split('/path/to/file.txt')
print(path)      # /path/to
print(filename)  # file.txt

# Split extension
name, ext = os.path.splitext('file.txt')
print(name)  # file
print(ext)   # .txt

Directory Operations

import os

# Get current directory
current = os.getcwd()
print(current)

# Change directory
os.chdir('/path/to/directory')

# List files and directories
items = os.listdir('.')
print(items)

# List only files
files = [f for f in os.listdir('.') if os.path.isfile(f)]

# List only directories
dirs = [d for d in os.listdir('.') if os.path.isdir(d)]

# List with full path
items = [os.path.join('.', item) for item in os.listdir('.')]

# Create directory
os.mkdir('new_folder')

# Create nested directories
os.makedirs('path/to/nested/folder', exist_ok=True)

# Remove empty directory
os.rmdir('folder')

# Remove file
os.remove('file.txt')

# Rename/move file
os.rename('old_name.txt', 'new_name.txt')

Path Operations

import os

# Join paths (cross-platform)
path = os.path.join('folder', 'subfolder', 'file.txt')
print(path)
# Windows: folder\subfolder\file.txt
# Unix: folder/subfolder/file.txt

# Build relative path
path = os.path.join('..', 'data', 'input.txt')
print(path)  # ../data/input.txt

# Check if path is absolute
print(os.path.isabs('/absolute/path'))  # True
print(os.path.isabs('relative/path'))   # False

# Get common path
path1 = '/home/user/project/file1.txt'
path2 = '/home/user/project/file2.txt'
common = os.path.commonpath([path1, path2])
print(common)  # /home/user/project

Walk Directory Tree

import os

# Walk directory recursively
for root, dirs, files in os.walk('.'):
    print(f"Directory: {root}")
    print(f"  Subdirs: {dirs}")
    print(f"  Files: {files}")


# Find all Python files
def find_files(directory, extension):
    """Find all files with extension.

    Args:
        directory (str): Root directory to search.
        extension (str): File suffix to match, e.g. '.py'.

    Returns:
        list[str]: Matching file paths (directory-relative).
    """
    result = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(extension):
                result.append(os.path.join(root, file))
    return result


# Usage
py_files = find_files('.', '.py')
for file in py_files:
    print(file)


# Count files by extension
def count_by_extension(directory):
    """Count files by extension.

    Returns:
        dict[str, int]: Mapping of extension (e.g. '.py') to count.
    """
    counts = {}
    for root, dirs, files in os.walk(directory):
        for file in files:
            _, ext = os.path.splitext(file)
            counts[ext] = counts.get(ext, 0) + 1
    return counts


# Usage
counts = count_by_extension('.')
for ext, count in counts.items():
    print(f"{ext}: {count} files")

pathlib Module (Modern Approach)

pathlib là modern, object-oriented approach cho file paths.

Basic pathlib

from pathlib import Path

# Create Path object
path = Path('data/file.txt')
print(path)

# Current directory
current = Path.cwd()
print(current)

# Home directory
home = Path.home()
print(home)

# Join paths with /
path = Path('data') / 'subfolder' / 'file.txt'
print(path)

# Check if exists
path = Path('file.txt')
print(path.exists())

# Check if file/directory
print(path.is_file())
print(path.is_dir())

# Get parts
path = Path('/home/user/data/file.txt')
print(path.name)       # file.txt
print(path.stem)       # file
print(path.suffix)     # .txt
print(path.parent)     # /home/user/data
print(path.parts)      # ('/', 'home', 'user', 'data', 'file.txt')

File Operations with pathlib

from pathlib import Path

# Read file
path = Path('data.txt')
content = path.read_text(encoding='utf-8')
print(content)

# Write file
path = Path('output.txt')
path.write_text('Hello, World!', encoding='utf-8')

# Read binary
path = Path('image.jpg')
data = path.read_bytes()

# Write binary
path = Path('output.bin')
path.write_bytes(b'\x00\x01\x02')

# List directory
path = Path('.')
for item in path.iterdir():
    print(item)

# List only files
files = [f for f in path.iterdir() if f.is_file()]

# Glob patterns
path = Path('.')
py_files = list(path.glob('*.py'))
all_py_files = list(path.rglob('*.py'))  # Recursive

# Create directory
path = Path('new_folder')
path.mkdir(exist_ok=True)

# Create nested directories
path = Path('path/to/nested')
path.mkdir(parents=True, exist_ok=True)

# Delete file
path = Path('file.txt')
path.unlink(missing_ok=True)

# Delete directory
path = Path('folder')
path.rmdir()  # Only if empty

pathlib vs os.path

from pathlib import Path
import os

# os.path approach
filepath = os.path.join('data', 'file.txt')
if os.path.exists(filepath):
    with open(filepath, 'r') as f:
        content = f.read()

# pathlib approach (cleaner)
filepath = Path('data') / 'file.txt'
if filepath.exists():
    content = filepath.read_text()

# Get file info
# os.path
size = os.path.getsize('file.txt')
modified = os.path.getmtime('file.txt')

# pathlib
path = Path('file.txt')
size = path.stat().st_size
modified = path.stat().st_mtime

Ví Dụ Thực Tế

1. CSV to JSON Converter

import csv
import json


def csv_to_json(csv_file, json_file):
    """
    Convert CSV to JSON.

    Args:
        csv_file (str): Input CSV file
        json_file (str): Output JSON file

    Returns:
        bool: True if success
    """
    try:
        # Read CSV (newline='' lets the csv module handle line endings)
        with open(csv_file, 'r', encoding='utf-8', newline='') as f:
            reader = csv.DictReader(f)
            data = list(reader)

        # Write JSON
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        print(f"Converted {len(data)} rows")
        return True

    except Exception as e:
        print(f"Error: {e}")
        return False


# Usage
csv_to_json('users.csv', 'users.json')

2. JSON Database Manager

import json
from pathlib import Path


class JSONDatabase:
    """Simple JSON-based database backed by a list stored in one JSON file."""

    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.data = self._load()

    def _load(self):
        """Load data from file, or start with an empty list."""
        if self.filepath.exists():
            return json.loads(self.filepath.read_text(encoding='utf-8'))
        return []

    def _save(self):
        """Save data to file (pretty-printed, Unicode preserved)."""
        self.filepath.write_text(
            json.dumps(self.data, indent=2, ensure_ascii=False),
            encoding='utf-8'
        )

    def add(self, item):
        """Add item to database and persist immediately."""
        self.data.append(item)
        self._save()

    def all(self):
        """Get all items."""
        return self.data

    def find(self, key, value):
        """Find items whose `key` equals `value`."""
        return [item for item in self.data if item.get(key) == value]

    def update(self, key, value, updates):
        """Update items matching key-value with the `updates` dict."""
        for item in self.data:
            if item.get(key) == value:
                item.update(updates)
        self._save()

    def delete(self, key, value):
        """Delete items matching key-value."""
        self.data = [item for item in self.data if item.get(key) != value]
        self._save()


# Usage
db = JSONDatabase('users.json')

# Add users
db.add({"id": 1, "name": "Alice", "age": 25})
db.add({"id": 2, "name": "Bob", "age": 30})

# Get all
users = db.all()
print(users)

# Find
alice = db.find("name", "Alice")
print(alice)

# Update
db.update("name", "Alice", {"age": 26})

# Delete
db.delete("name", "Bob")

3. File Backup Utility

from pathlib import Path
import shutil
from datetime import datetime


def backup_file(source, backup_dir='backups'):
    """
    Create timestamped backup of file.

    Args:
        source (str): Source file path
        backup_dir (str): Backup directory

    Returns:
        Path: Backup file path or None
    """
    try:
        source = Path(source)
        if not source.exists():
            print(f"Source file not found: {source}")
            return None

        # Create backup directory
        backup_path = Path(backup_dir)
        backup_path.mkdir(exist_ok=True)

        # Generate backup filename with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{source.stem}_{timestamp}{source.suffix}"
        # Renamed local (was `backup_file`) — it shadowed this function's name
        backup_target = backup_path / backup_name

        # Copy file (copy2 preserves metadata such as mtime)
        shutil.copy2(source, backup_target)

        print(f"Backup created: {backup_target}")
        return backup_target

    except Exception as e:
        print(f"Backup failed: {e}")
        return None


def cleanup_old_backups(backup_dir='backups', keep_count=5):
    """Keep only recent backups."""
    backup_path = Path(backup_dir)
    if not backup_path.exists():
        return

    # Get all backup files sorted by modification time (oldest first)
    backups = sorted(backup_path.iterdir(), key=lambda p: p.stat().st_mtime)

    # Delete old backups
    while len(backups) > keep_count:
        old = backups.pop(0)
        old.unlink()
        print(f"Deleted old backup: {old}")


# Usage
backup_file('important_data.json')
cleanup_old_backups(keep_count=3)

4. Directory Statistics

from pathlib import Path


def analyze_directory(directory):
    """
    Analyze directory contents.

    Args:
        directory (str): Directory to analyze recursively.

    Returns:
        dict: Statistics, or None if the directory does not exist.
    """
    path = Path(directory)
    if not path.exists():
        return None

    stats = {
        'total_files': 0,
        'total_dirs': 0,
        'total_size': 0,
        'by_extension': {},
        'largest_files': []
    }

    files_with_size = []

    for item in path.rglob('*'):
        if item.is_file():
            stats['total_files'] += 1
            size = item.stat().st_size
            stats['total_size'] += size

            # Count by extension
            ext = item.suffix or 'no extension'
            stats['by_extension'][ext] = stats['by_extension'].get(ext, 0) + 1

            # Track for largest files
            files_with_size.append((item, size))

        elif item.is_dir():
            stats['total_dirs'] += 1

    # Get 10 largest files
    files_with_size.sort(key=lambda x: x[1], reverse=True)
    stats['largest_files'] = [
        {'path': str(f), 'size': s}
        for f, s in files_with_size[:10]
    ]

    return stats


def format_size(bytes):
    """Format bytes to human readable.

    Note: parameter name kept for compatibility, although it shadows
    the builtin `bytes`.
    """
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes < 1024:
            return f"{bytes:.2f} {unit}"
        bytes /= 1024
    return f"{bytes:.2f} TB"


# Usage
stats = analyze_directory('.')
if stats:
    print(f"Total files: {stats['total_files']}")
    print(f"Total directories: {stats['total_dirs']}")
    print(f"Total size: {format_size(stats['total_size'])}")

    print("\nFiles by extension:")
    for ext, count in sorted(stats['by_extension'].items()):
        print(f"  {ext}: {count}")

    print("\nLargest files:")
    for file in stats['largest_files'][:5]:
        print(f"  {file['path']}: {format_size(file['size'])}")

5. Configuration Manager (JSON + ENV)

import json
import os
from pathlib import Path


class Config:
    """Configuration manager: JSON file values overridable via APP_* env vars."""

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self.config = self._load()

    def _load(self):
        """Load configuration."""
        # Load from JSON file
        if self.config_file.exists():
            config = json.loads(
                self.config_file.read_text(encoding='utf-8')
            )
        else:
            config = {}

        # Override with environment variables (e.g. key 'port' -> APP_PORT)
        for key in config:
            env_key = f"APP_{key.upper()}"
            if env_key in os.environ:
                config[key] = os.environ[env_key]

        return config

    def get(self, key, default=None):
        """Get configuration value."""
        return self.config.get(key, default)

    def set(self, key, value):
        """Set configuration value."""
        self.config[key] = value

    def save(self):
        """Save configuration to file."""
        self.config_file.write_text(
            json.dumps(self.config, indent=2, ensure_ascii=False),
            encoding='utf-8'
        )

    def __getitem__(self, key):
        return self.config[key]

    def __setitem__(self, key, value):
        self.config[key] = value


# Usage
config = Config('app_config.json')

# Get values
app_name = config.get('app_name', 'My App')
debug = config.get('debug', False)
port = int(config.get('port', 8000))

print(f"App: {app_name}")
print(f"Debug: {debug}")
print(f"Port: {port}")

# Set values
config['version'] = '2.0.0'
config.save()

Best Practices

# Imports added: this snippet uses json and csv standalone
import csv
import json

# 1. Always specify encoding
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

# 2. Use pathlib for modern code
from pathlib import Path
path = Path('data') / 'file.txt'

# 3. Handle errors gracefully
try:
    with open('file.txt', 'r') as f:
        content = f.read()
except FileNotFoundError:
    print("File not found")

# 4. Use json.dump with ensure_ascii=False
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# 5. Use csv.DictReader for better readability
with open('data.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['column_name'])

# 6. Process large files in chunks
with open('large.bin', 'rb') as f:
    while chunk := f.read(8192):
        process(chunk)

Bài Tập Thực Hành

Bài 1: CSV Analyzer

Viết program:

  • Read CSV file
  • Calculate statistics (sum, avg, min, max for numeric columns)
  • Export summary to JSON

Bài 2: JSON Merger

Viết function merge multiple JSON files:

  • Read all JSON files in directory
  • Merge into single list/dict
  • Save to output file

Bài 3: File Organizer

Viết program organize files by extension:

  • Scan directory
  • Create folders by extension
  • Move files to appropriate folders

Bài 4: Backup System

Create backup system:

  • Backup files with timestamp
  • Compress old backups
  • Clean up old backups (keep latest N)

Bài 5: Data Converter

Create universal converter:

  • Support CSV ↔ JSON ↔ Text
  • Handle different encodings
  • Command-line interface

Tóm Tắt

CSV: csv.reader(), csv.DictReader(), csv.writer()
JSON: json.load(), json.dump(), json.loads(), json.dumps()
Binary: 'rb', 'wb' modes, bytes type
os module: File/directory operations, path handling
pathlib: Modern OOP approach, Path class
Best practice: Use pathlib, UTF-8 encoding, error handling

Bài Tiếp Theo

Bài 14: Exception Handling - Try/except, custom exceptions, error handling patterns, và debugging.


Remember:

  • Use csv.DictReader for easier CSV handling
  • Always use ensure_ascii=False for JSON with Vietnamese
  • pathlib is modern and cleaner than os.path
  • Handle FileNotFoundError and JSONDecodeError
  • Process large files in chunks!