Lesson 13: File I/O - Working with Files (Part 2)
Learning Objectives
After completing this lesson, you will be able to:
- ✅ Work with CSV files
- ✅ Read/write JSON files
- ✅ Handle binary files
- ✅ Use the os module for file operations
- ✅ Work with pathlib
CSV Files
CSV (Comma-Separated Values) is a popular format for tabular data.
Reading CSV Files
```python
import csv

# Basic CSV reading (the csv docs recommend newline='' for file objects)
with open('data.csv', 'r', encoding='utf-8', newline='') as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)  # ['column1', 'column2', 'column3']

# Example CSV file (users.csv):
# name,email,age
# Alice,[email protected],25
# Bob,[email protected],30
# Charlie,[email protected],35

# Read with header
with open('users.csv', 'r', encoding='utf-8', newline='') as file:
    reader = csv.reader(file)
    header = next(reader)  # Skip header
    print(f"Columns: {header}")
    for row in reader:
        name, email, age = row
        print(f"{name}: {email} ({age})")
```
CSV DictReader
```python
import csv

# Read as dictionaries (easier to work with)
with open('users.csv', 'r', encoding='utf-8', newline='') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row)  # {'name': 'Alice', 'email': '[email protected]', 'age': '25'}
        print(f"Name: {row['name']}")
        print(f"Email: {row['email']}")
        print(f"Age: {row['age']}")

# Process all rows
with open('users.csv', 'r', encoding='utf-8', newline='') as file:
    reader = csv.DictReader(file)
    users = list(reader)

# Filter users
adults = [u for u in users if int(u['age']) >= 18]
print(f"Adults: {len(adults)}")
```
Writing CSV Files
```python
import csv

# Basic CSV writing
data = [
    ['name', 'email', 'age'],
    ['Alice', '[email protected]', 25],
    ['Bob', '[email protected]', 30],
    ['Charlie', '[email protected]', 35],
]

with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)
    for row in data:
        writer.writerow(row)
    # Or write all at once
    # writer.writerows(data)

# CSV DictWriter
users = [
    {'name': 'Alice', 'email': '[email protected]', 'age': 25},
    {'name': 'Bob', 'email': '[email protected]', 'age': 30},
    {'name': 'Charlie', 'email': '[email protected]', 'age': 35},
]

with open('users.csv', 'w', encoding='utf-8', newline='') as file:
    fieldnames = ['name', 'email', 'age']
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()     # Write header row
    writer.writerows(users)  # Write data rows
```
CSV with Different Delimiters
```python
import csv

# Tab-separated values (TSV)
with open('data.tsv', 'r', encoding='utf-8', newline='') as file:
    reader = csv.reader(file, delimiter='\t')
    for row in reader:
        print(row)

# Semicolon-separated (common in Europe)
with open('data.csv', 'r', encoding='utf-8', newline='') as file:
    reader = csv.reader(file, delimiter=';')
    for row in reader:
        print(row)

# Custom delimiter
with open('data.txt', 'r', encoding='utf-8', newline='') as file:
    reader = csv.reader(file, delimiter='|')
    for row in reader:
        print(row)

# Writing with custom delimiter
with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file, delimiter=';')
    writer.writerow(['col1', 'col2', 'col3'])
    writer.writerow(['val1', 'val2', 'val3'])
```
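If you don't know the delimiter ahead of time, the csv module can often detect it for you. A minimal sketch using csv.Sniffer (the file name is a placeholder; Sniffer can fail on ambiguous input, hence the fallback):

```python
import csv

# Detect the dialect (delimiter, quoting) from a sample of the file.
with open('data.csv', 'r', encoding='utf-8', newline='') as file:
    sample = file.read(2048)
    file.seek(0)
    try:
        dialect = csv.Sniffer().sniff(sample)
    except csv.Error:
        dialect = csv.excel  # fall back to the default comma-separated dialect
    reader = csv.reader(file, dialect)
    for row in reader:
        print(row)
```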
JSON Files
JSON (JavaScript Object Notation) is a popular format for structured data.
Reading JSON Files
```python
import json

# Read JSON file
with open('data.json', 'r', encoding='utf-8') as file:
    data = json.load(file)
    print(data)
    print(type(data))  # dict or list

# Example JSON file (user.json):
# {
#   "name": "Alice",
#   "email": "[email protected]",
#   "age": 25,
#   "hobbies": ["reading", "coding"]
# }

with open('user.json', 'r', encoding='utf-8') as file:
    user = json.load(file)
    print(f"Name: {user['name']}")
    print(f"Email: {user['email']}")
    print(f"Hobbies: {', '.join(user['hobbies'])}")

# JSON with array (users.json):
# [
#   {"name": "Alice", "age": 25},
#   {"name": "Bob", "age": 30}
# ]

with open('users.json', 'r', encoding='utf-8') as file:
    users = json.load(file)
    for user in users:
        print(f"{user['name']}: {user['age']}")
```
Writing JSON Files
```python
import json

# Write JSON file
user = {
    "name": "Alice",
    "email": "[email protected]",
    "age": 25,
    "hobbies": ["reading", "coding"],
    "active": True,
}

with open('user.json', 'w', encoding='utf-8') as file:
    json.dump(user, file)

# Pretty print with indent
with open('user.json', 'w', encoding='utf-8') as file:
    json.dump(user, file, indent=2)
# Output:
# {
#   "name": "Alice",
#   "email": "[email protected]",
#   "age": 25,
#   ...
# }

# ensure_ascii=False keeps Unicode (Vietnamese) text readable
with open('user.json', 'w', encoding='utf-8') as file:
    json.dump(user, file, indent=2, ensure_ascii=False)

# Write array
users = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "age": 35},
]

with open('users.json', 'w', encoding='utf-8') as file:
    json.dump(users, file, indent=2, ensure_ascii=False)
```
JSON Strings
```python
import json

# Python object → JSON string
user = {"name": "Alice", "age": 25}
json_string = json.dumps(user)
print(json_string)  # '{"name": "Alice", "age": 25}'

# Pretty print
json_string = json.dumps(user, indent=2)
print(json_string)
# {
#   "name": "Alice",
#   "age": 25
# }

# JSON string → Python object
json_string = '{"name": "Bob", "age": 30}'
user = json.loads(json_string)
print(user)        # {'name': 'Bob', 'age': 30}
print(type(user))  # <class 'dict'>

# Handle nested structures
data = {
    "users": [
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30}
    ],
    "total": 2
}

json_string = json.dumps(data, indent=2)
print(json_string)
```
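One caveat: json.dumps only understands basic types (dict, list, str, numbers, bool, None). For anything else, such as datetime, you can pass a default function that converts unknown objects; a minimal sketch:

```python
import json
from datetime import datetime

event = {"name": "meetup", "when": datetime(2024, 1, 15, 19, 30)}

# datetime is not JSON-serializable; without default= this raises TypeError.
# default= is called for any object json.dumps cannot handle.
json_string = json.dumps(event, default=str)
print(json_string)  # {"name": "meetup", "when": "2024-01-15 19:30:00"}
```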
JSON Error Handling
```python
import json

# Handle invalid JSON
try:
    with open('data.json', 'r', encoding='utf-8') as file:
        data = json.load(file)
except FileNotFoundError:
    print("File not found")
except json.JSONDecodeError as e:
    print(f"Invalid JSON: {e}")

# Validate JSON string
def is_valid_json(json_string):
    """Check if string is valid JSON."""
    try:
        json.loads(json_string)
        return True
    except json.JSONDecodeError:
        return False

print(is_valid_json('{"name": "Alice"}'))  # True
print(is_valid_json('{invalid}'))          # False
```
Binary Files
Binary files contain non-text data (images, audio, video, etc.).
Reading Binary Files
```python
# Read binary file
with open('image.jpg', 'rb') as file:
    data = file.read()
    print(f"File size: {len(data)} bytes")
    print(f"First 10 bytes: {data[:10]}")

# Read binary in chunks (memory efficient)
def read_binary_chunks(filename, chunk_size=1024):
    """Read binary file in chunks."""
    with open(filename, 'rb') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Usage (process_chunk is a placeholder for your own handler)
for chunk in read_binary_chunks('large_file.bin'):
    process_chunk(chunk)
```
Writing Binary Files
```python
# Write binary file
data = b'\x00\x01\x02\x03\x04\x05'  # Bytes

with open('output.bin', 'wb') as file:
    file.write(data)

# Copy binary file
with open('source.jpg', 'rb') as src:
    with open('destination.jpg', 'wb') as dst:
        dst.write(src.read())

# Copy in chunks (better for large files)
with open('source.jpg', 'rb') as src:
    with open('destination.jpg', 'wb') as dst:
        while True:
            chunk = src.read(8192)  # 8KB chunks
            if not chunk:
                break
            dst.write(chunk)
```
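For everyday copying you rarely need to write the chunk loop yourself: the standard library's shutil module copies in chunks internally. A short sketch (file names are placeholders):

```python
import shutil

# shutil copies in chunks internally, so it is safe for large files.
shutil.copy('source.jpg', 'destination.jpg')   # contents + permission bits
shutil.copy2('source.jpg', 'destination.jpg')  # also preserves timestamps/metadata
```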
Bytes vs Strings
```python
# String to bytes
text = "Hello, World!"
data = text.encode('utf-8')  # b'Hello, World!'
print(type(data))  # <class 'bytes'>

# Bytes to string
data = b'Hello, World!'
text = data.decode('utf-8')  # 'Hello, World!'
print(type(text))  # <class 'str'>

# Vietnamese text
text = "Xin chào Việt Nam"
data = text.encode('utf-8')
print(data)

# Decode back
text = data.decode('utf-8')
print(text)  # Xin chào Việt Nam
```
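Decoding raises UnicodeDecodeError when the bytes are not valid in the chosen encoding; the errors parameter controls the behavior. A small sketch:

```python
data = b'Xin ch\xe0o'  # Latin-1 bytes, not valid UTF-8

# Strict (default): raises UnicodeDecodeError
try:
    text = data.decode('utf-8')
except UnicodeDecodeError as e:
    print(f"Decode failed: {e}")

# Replace undecodable bytes with U+FFFD instead of raising
print(data.decode('utf-8', errors='replace'))  # Xin ch�o

# Or decode with the correct encoding
print(data.decode('latin-1'))  # Xin chào
```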
File System Operations - os Module
File and Directory Info
```python
import os

# Check if exists
print(os.path.exists('file.txt'))  # True/False

# Check if file
print(os.path.isfile('file.txt'))  # True/False

# Check if directory
print(os.path.isdir('folder'))  # True/False

# Get file size
size = os.path.getsize('file.txt')
print(f"Size: {size} bytes")

# Get absolute path
abs_path = os.path.abspath('file.txt')
print(abs_path)

# Get file name from path
filename = os.path.basename('/path/to/file.txt')
print(filename)  # file.txt

# Get directory from path
dirname = os.path.dirname('/path/to/file.txt')
print(dirname)  # /path/to

# Split path
path, filename = os.path.split('/path/to/file.txt')
print(path)      # /path/to
print(filename)  # file.txt

# Split extension
name, ext = os.path.splitext('file.txt')
print(name)  # file
print(ext)   # .txt
```
Directory Operations
```python
import os

# Get current directory
current = os.getcwd()
print(current)

# Change directory
os.chdir('/path/to/directory')

# List files and directories
items = os.listdir('.')
print(items)

# List only files
files = [f for f in os.listdir('.') if os.path.isfile(f)]

# List only directories
dirs = [d for d in os.listdir('.') if os.path.isdir(d)]

# List with full path
items = [os.path.join('.', item) for item in os.listdir('.')]

# Create directory
os.mkdir('new_folder')

# Create nested directories
os.makedirs('path/to/nested/folder', exist_ok=True)

# Remove empty directory
os.rmdir('folder')

# Remove file
os.remove('file.txt')

# Rename/move file
os.rename('old_name.txt', 'new_name.txt')
```
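For large directories, os.scandir is usually faster than os.listdir because each entry carries file-type information without an extra stat call per item. A minimal sketch:

```python
import os

# os.scandir yields DirEntry objects; entry.is_file()/is_dir()
# typically avoid an extra system call per entry.
with os.scandir('.') as entries:
    for entry in entries:
        kind = 'file' if entry.is_file() else 'dir'
        print(f"{kind}: {entry.name}")
```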
Path Operations
```python
import os

# Join paths (cross-platform)
path = os.path.join('folder', 'subfolder', 'file.txt')
print(path)
# Windows: folder\subfolder\file.txt
# Unix: folder/subfolder/file.txt

# Build relative path
path = os.path.join('..', 'data', 'input.txt')
print(path)  # ../data/input.txt

# Check if path is absolute
print(os.path.isabs('/absolute/path'))  # True
print(os.path.isabs('relative/path'))   # False

# Get common path
path1 = '/home/user/project/file1.txt'
path2 = '/home/user/project/file2.txt'
common = os.path.commonpath([path1, path2])
print(common)  # /home/user/project
```
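Closely related: os.path.relpath computes one path relative to a start directory. A small sketch:

```python
import os

# Path of the target relative to a start directory
rel = os.path.relpath('/home/user/project/file1.txt', start='/home/user')
print(rel)  # project/file1.txt
```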
Walk Directory Tree
```python
import os

# Walk directory recursively
for root, dirs, files in os.walk('.'):
    print(f"Directory: {root}")
    print(f"  Subdirs: {dirs}")
    print(f"  Files: {files}")

# Find all Python files
def find_files(directory, extension):
    """Find all files with extension."""
    result = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(extension):
                result.append(os.path.join(root, file))
    return result

# Usage
py_files = find_files('.', '.py')
for file in py_files:
    print(file)

# Count files by extension
def count_by_extension(directory):
    """Count files by extension."""
    counts = {}
    for root, dirs, files in os.walk(directory):
        for file in files:
            _, ext = os.path.splitext(file)
            counts[ext] = counts.get(ext, 0) + 1
    return counts

# Usage
counts = count_by_extension('.')
for ext, count in counts.items():
    print(f"{ext}: {count} files")
```
pathlib Module (Modern Approach)
pathlib is a modern, object-oriented approach to working with file paths.
Basic pathlib
```python
from pathlib import Path

# Create Path object
path = Path('data/file.txt')
print(path)

# Current directory
current = Path.cwd()
print(current)

# Home directory
home = Path.home()
print(home)

# Join paths with /
path = Path('data') / 'subfolder' / 'file.txt'
print(path)

# Check if exists
path = Path('file.txt')
print(path.exists())

# Check if file/directory
print(path.is_file())
print(path.is_dir())

# Get parts
path = Path('/home/user/data/file.txt')
print(path.name)    # file.txt
print(path.stem)    # file
print(path.suffix)  # .txt
print(path.parent)  # /home/user/data
print(path.parts)   # ('/', 'home', 'user', 'data', 'file.txt')
```
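Path objects can also derive new paths from existing ones; with_suffix and with_name are handy for this. A small sketch:

```python
from pathlib import Path

path = Path('/home/user/data/file.txt')
print(path.with_suffix('.json'))    # /home/user/data/file.json
print(path.with_name('other.txt'))  # /home/user/data/other.txt
```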
File Operations with pathlib
```python
from pathlib import Path

# Read file
path = Path('data.txt')
content = path.read_text(encoding='utf-8')
print(content)

# Write file
path = Path('output.txt')
path.write_text('Hello, World!', encoding='utf-8')

# Read binary
path = Path('image.jpg')
data = path.read_bytes()

# Write binary
path = Path('output.bin')
path.write_bytes(b'\x00\x01\x02')

# List directory
path = Path('.')
for item in path.iterdir():
    print(item)

# List only files
files = [f for f in path.iterdir() if f.is_file()]

# Glob patterns
path = Path('.')
py_files = list(path.glob('*.py'))
all_py_files = list(path.rglob('*.py'))  # Recursive

# Create directory
path = Path('new_folder')
path.mkdir(exist_ok=True)

# Create nested directories
path = Path('path/to/nested')
path.mkdir(parents=True, exist_ok=True)

# Delete file
path = Path('file.txt')
path.unlink(missing_ok=True)

# Delete directory
path = Path('folder')
path.rmdir()  # Only if empty
```
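Note that Path.rmdir only removes empty directories. To delete a directory together with its contents, the usual tool is shutil.rmtree; a minimal sketch (the folder name is a placeholder, and rmtree is irreversible):

```python
import shutil
from pathlib import Path

folder = Path('folder')  # placeholder name

# Recursively delete the directory and everything inside it.
# This cannot be undone, so guard it with an existence check.
if folder.is_dir():
    shutil.rmtree(folder)
```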
pathlib vs os.path
```python
from pathlib import Path
import os

# os.path approach
filepath = os.path.join('data', 'file.txt')
if os.path.exists(filepath):
    with open(filepath, 'r') as f:
        content = f.read()

# pathlib approach (cleaner)
filepath = Path('data') / 'file.txt'
if filepath.exists():
    content = filepath.read_text()

# Get file info
# os.path
size = os.path.getsize('file.txt')
modified = os.path.getmtime('file.txt')

# pathlib
path = Path('file.txt')
size = path.stat().st_size
modified = path.stat().st_mtime
```
Practical Examples
1. CSV to JSON Converter
```python
import csv
import json

def csv_to_json(csv_file, json_file):
    """
    Convert CSV to JSON.

    Args:
        csv_file (str): Input CSV file
        json_file (str): Output JSON file

    Returns:
        bool: True if success
    """
    try:
        # Read CSV
        with open(csv_file, 'r', encoding='utf-8', newline='') as f:
            reader = csv.DictReader(f)
            data = list(reader)

        # Write JSON
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        print(f"Converted {len(data)} rows")
        return True
    except Exception as e:
        print(f"Error: {e}")
        return False

# Usage
csv_to_json('users.csv', 'users.json')
```
2. JSON Database Manager
```python
import json
from pathlib import Path

class JSONDatabase:
    """Simple JSON-based database."""

    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.data = self._load()

    def _load(self):
        """Load data from file."""
        if self.filepath.exists():
            return json.loads(self.filepath.read_text(encoding='utf-8'))
        return []

    def _save(self):
        """Save data to file."""
        self.filepath.write_text(
            json.dumps(self.data, indent=2, ensure_ascii=False),
            encoding='utf-8'
        )

    def add(self, item):
        """Add item to database."""
        self.data.append(item)
        self._save()

    def all(self):
        """Get all items."""
        return self.data

    def find(self, key, value):
        """Find items by key-value."""
        return [item for item in self.data if item.get(key) == value]

    def update(self, key, value, updates):
        """Update items matching key-value."""
        for item in self.data:
            if item.get(key) == value:
                item.update(updates)
        self._save()

    def delete(self, key, value):
        """Delete items matching key-value."""
        self.data = [item for item in self.data if item.get(key) != value]
        self._save()

# Usage
db = JSONDatabase('users.json')

# Add users
db.add({"id": 1, "name": "Alice", "age": 25})
db.add({"id": 2, "name": "Bob", "age": 30})

# Get all
users = db.all()
print(users)

# Find
alice = db.find("name", "Alice")
print(alice)

# Update
db.update("name", "Alice", {"age": 26})

# Delete
db.delete("name", "Bob")
```
3. File Backup Utility
```python
from pathlib import Path
import shutil
from datetime import datetime

def backup_file(source, backup_dir='backups'):
    """
    Create timestamped backup of file.

    Args:
        source (str): Source file path
        backup_dir (str): Backup directory

    Returns:
        Path: Backup file path or None
    """
    try:
        source = Path(source)
        if not source.exists():
            print(f"Source file not found: {source}")
            return None

        # Create backup directory
        backup_path = Path(backup_dir)
        backup_path.mkdir(exist_ok=True)

        # Generate backup filename with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{source.stem}_{timestamp}{source.suffix}"
        target = backup_path / backup_name  # distinct name, avoids shadowing backup_file

        # Copy file
        shutil.copy2(source, target)
        print(f"Backup created: {target}")
        return target
    except Exception as e:
        print(f"Backup failed: {e}")
        return None

def cleanup_old_backups(backup_dir='backups', keep_count=5):
    """Keep only recent backups."""
    backup_path = Path(backup_dir)
    if not backup_path.exists():
        return

    # Get all backup files sorted by modification time
    backups = sorted(backup_path.iterdir(), key=lambda p: p.stat().st_mtime)

    # Delete old backups
    while len(backups) > keep_count:
        old = backups.pop(0)
        old.unlink()
        print(f"Deleted old backup: {old}")

# Usage
backup_file('important_data.json')
cleanup_old_backups(keep_count=3)
```
4. Directory Statistics
```python
from pathlib import Path

def analyze_directory(directory):
    """
    Analyze directory contents.

    Returns:
        dict: Statistics
    """
    path = Path(directory)
    if not path.exists():
        return None

    stats = {
        'total_files': 0,
        'total_dirs': 0,
        'total_size': 0,
        'by_extension': {},
        'largest_files': []
    }

    files_with_size = []

    for item in path.rglob('*'):
        if item.is_file():
            stats['total_files'] += 1
            size = item.stat().st_size
            stats['total_size'] += size

            # Count by extension
            ext = item.suffix or 'no extension'
            stats['by_extension'][ext] = stats['by_extension'].get(ext, 0) + 1

            # Track for largest files
            files_with_size.append((item, size))
        elif item.is_dir():
            stats['total_dirs'] += 1

    # Get 10 largest files
    files_with_size.sort(key=lambda x: x[1], reverse=True)
    stats['largest_files'] = [
        {'path': str(f), 'size': s}
        for f, s in files_with_size[:10]
    ]

    return stats

def format_size(num_bytes):
    """Format bytes to human readable."""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if num_bytes < 1024:
            return f"{num_bytes:.2f} {unit}"
        num_bytes /= 1024
    return f"{num_bytes:.2f} TB"

# Usage
stats = analyze_directory('.')
if stats:
    print(f"Total files: {stats['total_files']}")
    print(f"Total directories: {stats['total_dirs']}")
    print(f"Total size: {format_size(stats['total_size'])}")

    print("\nFiles by extension:")
    for ext, count in sorted(stats['by_extension'].items()):
        print(f"  {ext}: {count}")

    print("\nLargest files:")
    for file in stats['largest_files'][:5]:
        print(f"  {file['path']}: {format_size(file['size'])}")
```
5. Configuration Manager (JSON + ENV)
```python
import json
import os
from pathlib import Path

class Config:
    """Configuration manager."""

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self.config = self._load()

    def _load(self):
        """Load configuration."""
        # Load from JSON file
        if self.config_file.exists():
            config = json.loads(
                self.config_file.read_text(encoding='utf-8')
            )
        else:
            config = {}

        # Override with environment variables
        for key in config:
            env_key = f"APP_{key.upper()}"
            if env_key in os.environ:
                config[key] = os.environ[env_key]

        return config

    def get(self, key, default=None):
        """Get configuration value."""
        return self.config.get(key, default)

    def set(self, key, value):
        """Set configuration value."""
        self.config[key] = value

    def save(self):
        """Save configuration to file."""
        self.config_file.write_text(
            json.dumps(self.config, indent=2, ensure_ascii=False),
            encoding='utf-8'
        )

    def __getitem__(self, key):
        return self.config[key]

    def __setitem__(self, key, value):
        self.config[key] = value

# Usage
config = Config('app_config.json')

# Get values
app_name = config.get('app_name', 'My App')
debug = config.get('debug', False)
port = int(config.get('port', 8000))

print(f"App: {app_name}")
print(f"Debug: {debug}")
print(f"Port: {port}")

# Set values
config['version'] = '2.0.0'
config.save()
```
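With this class, an environment variable such as APP_PORT overrides the corresponding key loaded from the JSON file (the APP_ prefix is hard-coded in _load, and only keys already present in the file are overridden). A small usage sketch, assuming app_config.json defines a port key:

```python
import os

# Simulate an environment override before loading the config.
os.environ['APP_PORT'] = '9000'
config = Config('app_config.json')
print(config.get('port'))  # '9000' if 'port' exists in app_config.json
```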
Best Practices
```python
import csv
import json

# 1. Always specify encoding
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

# 2. Use pathlib for modern code
from pathlib import Path
path = Path('data') / 'file.txt'

# 3. Handle errors gracefully
try:
    with open('file.txt', 'r') as f:
        content = f.read()
except FileNotFoundError:
    print("File not found")

# 4. Use json.dump with ensure_ascii=False
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# 5. Use csv.DictReader for better readability
with open('data.csv', 'r', encoding='utf-8', newline='') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['column_name'])

# 6. Process large files in chunks (process() is a placeholder)
with open('large.bin', 'rb') as f:
    while chunk := f.read(8192):
        process(chunk)
```
Practice Exercises
Exercise 1: CSV Analyzer
Write a program that:
- Reads a CSV file
- Calculates statistics (sum, avg, min, max) for numeric columns
- Exports a summary to JSON
Exercise 2: JSON Merger
Write a function that merges multiple JSON files:
- Read all JSON files in a directory
- Merge them into a single list/dict
- Save the result to an output file
Exercise 3: File Organizer
Write a program that organizes files by extension (see the sketch after this list):
- Scan a directory
- Create folders by extension
- Move files into the appropriate folders
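A possible starting point (not a full solution) is sketched below; the no_extension fallback folder name is my own assumption:

```python
from pathlib import Path
import shutil

def organize_by_extension(directory):
    """Move each file into a subfolder named after its extension."""
    base = Path(directory)
    for item in base.iterdir():
        if not item.is_file():
            continue  # skip subdirectories
        # 'no_extension' is an assumed fallback for files without a suffix
        folder_name = item.suffix.lstrip('.') or 'no_extension'
        target_dir = base / folder_name
        target_dir.mkdir(exist_ok=True)
        shutil.move(str(item), str(target_dir / item.name))

# organize_by_extension('downloads')  # example call
```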
Exercise 4: Backup System
Create a backup system that:
- Backs up files with a timestamp
- Compresses old backups
- Cleans up old backups (keeps the latest N)
Exercise 5: Data Converter
Create a universal converter that:
- Supports CSV ↔ JSON ↔ Text
- Handles different encodings
- Provides a command-line interface
Summary
✅ CSV: csv.reader(), csv.DictReader(), csv.writer()
✅ JSON: json.load(), json.dump(), json.loads(), json.dumps()
✅ Binary: 'rb', 'wb' modes, bytes type
✅ os module: File/directory operations, path handling
✅ pathlib: Modern OOP approach, Path class
✅ Best practice: Use pathlib, UTF-8 encoding, error handling
Next Lesson
Lesson 14: Exception Handling - try/except, custom exceptions, error-handling patterns, and debugging.
Remember:
- Use `csv.DictReader` for easier CSV handling
- Always use `ensure_ascii=False` for JSON with Vietnamese text
- `pathlib` is modern and cleaner than `os.path`
- Handle `FileNotFoundError` and `JSONDecodeError`
- Process large files in chunks!