Lesson 18: Performance Optimization

Lesson Objectives

After completing this lesson, you will be able to:

  • ✅ Profile code with cProfile
  • ✅ Perform memory profiling
  • ✅ Choose between list comprehensions and loops
  • ✅ Understand generators vs lists
  • ✅ Apply caching strategies
  • ✅ Identify common bottlenecks

Why Optimize?

Performance optimization delivers:

  • Faster execution - lower response time
  • Resource efficiency - less CPU and memory used
  • Better UX - an improved user experience
  • Cost savings - lower infrastructure costs
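
# Measure first, optimize later!
import functools
import time


def measure_time(func):
    """Print how long each call to func takes."""
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()  # perf_counter is the clock meant for timing
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__}: {end - start:.4f}s")
        return result
    return wrapper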
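
A minimal usage sketch of the decorator above. It repeats `measure_time` so the block runs on its own; `build_squares` is a hypothetical function introduced only for illustration.

```python
import functools
import time


def measure_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__}: {time.perf_counter() - start:.4f}s")
        return result
    return wrapper


@measure_time
def build_squares(n):  # hypothetical example function
    return [i ** 2 for i in range(n)]


# Prints the elapsed time, then returns the list as usual
squares = build_squares(100_000)
```

A single measurement like this is noisy. For comparing two implementations, repeated runs with timeit are usually more trustworthy.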

1. Profiling - Measuring Performance

timeit - Basic Timing

import timeit

# Time a single statement
elapsed = timeit.timeit('"-".join(str(n) for n in range(100))', number=10000)
print(f"Time: {elapsed:.4f}s")

# Compare approaches
list_comp = timeit.timeit('[x**2 for x in range(100)]', number=10000)
map_func = timeit.timeit('list(map(lambda x: x**2, range(100)))', number=10000)

print(f"List comp: {list_comp:.4f}s")
print(f"Map: {map_func:.4f}s")

# Time a function
def slow_function():
    result = []
    for i in range(1000):
        result.append(i ** 2)
    return result

# Named `elapsed` rather than `time` to avoid shadowing the time module
elapsed = timeit.timeit(slow_function, number=1000)
print(f"Slow function: {elapsed:.4f}s")
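
The timings above come from a single timeit run each, which can be skewed by whatever else the machine is doing. A small sketch using `timeit.repeat` and taking the minimum, which is the usual way to reduce that noise; `build_squares` is a hypothetical stand-in for the code under test.

```python
import timeit


def build_squares():  # hypothetical function under test
    return [i ** 2 for i in range(1000)]


# 5 independent measurements, each running the function 1000 times
runs = timeit.repeat(build_squares, repeat=5, number=1000)

# The minimum is the run least disturbed by other processes
best = min(runs)
print(f"Best of {len(runs)} runs: {best / 1000 * 1e6:.2f} microseconds per call")
```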

cProfile - Detailed Profiling

import cProfile
import pstats


def fibonacci(n):
    """Slow recursive fibonacci"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)


def calculate_fibonacci(limit):
    """Calculate multiple fibonacci numbers"""
    results = []
    for i in range(limit):
        results.append(fibonacci(i))
    return results


# Profile function
profiler = cProfile.Profile()
profiler.enable()

calculate_fibonacci(25)

profiler.disable()

# Print stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10)  # Top 10 functions

# Illustrative output (timings vary by machine; recursive calls are shown as total/primitive):
#    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
#         1    0.000    0.000    0.123    0.123 calculate_fibonacci
# 392809/25    0.123    0.000    0.123    0.005 fibonacci
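
When the report needs to go into a log or a test rather than straight to the terminal, `pstats.Stats` accepts a `stream` argument. A minimal sketch that captures the report in a string; the smaller input (18) is chosen only to keep the run short.

```python
import cProfile
import io
import pstats


def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)


profiler = cProfile.Profile()
profiler.enable()
fibonacci(18)
profiler.disable()

# Send the report to an in-memory buffer instead of stdout
buffer = io.StringIO()
stats = pstats.Stats(profiler, stream=buffer)
stats.strip_dirs().sort_stats("cumulative").print_stats(5)

report = buffer.getvalue()
print(report)
```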

line_profiler - Line-by-Line Profiling

# Install
pip install line_profiler

# @profile decorator (no import needed when using kernprof)
@profile
def process_data(data):
    result = []
    for item in data:
        # Slow operation
        processed = item ** 2
        result.append(processed)
    return result

process_data(range(1000))  # the function must be called for lines to be timed

# Run with kernprof:
# kernprof -l -v script.py

# Output shows time per line (illustrative):
# Line #      Hits         Time  Per Hit   % Time  Line Contents
# ==============================================================
#      2                                           @profile
#      3                                           def process_data(data):
#      4         1          2.0      2.0      0.1      result = []
#      5      1001        523.0      0.5     19.8      for item in data:
#      6      1000       1890.0      1.9     71.5          processed = item ** 2
#      7      1000        228.0      0.2      8.6          result.append(processed)
#      8         1          0.0      0.0      0.0      return result

2. Memory Profiling

memory_profiler

pip install memory_profiler

from memory_profiler import profile

@profile
def memory_intensive():
    # Create large list
    big_list = [i for i in range(1000000)]

    # Create duplicate
    another_list = big_list[:]

    # Process
    result = [x * 2 for x in big_list]

    return result

if __name__ == "__main__":
    memory_intensive()  # the function must be called for a report to appear

# Run: python -m memory_profiler script.py

# Illustrative output (exact figures vary by Python version and platform):
# Line #    Mem usage    Increment  Occurrences   Line Contents
# =============================================================
#      3     38.1 MiB     38.1 MiB           1   @profile
#      4                                         def memory_intensive():
#      5     45.8 MiB      7.7 MiB           1       big_list = [i for i in range(1000000)]
#      6     53.5 MiB      7.7 MiB           1       another_list = big_list[:]
#      7     61.3 MiB      7.7 MiB           1       result = [x * 2 for x in big_list]
#      8     61.3 MiB      0.0 MiB           1       return result

tracemalloc - Built-in Memory Tracking

import tracemalloc

# Start tracking
tracemalloc.start()

# Your code
data = [i ** 2 for i in range(100000)]

# Get current memory
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory: {current / 1024 / 1024:.2f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.2f} MB")

# Get top memory consumers
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

print("Top 5 memory consumers:")
for stat in top_stats[:5]:
    print(stat)

tracemalloc.stop()
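
A single snapshot shows what is allocated right now. To see what a specific piece of code allocated, two snapshots can be compared with `Snapshot.compare_to`. A minimal sketch; the list being built is an arbitrary example workload.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Example workload whose allocations we want to isolate
data = [str(i) * 10 for i in range(50_000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# Differences are sorted with the largest change first
diff = after.compare_to(before, "lineno")
for stat in diff[:3]:
    print(stat)
```

Each printed entry names a file and line together with the size and count difference, which points directly at the line responsible for the growth.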

3. Data Structure Optimization

Lists vs Generators

import sys

# List - Loads all into memory
def get_numbers_list(n):
    return [i for i in range(n)]

# Generator - Lazy evaluation
def get_numbers_generator(n):
    return (i for i in range(n))

# Memory comparison (getsizeof measures the container only, not the items it references)
list_data = get_numbers_list(100000)
gen_data = get_numbers_generator(100000)

print(f"List size: {sys.getsizeof(list_data)} bytes")  # ~800KB
print(f"Generator size: {sys.getsizeof(gen_data)} bytes")  # ~200 bytes

# ✅ Use a generator when possible
sum_list = sum([x**2 for x in range(100000)])  # Creates list first
sum_gen = sum(x**2 for x in range(100000))     # No intermediate list
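
One consequence of lazy evaluation is that a generator can be consumed only once. A short sketch of that behavior; `squares` is a hypothetical generator function used for illustration.

```python
def squares(n):
    """Yield i*i for i in range(n), one value at a time."""
    for i in range(n):
        yield i * i


gen = squares(5)
first_pass = list(gen)    # consumes the generator: [0, 1, 4, 9, 16]
second_pass = list(gen)   # already exhausted: []

print(first_pass, second_pass)

# If the values are needed more than once, build a new generator each time
total = sum(squares(5))
largest = max(squares(5))
print(total, largest)     # 30 16
```

If the same values are needed many times and fit in memory, a list is the simpler choice. The generator pays off when the data is large or only a single pass is needed.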

Set vs List for Lookups

import time

# Create large dataset
data_list = list(range(100000))
data_set = set(range(100000))

# Lookup in list (O(n))
start = time.perf_counter()
for _ in range(1000):
    99999 in data_list
list_time = time.perf_counter() - start

# Lookup in set (O(1) on average)
start = time.perf_counter()
for _ in range(1000):
    99999 in data_set
set_time = time.perf_counter() - start

print(f"List lookup: {list_time:.4f}s")  # e.g. ~0.5s (machine-dependent)
print(f"Set lookup: {set_time:.4f}s")    # e.g. ~0.0001s

# ✅ Use a set for membership testing
item = 99999

# ❌ Don't use a list for repeated lookups
if item in data_list:  # Slow O(n)
    pass

# ✅ Do this
if item in data_set:   # Fast O(1)
    pass

Dict vs List for Key-Value Storage

import timeit

# ❌ List of tuples - O(n) lookup
data_list = [(f"key{i}", i) for i in range(10000)]

def find_in_list(key):
    for k, v in data_list:
        if k == key:
            return v
    return None

# ✅ Dict - O(1) lookup
data_dict = {f"key{i}": i for i in range(10000)}

def find_in_dict(key):
    return data_dict.get(key)

# Performance comparison (looking up the last key is the worst case for the list)
list_time = timeit.timeit(lambda: find_in_list("key9999"), number=1000)
dict_time = timeit.timeit(lambda: find_in_dict("key9999"), number=1000)

print(f"List: {list_time:.4f}s")  # e.g. ~1.5s (machine-dependent)
print(f"Dict: {dict_time:.4f}s")  # e.g. ~0.0001s

4. Loop Optimization

List Comprehension vs Loop

import timeit

# ❌ Traditional loop
def loop_approach():
    result = []
    for i in range(1000):
        result.append(i ** 2)
    return result

# ✅ List comprehension (faster)
def comprehension_approach():
    return [i ** 2 for i in range(1000)]

# ✅ Generator expression (memory efficient)
def generator_approach():
    return (i ** 2 for i in range(1000))

# Benchmark
loop_time = timeit.timeit(loop_approach, number=10000)
comp_time = timeit.timeit(comprehension_approach, number=10000)

print(f"Loop: {loop_time:.4f}s")           # e.g. ~0.8s (machine-dependent)
print(f"Comprehension: {comp_time:.4f}s")  # e.g. ~0.6s (roughly 25% faster)

map() and filter()

# List comprehension vs map
numbers = range(10000)

# List comprehension
result1 = [x ** 2 for x in numbers]

# map with a lambda (usually no faster: every element costs a Python function call)
result2 = list(map(lambda x: x ** 2, numbers))

# filter vs comprehension
# Comprehension
evens1 = [x for x in numbers if x % 2 == 0]

# filter
evens2 = list(filter(lambda x: x % 2 == 0, numbers))

# ✅ map/filter can be faster when passed an existing built-in function, e.g. map(str, numbers)
# ✅ With a lambda or complex logic, a list comprehension is usually as fast and more readable
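
Whether map beats a comprehension depends mostly on what is passed to it. The following sketch times three variants of the same conversion so the difference can be checked on your own machine; the function names are hypothetical, and no particular ranking is guaranteed across Python versions.

```python
import timeit

numbers = list(range(10_000))


def with_comprehension():
    return [str(x) for x in numbers]


def with_map_builtin():
    return list(map(str, numbers))  # built-in passed directly, no lambda


def with_map_lambda():
    return list(map(lambda x: str(x), numbers))  # extra Python-level call per item


for func in (with_comprehension, with_map_builtin, with_map_lambda):
    elapsed = timeit.timeit(func, number=200)
    print(f"{func.__name__}: {elapsed:.4f}s")
```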

Local Variables

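# ❌ Global lookup (slower)
items = range(100000)

def process_global():
    result = []
    for i in items:
        result.append(i ** 2)
    return result

# ✅ Local variables (faster)
def process_local():
    items_local = range(100000)
    result = []
    for i in items_local:
        result.append(i ** 2)
    return result

# Local names are resolved by index, globals by dictionary lookup, so locals are faster.
# Note: `items` is read only once per call here, so the gain in this example is small.
# The effect matters when a global is referenced inside the hot loop.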
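
The cost of a global lookup shows up most clearly when the global is referenced on every iteration. A sketch under that assumption, using `math.sqrt` as the repeatedly accessed name; `use_global` and `use_local` are hypothetical names.

```python
import math
import timeit


def use_global(n=100_000):
    total = 0.0
    for i in range(n):
        # Looks up the global `math`, then its attribute `sqrt`, on every iteration
        total += math.sqrt(i)
    return total


def use_local(n=100_000):
    sqrt = math.sqrt  # resolve once and bind to a local name
    total = 0.0
    for i in range(n):
        total += sqrt(i)
    return total


print(f"global lookup: {timeit.timeit(use_global, number=20):.4f}s")
print(f"local alias:   {timeit.timeit(use_local, number=20):.4f}s")
```

The gap is typically modest and has narrowed in recent CPython releases, so this trick is worth applying only in loops that profiling has shown to be hot.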

5. String Operations

String Concatenation

import timeit

# ❌ String concatenation with + (slow for many operations)
def concat_plus():
    result = ""
    for i in range(1000):
        result += str(i)
    return result

# ✅ join() method (usually faster)
def concat_join():
    return "".join(str(i) for i in range(1000))

# ✅ List append + join (also fast)
def concat_list():
    result = []
    for i in range(1000):
        result.append(str(i))
    return "".join(result)

# Benchmark
plus_time = timeit.timeit(concat_plus, number=1000)
join_time = timeit.timeit(concat_join, number=1000)
list_time = timeit.timeit(concat_list, number=1000)

# Example figures; CPython can optimize += in simple cases, so the gap varies
print(f"+ operator: {plus_time:.4f}s")  # e.g. ~0.3s
print(f"join: {join_time:.4f}s")        # e.g. ~0.15s
print(f"list+join: {list_time:.4f}s")   # e.g. ~0.16s

String Formatting

import timeit

name = "John"
age = 30

# Different formatting methods
def format_percent():
    return "Name: %s, Age: %d" % (name, age)

def format_format():
    return "Name: {}, Age: {}".format(name, age)

def format_fstring():
    return f"Name: {name}, Age: {age}"

# Benchmark (f-strings are typically fastest)
percent_time = timeit.timeit(format_percent, number=100000)
format_time = timeit.timeit(format_format, number=100000)
fstring_time = timeit.timeit(format_fstring, number=100000)

print(f"% formatting: {percent_time:.4f}s")
print(f".format(): {format_time:.4f}s")
print(f"f-strings: {fstring_time:.4f}s")  # Typically fastest

6. Caching

functools.lru_cache

from functools import lru_cache
import time

# ❌ Without cache
def fibonacci_slow(n):
    if n <= 1:
        return n
    return fibonacci_slow(n-1) + fibonacci_slow(n-2)

# ✅ With cache
@lru_cache(maxsize=128)
def fibonacci_fast(n):
    if n <= 1:
        return n
    return fibonacci_fast(n-1) + fibonacci_fast(n-2)

# Benchmark (perf_counter has the resolution needed for very short durations)
start = time.perf_counter()
fibonacci_slow(30)
slow_time = time.perf_counter() - start

start = time.perf_counter()
fibonacci_fast(30)
fast_time = time.perf_counter() - start

print(f"Without cache: {slow_time:.4f}s")  # e.g. ~0.3s
print(f"With cache: {fast_time:.6f}s")     # e.g. ~0.00001s (orders of magnitude faster)

# Cache info
print(fibonacci_fast.cache_info())
# CacheInfo(hits=28, misses=31, maxsize=128, currsize=31)

# Clear cache
fibonacci_fast.cache_clear()
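
One practical limit of lru_cache is that every argument must be hashable, because the arguments form the cache key. A small sketch of the failure and a common workaround; `total_length` is a hypothetical function used for illustration.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def total_length(words):
    return sum(len(word) for word in words)


words = ["alpha", "beta", "gamma"]

try:
    total_length(words)  # lists are unhashable, so building the cache key fails
except TypeError as exc:
    print(f"TypeError: {exc}")

# Converting to a tuple makes the argument hashable
print(total_length(tuple(words)))  # 14
print(total_length(tuple(words)))  # 14, served from the cache
```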

Custom Cache

from functools import wraps
from typing import Callable
import time


class TimedCache:
    """Cache with TTL (time to live, in seconds)"""

    def __init__(self, ttl: int = 60):
        self.ttl = ttl
        self.cache = {}

    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Simple string key; assumes distinct arguments have distinct string forms
            key = str(args) + str(kwargs)

            # Check cache
            if key in self.cache:
                result, timestamp = self.cache[key]
                if time.monotonic() - timestamp < self.ttl:
                    print(f"Cache hit for {func.__name__}")
                    return result

            # Call function
            print(f"Cache miss for {func.__name__}")
            result = func(*args, **kwargs)

            # Store in cache (monotonic time is unaffected by system clock changes)
            self.cache[key] = (result, time.monotonic())

            return result

        return wrapper


# Usage
@TimedCache(ttl=5)
def expensive_operation(x):
    time.sleep(1)  # Simulate slow operation
    return x ** 2

# First call - cache miss
print(expensive_operation(5))  # Takes 1s

# Second call within TTL - cache hit
print(expensive_operation(5))  # Instant

# After TTL expires - cache miss
time.sleep(6)
print(expensive_operation(5))  # Takes 1s again

5 Real-World Applications

1. Performance Monitor

import functools
import statistics
import time
from typing import Callable, Dict, List, Optional


class PerformanceMonitor:
    """Monitor function performance"""

    def __init__(self):
        self.metrics: Dict[str, List[float]] = {}

    def measure(self, func: Callable) -> Callable:
        """Decorator to measure performance"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()

            try:
                result = func(*args, **kwargs)
                return result
            finally:
                # Runs even if func raises, so failed calls are timed too
                duration = time.perf_counter() - start

                # Store metric
                func_name = f"{func.__module__}.{func.__name__}"
                if func_name not in self.metrics:
                    self.metrics[func_name] = []

                self.metrics[func_name].append(duration)

        return wrapper

    def get_stats(self, func_name: Optional[str] = None):
        """Get performance statistics"""
        if func_name:
            metrics = {func_name: self.metrics.get(func_name, [])}
        else:
            metrics = self.metrics

        stats = {}
        for name, durations in metrics.items():
            if durations:
                stats[name] = {
                    'count': len(durations),
                    'total': sum(durations),
                    'mean': statistics.mean(durations),
                    'median': statistics.median(durations),
                    'min': min(durations),
                    'max': max(durations),
                    'stdev': statistics.stdev(durations) if len(durations) > 1 else 0
                }

        return stats

    def print_report(self):
        """Print performance report"""
        stats = self.get_stats()

        print("\n" + "="*80)
        print("PERFORMANCE REPORT")
        print("="*80)

        for func_name, data in sorted(stats.items()):
            print(f"\n{func_name}:")
            print(f"  Calls: {data['count']}")
            print(f"  Total: {data['total']:.4f}s")
            print(f"  Mean: {data['mean']:.4f}s")
            print(f"  Median: {data['median']:.4f}s")
            print(f"  Min: {data['min']:.4f}s")
            print(f"  Max: {data['max']:.4f}s")
            print(f"  StdDev: {data['stdev']:.4f}s")


# Usage
monitor = PerformanceMonitor()

@monitor.measure
def process_data(size):
    return [i ** 2 for i in range(size)]

@monitor.measure
def slow_operation():
    time.sleep(0.1)
    return "done"

# Run functions
for _ in range(10):
    process_data(10000)
    slow_operation()

# Print report
monitor.print_report()

2. Query Optimizer

from functools import lru_cache
import time
from typing import Any, Dict, List


class QueryOptimizer:
    """Optimize database queries with caching"""

    def __init__(self, cache_size: int = 128):
        self.cache_size = cache_size
        self.query_count = 0
        # Per-instance cache: honors cache_size and keeps instances from sharing entries
        self._cached_query = lru_cache(maxsize=cache_size)(self._run_query)

    def _run_query(self, query: str, params_str: str):
        """Internal query method (runs only on a cache miss)"""
        # Simulate database query
        time.sleep(0.01)  # Simulate DB latency

        # Return mock result
        return [{"id": i, "data": f"row_{i}"} for i in range(10)]

    def query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Execute query with caching"""
        # Count every call, hit or miss, so hit_rate stays between 0 and 1
        self.query_count += 1

        # Convert params to string for caching
        params_str = str(params)

        return self._cached_query(sql, params_str)

    def get_stats(self) -> Dict[str, Any]:
        """Get query statistics"""
        cache_info = self._cached_query.cache_info()

        return {
            'total_queries': self.query_count,
            'cache_hits': cache_info.hits,
            'cache_misses': cache_info.misses,
            'hit_rate': cache_info.hits / self.query_count if self.query_count > 0 else 0,
            'cache_size': cache_info.currsize,
            'max_cache_size': cache_info.maxsize
        }

    def clear_cache(self):
        """Clear query cache"""
        self._cached_query.cache_clear()


# Usage
optimizer = QueryOptimizer()

# Execute same queries multiple times
start = time.perf_counter()

for _ in range(100):
    optimizer.query("SELECT * FROM users WHERE id = ?", (1,))
    optimizer.query("SELECT * FROM posts WHERE user_id = ?", (1,))
    optimizer.query("SELECT * FROM users WHERE id = ?", (2,))

elapsed = time.perf_counter() - start

print(f"Total time: {elapsed:.4f}s")
print("\nStatistics:")
stats = optimizer.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")

3. Batch Processor

from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional
import time


class BatchProcessor:
    """Process items in batches for better performance"""

    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size

    def process(
        self,
        items: List[Any],
        process_func: Callable[[List[Any]], Any],
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> List[Any]:
        """Process items in batches"""
        results = []
        total = len(items)

        for i in range(0, total, self.batch_size):
            batch = items[i:i + self.batch_size]

            # Process batch
            batch_results = process_func(batch)
            results.extend(batch_results)

            # Progress callback
            if progress_callback:
                progress_callback(min(i + self.batch_size, total), total)

        return results

    def parallel_process(
        self,
        items: List[Any],
        process_func: Callable[[Any], Any]
    ) -> List[Any]:
        """Process items in parallel batches"""
        def process_batch(batch):
            return [process_func(item) for item in batch]

        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        results = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(process_batch, batch) for batch in batches]

            # Collect in submission order so results line up with the input
            for future in futures:
                results.extend(future.result())

        return results


# Usage
processor = BatchProcessor(batch_size=50)

# Sample data
data = list(range(1000))

def process_batch(batch):
    """Process batch of items"""
    # Simulate processing
    return [x ** 2 for x in batch]

def progress(current, total):
    """Progress callback"""
    percent = (current / total) * 100
    print(f"Progress: {percent:.1f}% ({current}/{total})")

# Sequential batch processing
start = time.perf_counter()
results = processor.process(data, process_batch, progress)
sequential_time = time.perf_counter() - start

print(f"\nSequential: {sequential_time:.4f}s")

# Parallel batch processing
# Note: threads help with I/O-bound work. For CPU-bound work like x ** 2, the GIL
# prevents a real speedup, so the ratio below may be under 1.
start = time.perf_counter()
results = processor.parallel_process(data, lambda x: x ** 2)
parallel_time = time.perf_counter() - start

print(f"Parallel: {parallel_time:.4f}s")
print(f"Speedup: {sequential_time / parallel_time:.2f}x")
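
Thread pools pay off when each item spends most of its time waiting rather than computing. A minimal sketch of that case, where `time.sleep` stands in for a network or disk wait; `fake_io` is a hypothetical function and the 0.05s delay is an arbitrary choice.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(item):
    time.sleep(0.05)  # stands in for a network or disk wait
    return item * 2


items = list(range(8))

# One item at a time: roughly 8 * 0.05s
start = time.perf_counter()
sequential = [fake_io(x) for x in items]
sequential_time = time.perf_counter() - start

# Four items wait at the same time; executor.map preserves input order
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    parallel = list(executor.map(fake_io, items))
parallel_time = time.perf_counter() - start

print(f"Sequential: {sequential_time:.2f}s")
print(f"Threaded:   {parallel_time:.2f}s")
```

For CPU-bound work, `concurrent.futures.ProcessPoolExecutor` offers the same interface with separate processes, at the cost of pickling arguments and results.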

4. Memory-Efficient Data Pipeline

from typing import Any, Callable, Iterable, Iterator
import sys


class DataPipeline:
    """Memory-efficient data processing pipeline"""

    def __init__(self):
        self.transformations = []

    def add_transformation(self, func: Callable[[Any], Any]):
        """Add transformation to pipeline"""
        self.transformations.append(func)
        return self

    def process_stream(self, data: Iterable[Any]) -> Iterator[Any]:
        """Process data stream (memory efficient)"""
        for item in data:
            # Apply all transformations
            result = item
            for transform in self.transformations:
                result = transform(result)

            yield result

    def process_batch(self, data: list, chunk_size: int = 1000) -> Iterator[Any]:
        """Process data in chunks"""
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i + chunk_size]

            # Process chunk
            for item in chunk:
                result = item
                for transform in self.transformations:
                    result = transform(result)

                yield result


# Usage
pipeline = DataPipeline()

# Add transformations
pipeline.add_transformation(lambda x: x * 2)
pipeline.add_transformation(lambda x: x + 10)
pipeline.add_transformation(lambda x: x ** 2)

# ❌ Load all into memory
data = list(range(1000000))
results = []
for item in data:
    result = item * 2
    result = result + 10
    result = result ** 2
    results.append(result)

print(f"List size: {sys.getsizeof(results) / 1024 / 1024:.2f} MB")

# ✅ Stream processing (memory efficient)
data_stream = range(1000000)
result_stream = pipeline.process_stream(data_stream)

# Process without loading all into memory
count = 0
for result in result_stream:
    count += 1
    if count >= 10:  # Process only what you need
        break

print(f"Generator size: {sys.getsizeof(result_stream)} bytes")

5. Algorithm Optimizer

import time
from typing import Any, Callable, List, Tuple


class AlgorithmOptimizer:
    """Compare and optimize algorithms"""

    def __init__(self):
        self.results = {}

    def benchmark(
        self,
        algorithms: List[Tuple[str, Callable]],
        test_data: Any,
        iterations: int = 100
    ):
        """Benchmark multiple algorithms"""
        print(f"Benchmarking {len(algorithms)} algorithms...")
        print(f"Iterations: {iterations}\n")

        for name, algo in algorithms:
            times = []

            for _ in range(iterations):
                start = time.perf_counter()
                result = algo(test_data)
                duration = time.perf_counter() - start
                times.append(duration)

            avg_time = sum(times) / len(times)
            min_time = min(times)
            max_time = max(times)

            self.results[name] = {
                'avg': avg_time,
                'min': min_time,
                'max': max_time,
                'times': times
            }

            print(f"{name}:")
            print(f"  Average: {avg_time*1000:.4f}ms")
            print(f"  Min: {min_time*1000:.4f}ms")
            print(f"  Max: {max_time*1000:.4f}ms")

        # Find fastest
        fastest = min(self.results.items(), key=lambda x: x[1]['avg'])
        print(f"\n✅ Fastest: {fastest[0]}")

        # Compare others to fastest
        print("\nComparison to fastest:")
        for name, data in self.results.items():
            if name != fastest[0]:
                speedup = data['avg'] / fastest[1]['avg']
                print(f"  {name}: {speedup:.2f}x slower")


# Usage
optimizer = AlgorithmOptimizer()

# Test data
test_data = list(range(1000))

# Different sorting algorithms
def bubble_sort(arr):
    arr = arr.copy()
    n = len(arr)
    for i in range(n):
        for j in range(0, n-i-1):
            if arr[j] > arr[j+1]:
                arr[j], arr[j+1] = arr[j+1], arr[j]
    return arr

def quick_sort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quick_sort(left) + middle + quick_sort(right)

def builtin_sort(arr):
    return sorted(arr)

# Benchmark
algorithms = [
    ("Bubble Sort", bubble_sort),
    ("Quick Sort", quick_sort),
    ("Built-in Sort", builtin_sort)
]

optimizer.benchmark(algorithms, test_data, iterations=10)

Best Practices

1. Premature Optimization

# ❌ Don't optimize prematurely
def process():
    # Complex optimization that makes code unreadable
    return sum(map(lambda x: x**2, filter(lambda x: x%2==0, range(1000))))

# ✅ Write clear code first
def process():
    even_numbers = [x for x in range(1000) if x % 2 == 0]
    squares = [x ** 2 for x in even_numbers]
    return sum(squares)

# Then profile and optimize if needed

2. Measure First

# ✅ Always measure before optimizing
import cProfile

def main():
    # Your code
    pass

# Profile to find bottlenecks
cProfile.run('main()')

# Then optimize the slow parts only

3. Use Built-ins

numbers = [1, 2, 3, 4, 5]  # sample data

# ❌ Custom implementation
def my_sum(numbers):
    total = 0
    for n in numbers:
        total += n
    return total

# ✅ Use built-in (optimized in C)
total = sum(numbers)

# Built-ins are usually faster:
# sum(), min(), max(), sorted(), len(), any(), all()

4. Choose Right Data Structure

# For frequent lookups: dict/set (O(1) on average)
# For ordered data: list (O(1) access by index)
# For FIFO/LIFO: deque (O(1) append/pop from both ends)
# For priority queue: heapq

from collections import deque
import heapq

# ✅ Right tool for the job
queue = deque()  # Fast append/popleft
queue.append(1)
queue.popleft()

# Priority queue (the entry with the lowest priority value is popped first)
heap = []
heapq.heappush(heap, (2, "write report"))  # sample (priority, item) pairs
heapq.heappush(heap, (1, "fix bug"))
priority, item = heapq.heappop(heap)  # (1, "fix bug")
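
To see why deque is recommended for FIFO queues, the following sketch drains the same queue from the front using a list and a deque. `list.pop(0)` shifts every remaining element on each call, while `deque.popleft()` does not; the function names and the queue size are arbitrary choices for illustration.

```python
from collections import deque
import timeit


def drain_list(n=20_000):
    queue = list(range(n))
    total = 0
    while queue:
        total += queue.pop(0)  # O(n): shifts the remaining elements left
    return total


def drain_deque(n=20_000):
    queue = deque(range(n))
    total = 0
    while queue:
        total += queue.popleft()  # O(1)
    return total


print(f"list.pop(0):     {timeit.timeit(drain_list, number=5):.4f}s")
print(f"deque.popleft(): {timeit.timeit(drain_deque, number=5):.4f}s")
```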

Practice Exercises

Exercise 1: Profile Slow Code

Find the bottlenecks in a piece of code with cProfile, then optimize them.

Exercise 2: Memory Optimization

Optimize the memory usage of a data processing function.

Exercise 3: Algorithm Comparison

Compare the performance of different sorting and searching algorithms.

Exercise 4: Caching Strategy

Implement a caching strategy for expensive operations.

Exercise 5: Batch Processing

Optimize data processing with batch and parallel execution.

Summary

In this lesson we covered:

  1. Profiling - timeit, cProfile, line_profiler, memory_profiler, tracemalloc
  2. Data Structures - generators, sets, and dicts
  3. Loop Optimization - list comprehensions, map/filter
  4. String Operations - join vs +, f-strings
  5. Caching - lru_cache, custom cache with TTL
  6. Real Applications - performance monitor, query optimizer, batch processor

Remember: Measure first, optimize later! 📊


Next lesson: Lesson 19: Best Practices - Learn Python best practices and coding standards! ⭐