Lesson 18: Performance Optimization
Learning Objectives
After completing this lesson, you will be able to:
- ✅ Profile code with cProfile
- ✅ Perform memory profiling
- ✅ Choose between list comprehensions and loops
- ✅ Understand generators vs lists
- ✅ Apply caching strategies
- ✅ Identify common bottlenecks
Why Optimize?
Performance optimization brings:
- Faster execution - lower response times
- Resource efficiency - less CPU and memory usage
- Better UX - an improved user experience
- Cost savings - lower infrastructure costs
```python
# Measure first, optimize later!
import time

def measure_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__}: {end - start:.4f}s")
        return result
    return wrapper
```
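To see the decorator in action, wrap any function with it. (`build_squares` below is a made-up example; the printed time will vary by machine.)

```python
@measure_time
def build_squares(n):
    return [i ** 2 for i in range(n)]

build_squares(1_000_000)  # prints something like "build_squares: 0.07s"
```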
1. Profiling - Measuring Performance
timeit - Basic Timing
```python
import timeit

# Time a single statement
t = timeit.timeit('"-".join(str(n) for n in range(100))', number=10000)
print(f"Time: {t:.4f}s")

# Compare approaches
list_comp = timeit.timeit('[x**2 for x in range(100)]', number=10000)
map_func = timeit.timeit('list(map(lambda x: x**2, range(100)))', number=10000)

print(f"List comp: {list_comp:.4f}s")
print(f"Map: {map_func:.4f}s")

# Time a function
def slow_function():
    result = []
    for i in range(1000):
        result.append(i ** 2)
    return result

t = timeit.timeit(slow_function, number=1000)
print(f"Slow function: {t:.4f}s")
```
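Single runs are noisy. `timeit.repeat` runs the whole measurement several times so you can report the minimum, which is the least noise-affected figure; a minimal sketch:

```python
import timeit

# repeat=5 runs the benchmark five times; take the minimum
times = timeit.repeat('[x**2 for x in range(100)]', number=10000, repeat=5)
print(f"Best of 5: {min(times):.4f}s")
```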
cProfile - Detailed Profiling
```python
import cProfile
import pstats

def fibonacci(n):
    """Slow recursive fibonacci"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

def calculate_fibonacci(limit):
    """Calculate multiple fibonacci numbers"""
    results = []
    for i in range(limit):
        results.append(fibonacci(i))
    return results

# Profile the function
profiler = cProfile.Profile()
profiler.enable()

calculate_fibonacci(25)

profiler.disable()

# Print stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10)  # Top 10 functions

# Output:
# ncalls  tottime  percall  cumtime  percall filename:lineno(function)
# 242785    0.123    0.000    0.123    0.000 fibonacci
#     25    0.000    0.000    0.123    0.005 calculate_fibonacci
```
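The same profiler is also available from the command line, which avoids touching the code at all (`script.py` is a placeholder):

```bash
# Profile a whole script, sorted by cumulative time
python -m cProfile -s cumulative script.py

# Or save the raw stats for later inspection with pstats
python -m cProfile -o profile.out script.py
```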
line_profiler - Line-by-Line Profiling
```bash
# Install
pip install line_profiler
```
```python
# @profile decorator (no import needed when running under kernprof)
@profile
def process_data(data):
    result = []
    for item in data:
        # Slow operation
        processed = item ** 2
        result.append(processed)
    return result

# Run with kernprof:
# kernprof -l -v script.py

# Output shows time per line:
# Line #      Hits         Time  Per Hit   % Time  Line Contents
# ==============================================================
#      2                                           @profile
#      3                                           def process_data(data):
#      4         1          2.0      2.0      0.1      result = []
#      5      1001        523.0      0.5     19.8      for item in data:
#      6      1000       1890.0      1.9     71.5          processed = item ** 2
#      7      1000        228.0      0.2      8.6          result.append(processed)
#      8         1          0.0      0.0      0.0      return result
```
2. Memory Profiling
memory_profiler
```bash
pip install memory_profiler
```
```python
from memory_profiler import profile

@profile
def memory_intensive():
    # Create a large list
    big_list = [i for i in range(1000000)]

    # Create a duplicate
    another_list = big_list[:]

    # Process
    result = [x * 2 for x in big_list]

    return result

# Run: python -m memory_profiler script.py

# Output:
# Line #    Mem usage    Increment  Occurrences   Line Contents
# =============================================================
#      3     38.1 MiB     38.1 MiB            1   @profile
#      4                                          def memory_intensive():
#      5     45.8 MiB      7.7 MiB            1       big_list = [i for i in range(1000000)]
#      6     53.5 MiB      7.7 MiB            1       another_list = big_list[:]
#      7     61.3 MiB      7.7 MiB            1       result = [x * 2 for x in big_list]
#      8     61.3 MiB      0.0 MiB            1       return result
```
tracemalloc - Built-in Memory Tracking
```python
import tracemalloc

# Start tracking
tracemalloc.start()

# Your code
data = [i ** 2 for i in range(100000)]

# Get current memory usage
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory: {current / 1024 / 1024:.2f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.2f} MB")

# Get the top memory consumers
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

print("Top 5 memory consumers:")
for stat in top_stats[:5]:
    print(stat)

tracemalloc.stop()
```
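tracemalloc can also diff two snapshots, which helps attribute allocations to a specific block of code. A small sketch (the allocation here is made up for illustration):

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

data = [str(i) * 10 for i in range(50000)]  # hypothetical allocation to investigate

after = tracemalloc.take_snapshot()

# compare_to returns StatisticDiff objects, biggest differences first
for stat in after.compare_to(before, 'lineno')[:3]:
    print(stat)
```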
3. Data Structure Optimization
Lists vs Generators
```python
import sys

# List - loads everything into memory
def get_numbers_list(n):
    return [i for i in range(n)]

# Generator - lazy evaluation
def get_numbers_generator(n):
    return (i for i in range(n))

# Memory comparison
list_data = get_numbers_list(100000)
gen_data = get_numbers_generator(100000)

print(f"List size: {sys.getsizeof(list_data)} bytes")       # ~800KB
print(f"Generator size: {sys.getsizeof(gen_data)} bytes")   # ~200 bytes

# ✅ Use a generator expression when possible
sum_list = sum([x**2 for x in range(100000)])  # Builds the whole list first
sum_gen = sum(x**2 for x in range(100000))     # No intermediate list
```
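Generators pair naturally with itertools, which slices and chains lazy streams without materializing them. For example, `islice` takes the first few items and leaves the rest uncomputed:

```python
from itertools import islice

squares = (i ** 2 for i in range(10**9))  # never fully materialized

# Only the first five values are ever computed
first_five = list(islice(squares, 5))
print(first_five)  # [0, 1, 4, 9, 16]
```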
Set vs List for Lookups
```python
import time

# Create a large dataset
data_list = list(range(100000))
data_set = set(range(100000))

# Lookup in a list (O(n))
start = time.time()
for _ in range(1000):
    99999 in data_list
list_time = time.time() - start

# Lookup in a set (O(1))
start = time.time()
for _ in range(1000):
    99999 in data_set
set_time = time.time() - start

print(f"List lookup: {list_time:.4f}s")  # ~0.5s
print(f"Set lookup: {set_time:.4f}s")    # ~0.0001s

# ✅ Use a set for membership testing
# (pattern below is illustrative; item/large_list/large_set assumed defined)
# ❌ Don't do this
if item in large_list:  # Slow O(n)
    pass

# ✅ Do this
if item in large_set:   # Fast O(1)
    pass
```
Dict vs List for Key-Value Storage
```python
# ❌ List of tuples - O(n) lookup
data_list = [(f"key{i}", i) for i in range(10000)]

def find_in_list(key):
    for k, v in data_list:
        if k == key:
            return v
    return None

# ✅ Dict - O(1) lookup
data_dict = {f"key{i}": i for i in range(10000)}

def find_in_dict(key):
    return data_dict.get(key)

# Performance comparison
import timeit
list_time = timeit.timeit(lambda: find_in_list("key9999"), number=1000)
dict_time = timeit.timeit(lambda: find_in_dict("key9999"), number=1000)

print(f"List: {list_time:.4f}s")  # ~1.5s
print(f"Dict: {dict_time:.4f}s")  # ~0.0001s
```
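If the keys must stay sorted (e.g. for range queries), the standard-library bisect module is a middle ground: O(log n) membership tests on a plain sorted list. A minimal sketch:

```python
import bisect

sorted_keys = list(range(0, 1_000_000, 2))  # must already be sorted

def contains(key):
    # bisect_left finds the insertion point in O(log n)
    i = bisect.bisect_left(sorted_keys, key)
    return i < len(sorted_keys) and sorted_keys[i] == key

print(contains(424242))  # True
print(contains(424243))  # False
```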
4. Loop Optimization
List Comprehension vs Loop
```python
import timeit

# ❌ Traditional loop
def loop_approach():
    result = []
    for i in range(1000):
        result.append(i ** 2)
    return result

# ✅ List comprehension (faster)
def comprehension_approach():
    return [i ** 2 for i in range(1000)]

# ✅ Generator expression (memory efficient)
def generator_approach():
    return (i ** 2 for i in range(1000))

# Benchmark
loop_time = timeit.timeit(loop_approach, number=10000)
comp_time = timeit.timeit(comprehension_approach, number=10000)

print(f"Loop: {loop_time:.4f}s")           # ~0.8s
print(f"Comprehension: {comp_time:.4f}s")  # ~0.6s (25% faster)
```
map() and filter()
```python
# List comprehension vs map
numbers = range(10000)

# List comprehension
result1 = [x ** 2 for x in numbers]

# map - roughly on par with a comprehension when given a lambda;
# it wins mainly when passed a C-implemented (built-in) function
result2 = list(map(lambda x: x ** 2, numbers))

# filter vs comprehension
# Comprehension
evens1 = [x for x in numbers if x % 2 == 0]

# filter
evens2 = list(filter(lambda x: x % 2 == 0, numbers))

# ✅ With built-in functions, map/filter can be faster
# ✅ For complex logic, a list comprehension is more readable
```
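The speedup from map comes from passing a C-implemented function, which skips the Python-level call per element. A hedged comparison sketch (exact numbers vary by interpreter and hardware):

```python
import timeit

setup = "import operator; data = list(range(10000))"

lam     = timeit.timeit('list(map(lambda x: -x, data))', setup=setup, number=1000)
builtin = timeit.timeit('list(map(operator.neg, data))', setup=setup, number=1000)
comp    = timeit.timeit('[-x for x in data]', setup=setup, number=1000)

print(f"map + lambda:   {lam:.4f}s")
print(f"map + operator: {builtin:.4f}s")  # typically the fastest of the three
print(f"comprehension:  {comp:.4f}s")
```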
Local Variables
```python
# ❌ Global lookup (slower)
items = range(100000)

def process_global():
    result = []
    for i in items:           # 'items' is re-resolved via the global dict each pass
        result.append(i ** 2)
    return result

# ✅ Local variables (faster)
def process_local():
    items_local = range(100000)
    result = []
    for i in items_local:     # local lookup is a cheap indexed access
        result.append(i ** 2)
    return result

# Locals are faster because local variable access is an indexed lookup,
# while globals go through a dictionary lookup on every access
```
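The same idea applies to attribute lookups: binding a hot method to a local name once saves one lookup per iteration. This is a micro-optimization, worth it only in genuinely hot loops:

```python
def process_fast(n):
    result = []
    append = result.append   # bind the method once, outside the loop
    for i in range(n):
        append(i ** 2)       # plain local lookup instead of an attribute lookup
    return result
```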
5. String Operations
String Concatenation
```python
import timeit

# ❌ String concatenation with + (slow for many operations)
def concat_plus():
    result = ""
    for i in range(1000):
        result += str(i)
    return result

# ✅ join() (much faster)
def concat_join():
    return "".join(str(i) for i in range(1000))

# ✅ List append + join (also fast)
def concat_list():
    result = []
    for i in range(1000):
        result.append(str(i))
    return "".join(result)

# Benchmark
plus_time = timeit.timeit(concat_plus, number=1000)
join_time = timeit.timeit(concat_join, number=1000)
list_time = timeit.timeit(concat_list, number=1000)

print(f"+ operator: {plus_time:.4f}s")  # ~0.3s
print(f"join: {join_time:.4f}s")        # ~0.15s (about half the time)
print(f"list+join: {list_time:.4f}s")   # ~0.16s
```
String Formatting
```python
import timeit

name = "John"
age = 30

# Different formatting methods
def format_percent():
    return "Name: %s, Age: %d" % (name, age)

def format_format():
    return "Name: {}, Age: {}".format(name, age)

def format_fstring():
    return f"Name: {name}, Age: {age}"

# Benchmark (f-strings are fastest)
percent_time = timeit.timeit(format_percent, number=100000)
format_time = timeit.timeit(format_format, number=100000)
fstring_time = timeit.timeit(format_fstring, number=100000)

print(f"% formatting: {percent_time:.4f}s")
print(f".format(): {format_time:.4f}s")
print(f"f-strings: {fstring_time:.4f}s")  # Fastest
```
6. Caching
functools.lru_cache
```python
from functools import lru_cache
import time

# ❌ Without cache
def fibonacci_slow(n):
    if n <= 1:
        return n
    return fibonacci_slow(n-1) + fibonacci_slow(n-2)

# ✅ With cache
@lru_cache(maxsize=128)
def fibonacci_fast(n):
    if n <= 1:
        return n
    return fibonacci_fast(n-1) + fibonacci_fast(n-2)

# Benchmark
start = time.time()
fibonacci_slow(30)
slow_time = time.time() - start

start = time.time()
fibonacci_fast(30)
fast_time = time.time() - start

print(f"Without cache: {slow_time:.4f}s")  # ~0.3s
print(f"With cache: {fast_time:.4f}s")     # ~0.00001s (30,000x faster!)

# Cache info
print(fibonacci_fast.cache_info())
# CacheInfo(hits=28, misses=31, maxsize=128, currsize=31)

# Clear the cache
fibonacci_fast.cache_clear()
```
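On Python 3.9+, functools.cache is shorthand for an unbounded lru_cache(maxsize=None) with slightly less bookkeeping; a minimal sketch:

```python
from functools import cache  # Python 3.9+

@cache  # like @lru_cache(maxsize=None): entries are never evicted
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(100))  # instant, thanks to memoization
```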
Custom Cache
```python
from functools import wraps
from typing import Callable
import time

class TimedCache:
    """Cache with a TTL (time-to-live)"""

    def __init__(self, ttl: int = 60):
        self.ttl = ttl
        self.cache = {}

    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)

            # Check the cache
            if key in self.cache:
                result, timestamp = self.cache[key]
                if time.time() - timestamp < self.ttl:
                    print(f"Cache hit for {func.__name__}")
                    return result

            # Call the function
            print(f"Cache miss for {func.__name__}")
            result = func(*args, **kwargs)

            # Store in the cache
            self.cache[key] = (result, time.time())
            return result
        return wrapper

# Usage
@TimedCache(ttl=5)
def expensive_operation(x):
    time.sleep(1)  # Simulate a slow operation
    return x ** 2

# First call - cache miss
print(expensive_operation(5))  # Takes 1s

# Second call within the TTL - cache hit
print(expensive_operation(5))  # Instant

# After the TTL expires - cache miss again
time.sleep(6)
print(expensive_operation(5))  # Takes 1s again
```
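In production code a maintained library spares you eviction and thread-safety edge cases. The third-party cachetools package ships a ready-made TTLCache; a sketch, assuming `pip install cachetools`:

```python
from cachetools import TTLCache, cached  # third-party: pip install cachetools
import time

# At most 100 entries, each expiring 5 seconds after insertion
@cached(cache=TTLCache(maxsize=100, ttl=5))
def expensive_operation(x):
    time.sleep(1)  # simulate slow work
    return x ** 2

print(expensive_operation(5))  # slow: computed
print(expensive_operation(5))  # instant: served from the cache
```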
5 Practical Applications
1. Performance Monitor
```python
import time
import functools
from typing import Callable, Dict
import statistics

class PerformanceMonitor:
    """Monitor function performance"""

    def __init__(self):
        self.metrics: Dict[str, list] = {}

    def measure(self, func: Callable) -> Callable:
        """Decorator to measure performance"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                duration = time.perf_counter() - start

                # Store the metric
                func_name = f"{func.__module__}.{func.__name__}"
                if func_name not in self.metrics:
                    self.metrics[func_name] = []
                self.metrics[func_name].append(duration)

        return wrapper

    def get_stats(self, func_name: str = None):
        """Get performance statistics"""
        if func_name:
            metrics = {func_name: self.metrics.get(func_name, [])}
        else:
            metrics = self.metrics

        stats = {}
        for name, durations in metrics.items():
            if durations:
                stats[name] = {
                    'count': len(durations),
                    'total': sum(durations),
                    'mean': statistics.mean(durations),
                    'median': statistics.median(durations),
                    'min': min(durations),
                    'max': max(durations),
                    'stdev': statistics.stdev(durations) if len(durations) > 1 else 0
                }
        return stats

    def print_report(self):
        """Print a performance report"""
        stats = self.get_stats()

        print("\n" + "=" * 80)
        print("PERFORMANCE REPORT")
        print("=" * 80)

        for func_name, data in sorted(stats.items()):
            print(f"\n{func_name}:")
            print(f"  Calls:  {data['count']}")
            print(f"  Total:  {data['total']:.4f}s")
            print(f"  Mean:   {data['mean']:.4f}s")
            print(f"  Median: {data['median']:.4f}s")
            print(f"  Min:    {data['min']:.4f}s")
            print(f"  Max:    {data['max']:.4f}s")
            print(f"  StdDev: {data['stdev']:.4f}s")

# Usage
monitor = PerformanceMonitor()

@monitor.measure
def process_data(size):
    return [i ** 2 for i in range(size)]

@monitor.measure
def slow_operation():
    time.sleep(0.1)
    return "done"

# Run the functions
for _ in range(10):
    process_data(10000)
    slow_operation()

# Print the report
monitor.print_report()
```
2. Query Optimizer
```python
from functools import lru_cache
import time
from typing import List, Dict, Any

class QueryOptimizer:
    """Optimize database queries with caching"""

    def __init__(self, cache_size: int = 128):
        self.cache_size = cache_size  # note: the lru_cache below is fixed at 128
        self.query_count = 0
        self.cache_hits = 0

    @lru_cache(maxsize=128)  # caveat: caching a method also keeps self alive
    def _cached_query(self, query: str, params_str: str):
        """Internal cached query - only runs on a cache miss"""
        # Simulate database latency
        time.sleep(0.01)

        # Return a mock result
        return [{"id": i, "data": f"row_{i}"} for i in range(10)]

    def query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Execute a query with caching"""
        self.query_count += 1  # count every query, hit or miss

        # Convert params to a string so the cache key stays hashable
        params_str = str(params)

        # Track whether this call was served from the cache
        old_hits = self._cached_query.cache_info().hits

        result = self._cached_query(sql, params_str)

        if self._cached_query.cache_info().hits > old_hits:
            self.cache_hits += 1

        return result

    def get_stats(self) -> Dict[str, Any]:
        """Get query statistics"""
        cache_info = self._cached_query.cache_info()
        return {
            'total_queries': self.query_count,
            'cache_hits': cache_info.hits,
            'cache_misses': cache_info.misses,
            'hit_rate': cache_info.hits / self.query_count if self.query_count > 0 else 0,
            'cache_size': cache_info.currsize,
            'max_cache_size': cache_info.maxsize
        }

    def clear_cache(self):
        """Clear the query cache"""
        self._cached_query.cache_clear()

# Usage
optimizer = QueryOptimizer()

# Execute the same queries multiple times
start = time.time()

for _ in range(100):
    optimizer.query("SELECT * FROM users WHERE id = ?", (1,))
    optimizer.query("SELECT * FROM posts WHERE user_id = ?", (1,))
    optimizer.query("SELECT * FROM users WHERE id = ?", (2,))

elapsed = time.time() - start

print(f"Total time: {elapsed:.4f}s")
print("\nStatistics:")
stats = optimizer.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")
```
3. Batch Processor
```python
from typing import List, Callable, Any
import time

class BatchProcessor:
    """Process items in batches for better performance"""

    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size

    def process(
        self,
        items: List[Any],
        process_func: Callable[[List[Any]], Any],
        progress_callback: Callable[[int, int], None] = None
    ) -> List[Any]:
        """Process items in batches"""
        results = []
        total = len(items)

        for i in range(0, total, self.batch_size):
            batch = items[i:i + self.batch_size]

            # Process the batch
            batch_results = process_func(batch)
            results.extend(batch_results)

            # Progress callback
            if progress_callback:
                progress_callback(min(i + self.batch_size, total), total)

        return results

    def parallel_process(
        self,
        items: List[Any],
        process_func: Callable[[Any], Any]
    ) -> List[Any]:
        """Process batches in parallel threads"""
        from concurrent.futures import ThreadPoolExecutor

        def process_batch(batch):
            return [process_func(item) for item in batch]

        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        results = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(process_batch, batch) for batch in batches]
            for future in futures:
                results.extend(future.result())

        return results

# Usage
processor = BatchProcessor(batch_size=50)

# Sample data
data = list(range(1000))

def process_batch(batch):
    """Process a batch of items"""
    # Simulate processing
    return [x ** 2 for x in batch]

def progress(current, total):
    """Progress callback"""
    percent = (current / total) * 100
    print(f"Progress: {percent:.1f}% ({current}/{total})")

# Sequential batch processing
start = time.time()
results = processor.process(data, process_batch, progress)
sequential_time = time.time() - start

print(f"\nSequential: {sequential_time:.4f}s")

# Parallel batch processing
start = time.time()
results = processor.parallel_process(data, lambda x: x ** 2)
parallel_time = time.time() - start

print(f"Parallel: {parallel_time:.4f}s")
# Note: for CPU-bound pure-Python work the GIL limits thread speedup;
# ThreadPoolExecutor helps most with I/O-bound tasks
print(f"Speedup: {sequential_time / parallel_time:.2f}x")
```
4. Memory-Efficient Data Pipeline
```python
from typing import Iterator, Callable, Any
import sys

class DataPipeline:
    """Memory-efficient data processing pipeline"""

    def __init__(self):
        self.transformations = []

    def add_transformation(self, func: Callable[[Any], Any]):
        """Add a transformation to the pipeline"""
        self.transformations.append(func)
        return self

    def process_stream(self, data: Iterator[Any]) -> Iterator[Any]:
        """Process a data stream (memory efficient)"""
        for item in data:
            # Apply all transformations
            result = item
            for transform in self.transformations:
                result = transform(result)
            yield result

    def process_batch(self, data: list, chunk_size: int = 1000) -> Iterator[Any]:
        """Process data in chunks"""
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i + chunk_size]

            # Process the chunk
            for item in chunk:
                result = item
                for transform in self.transformations:
                    result = transform(result)
                yield result

# Usage
pipeline = DataPipeline()

# Add transformations
pipeline.add_transformation(lambda x: x * 2)
pipeline.add_transformation(lambda x: x + 10)
pipeline.add_transformation(lambda x: x ** 2)

# ❌ Load everything into memory
data = list(range(1000000))
results = []
for item in data:
    result = item * 2
    result = result + 10
    result = result ** 2
    results.append(result)

# (getsizeof counts only the list object itself, not its elements)
print(f"List size: {sys.getsizeof(results) / 1024 / 1024:.2f} MB")

# ✅ Stream processing (memory efficient)
data_stream = range(1000000)
result_stream = pipeline.process_stream(data_stream)

# Process without loading everything into memory
count = 0
for result in result_stream:
    count += 1
    if count >= 10:  # Process only what you need
        break

print(f"Generator size: {sys.getsizeof(result_stream)} bytes")
```
5. Algorithm Optimizer
```python
import time
from typing import Callable, List, Tuple, Any

class AlgorithmOptimizer:
    """Compare and optimize algorithms"""

    def __init__(self):
        self.results = {}

    def benchmark(
        self,
        algorithms: List[Tuple[str, Callable]],
        test_data: Any,
        iterations: int = 100
    ):
        """Benchmark multiple algorithms"""
        print(f"Benchmarking {len(algorithms)} algorithms...")
        print(f"Iterations: {iterations}\n")

        for name, algo in algorithms:
            times = []

            for _ in range(iterations):
                start = time.perf_counter()
                result = algo(test_data)
                duration = time.perf_counter() - start
                times.append(duration)

            avg_time = sum(times) / len(times)
            min_time = min(times)
            max_time = max(times)

            self.results[name] = {
                'avg': avg_time,
                'min': min_time,
                'max': max_time,
                'times': times
            }

            print(f"{name}:")
            print(f"  Average: {avg_time*1000:.4f}ms")
            print(f"  Min: {min_time*1000:.4f}ms")
            print(f"  Max: {max_time*1000:.4f}ms")

        # Find the fastest
        fastest = min(self.results.items(), key=lambda x: x[1]['avg'])
        print(f"\n✅ Fastest: {fastest[0]}")

        # Compare the others to the fastest
        print("\nComparison to fastest:")
        for name, data in self.results.items():
            if name != fastest[0]:
                speedup = data['avg'] / fastest[1]['avg']
                print(f"  {name}: {speedup:.2f}x slower")

# Usage
optimizer = AlgorithmOptimizer()

# Test data
test_data = list(range(1000))

# Different sorting algorithms
def bubble_sort(arr):
    arr = arr.copy()
    n = len(arr)
    for i in range(n):
        for j in range(0, n - i - 1):
            if arr[j] > arr[j + 1]:
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr

def quick_sort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quick_sort(left) + middle + quick_sort(right)

def builtin_sort(arr):
    return sorted(arr)

# Benchmark
algorithms = [
    ("Bubble Sort", bubble_sort),
    ("Quick Sort", quick_sort),
    ("Built-in Sort", builtin_sort)
]

optimizer.benchmark(algorithms, test_data, iterations=10)
```
Best Practices
1. Premature Optimization
```python
# ❌ Don't optimize prematurely
def process():
    # Dense one-liner that is hard to read
    return sum(map(lambda x: x**2, filter(lambda x: x % 2 == 0, range(1000))))

# ✅ Write clear code first
def process():
    even_numbers = [x for x in range(1000) if x % 2 == 0]
    squares = [x ** 2 for x in even_numbers]
    return sum(squares)

# Then profile, and optimize only if needed
```
2. Measure First
```python
# ✅ Always measure before optimizing
import cProfile

def main():
    # Your code
    pass

# Profile to find bottlenecks
cProfile.run('main()')

# Then optimize only the slow parts
```
3. Use Built-ins
```python
numbers = list(range(1000))

# ❌ Custom implementation
def my_sum(numbers):
    total = 0
    for n in numbers:
        total += n
    return total

# ✅ Use the built-in (implemented in C)
total = sum(numbers)

# Built-ins are usually faster:
# sum(), min(), max(), sorted(), len(), any(), all()
```
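A quick sanity check of the gap, as a sketch (numbers vary by machine):

```python
import timeit

setup = """
def my_sum(numbers):
    total = 0
    for n in numbers:
        total += n
    return total
data = list(range(10000))
"""

custom  = timeit.timeit('my_sum(data)', setup=setup, number=1000)
builtin = timeit.timeit('sum(data)', setup=setup, number=1000)

print(f"my_sum: {custom:.4f}s")
print(f"sum:    {builtin:.4f}s")  # typically several times faster
```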
4. Choose Right Data Structure
```python
# For frequent lookups: dict/set (O(1))
# For ordered data: list (O(1) access by index)
# For FIFO/LIFO: deque (O(1) append/pop at both ends)
# For a priority queue: heapq

from collections import deque
import heapq

# ✅ The right tool for the job
queue = deque()  # Fast append/popleft
queue.append(1)
queue.popleft()

# Priority queue
heap = []
heapq.heappush(heap, (1, "task"))  # (priority, item)
item = heapq.heappop(heap)
```
Practice Exercises
Exercise 1: Profile Slow Code
Find the bottlenecks in a piece of code with cProfile and optimize them.
Exercise 2: Memory Optimization
Optimize the memory usage of a data processing function.
Exercise 3: Algorithm Comparison
Compare the performance of different sorting and searching algorithms.
Exercise 4: Caching Strategy
Implement a caching strategy for expensive operations.
Exercise 5: Batch Processing
Optimize data processing with batching and parallel execution.
Summary
In this lesson we covered:
- ✅ Profiling - timeit, cProfile, line_profiler, memory_profiler
- ✅ Data structures - generators, sets, and dict optimizations
- ✅ Loop optimization - list comprehensions, map/filter
- ✅ String operations - join vs +, f-strings
- ✅ Caching - lru_cache, custom caches with TTL
- ✅ Real applications - performance monitor, query optimizer, batch processor
Remember: Measure first, optimize later! 📊
Next up: Lesson 19: Best Practices - Python best practices and coding standards! ⭐