# 基本性能分析示例
import cProfile
def slow_function():
total = 0
for i in range(1000000):
total += i ** 2
return total
cProfile.run('slow_function()', sort='cumtime')
# 确定性分析 vs 统计性分析对比
import cProfile
import random
def analyze_data(data):
result = []
for item in data:
result.append(process_item(item))
return result
def process_item(item):
# 模拟耗时操作
return [x * x for x in range(100) if x % 2 == 0]
# cProfile是确定性分析工具,记录每次函数调用
cProfile.run('analyze_data(range(100))', sort='time')
# Profiling vs Benchmarking 示例
import cProfile
import time
# Benchmarking:测量"执行需要多长时间"
start = time.perf_counter()
result = sum(i ** 2 for i in range(100000))
end = time.perf_counter()
print(f"Benchmark: {end - start:.4f}s")
# Profiling:分析"时间花在了哪里"
cProfile.run('sum(i ** 2 for i in range(100000))', sort='cumtime')
# 使用Profile API进行选择性分析
import cProfile
def data_processing():
total = 0
for i in range(500000):
total += i * i
return total
def io_operation():
# 模拟IO操作
data = [i for i in range(10000)]
return sum(data)
# 精确控制分析范围
profiler = cProfile.Profile()
profiler.enable() # 开始分析
# 只对关键的数据处理部分进行分析
result1 = data_processing()
result2 = io_operation()
profiler.disable() # 停止分析
profiler.print_stats(sort='cumtime')
# run() 和 runcall() 方法使用
import cProfile
def complex_calculation(n, multiplier):
result = []
for i in range(n):
result.append(i * multiplier)
return sum(result)
profiler = cProfile.Profile()
# 使用字符串执行(和cProfile.run类似)
profiler.run('complex_calculation(10000, 3)')
# 直接调用函数并传递参数
profiler.runcall(complex_calculation, 5000, 2)
# 手动生成统计并输出
profiler.create_stats()
profiler.print_stats(sort='cumtime')
# 上下文管理器风格封装
import cProfile
from contextlib import contextmanager
@contextmanager
def profile_context(sort='cumtime', lines=30):
profiler = cProfile.Profile()
profiler.enable()
try:
yield
finally:
profiler.disable()
import pstats, io
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats(sort)
ps.print_stats(lines)
print(s.getvalue())
# 使用上下文管理器进行性能分析
with profile_context(sort='time', lines=15):
result = [i ** 3 for i in range(100000)]
print(f"计算结果长度: {len(result)}")
# cProfile vs profile 性能开销对比
import cProfile
import profile
import time
def compute():
total = 0
for i in range(500000):
total += i * i
return total
# 使用cProfile(C扩展实现,低开销)
start = time.time()
cProfile.run('compute()', sort='cumtime')
cprofile_time = time.time() - start
# 使用profile(纯Python实现,高开销)
start = time.time()
profile.run('compute()', sort='cumtime')
profile_time = time.time() - start
print(f"\ncProfile总耗时: {cprofile_time:.3f}s")
print(f"profile总耗时: {profile_time:.3f}s")
print(f"profile开销是cProfile的 {profile_time/cprofile_time:.1f} 倍")
# 建议:日常开发用cProfile,仅当需要定制分析行为时用profile
# 多线程环境下的性能分析
import cProfile
import threading
def worker(thread_id):
"""工作线程函数"""
total = 0
for i in range(200000):
total += i * thread_id
return total
def threaded_work():
threads = []
for i in range(4):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# cProfile支持线程安全(Python 3.8+)
profiler = cProfile.Profile()
profiler.enable()
threaded_work()
profiler.disable()
profiler.print_stats(sort='cumtime')
# 注意:cProfile默认汇总所有线程的数据
# 如需按线程单独分析,需在每个线程中单独创建Profile实例
本节通过三个实战案例展示cProfile在实际项目中的应用。Web请求性能剖析展示了如何分析Web应用接口的响应时间分布,定位高延迟的端点处理函数,特别是序列化和数据聚合类操作的优化。数据处理管线优化展示了在数据科学和ETL场景中如何使用cProfile发现数据处理瓶颈,对比不同实现方式(列表推导式 vs for循环、内置函数 vs 手动实现)的性能差异。数据库查询性能分析展示了在ORM框架场景中通过cProfile分析N+1查询问题、识别不必要的重复查询,从而优化数据库访问性能。每个案例都包含了完整的分析过程、关键指标解读和优化建议。
# 案例1: Web请求性能剖析
import cProfile
import pstats
import io
class RequestHandler:
"""模拟Web请求处理器"""
def handle_request(self, endpoint, params):
if endpoint == '/api/users':
return self.get_users()
elif endpoint == '/api/orders':
return self.get_orders()
elif endpoint == '/api/dashboard':
return self.get_dashboard()
def get_users(self):
# 模拟数据库查询 + 序列化
users = [{'id': i, 'name': f'user_{i}'} for i in range(500)]
return self.serialize_users(users)
def serialize_users(self, users):
# 逐个序列化为大写
return [{'id': u['id'], 'name': u['name'].upper()} for u in users]
def get_orders(self):
orders = [{'id': i, 'amount': i * 10.5} for i in range(300)]
return self.calculate_totals(orders)
def calculate_totals(self, orders):
for order in orders:
order['tax'] = order['amount'] * 0.13
order['total'] = order['amount'] + order['tax']
return orders
def get_dashboard(self):
# 同时获取用户和订单数据进行聚合
data = {
'users': len(self.get_users()),
'orders': len(self.get_orders())
}
return self.aggregate_stats(data)
def aggregate_stats(self, data):
return {k: v for k, v in data.items()}
# 模拟100次并发请求
handler = RequestHandler()
profiler = cProfile.Profile()
profiler.enable()
for _ in range(100):
handler.handle_request('/api/users', {})
handler.handle_request('/api/orders', {})
profiler.disable()
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumtime')
ps.print_stats(15)
print("=== Web请求性能分析 ===")
print(s.getvalue())
# 观察:get_users和get_orders各耗时多少?
# 哪个序列化方法是热点?
# 案例2: 数据处理管线优化
import cProfile
import pstats
import io
class DataPipeline:
"""数据处理管线 — 对比优化前后性能"""
def __init__(self, data_size=10000):
self.data = list(range(data_size))
def pipeline_v1(self):
"""未优化版本:手动循环"""
result = self.filter_data(self.data)
result = self.transform_data(result)
result = self.aggregate_data(result)
return result
def pipeline_v2(self):
"""优化版本:内置函数+生成器"""
result = self.filter_data_fast(self.data)
result = self.transform_data_fast(result)
result = self.aggregate_data_fast(result)
return result
def filter_data(self, data):
return [x for x in data if x % 2 == 0]
def filter_data_fast(self, data):
return list(filter(lambda x: x % 2 == 0, data))
def transform_data(self, data):
result = []
for x in data:
result.append(x * 2 + 1)
return result
def transform_data_fast(self, data):
return [x * 2 + 1 for x in data]
def aggregate_data(self, data):
total = 0
for x in data:
total += x
return total / len(data) if data else 0
def aggregate_data_fast(self, data):
return sum(data) / len(data) if data else 0
pipeline = DataPipeline(20000)
# 分析未优化版本
profiler = cProfile.Profile()
profiler.enable()
pipeline.pipeline_v1()
profiler.disable()
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumtime')
ps.print_stats(15)
print("=== 未优化版本 ===")
print(s.getvalue())
# 分析优化版本
profiler2 = cProfile.Profile()
profiler2.enable()
pipeline.pipeline_v2()
profiler2.disable()
s2 = io.StringIO()
ps2 = pstats.Stats(profiler2, stream=s2).sort_stats('cumtime')
ps2.print_stats(15)
print("\n=== 优化版本 ===")
print(s2.getvalue())
# 案例3: 数据库N+1查询问题分析(模拟)
import cProfile
import pstats
import io
class DatabaseSimulator:
"""模拟数据库操作"""
def __init__(self):
self.cache = {}
def slow_query(self, table, record_id):
"""模拟无索引的全表扫描查询"""
result = [i for i in range(10000) if i == record_id]
return result
def n_plus_one_query(self, count=50):
"""N+1查询问题:1次主查询 + N次关联查询"""
# 1次主查询获取所有主记录
main_records = [{'id': i} for i in range(count)]
# N次关联查询:对每条记录做一次额外查询
for record in main_records:
# 每次查询都需要独立的扫描操作
sub_data = [
self.slow_query('sub_table', record['id'])
for _ in range(10)
]
return main_records
def batched_query(self, count=50):
"""优化版本:批量查询替代逐条查询"""
main_records = [{'id': i} for i in range(count)]
# 一次批量查询获取所有关联数据
batch_data = {
i: [self.slow_query('sub_table', i)]
for i in range(count)
}
return main_records
db = DatabaseSimulator()
# 分析N+1问题版本
profiler = cProfile.Profile()
profiler.enable()
db.n_plus_one_query(30)
profiler.disable()
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumtime')
ps.print_stats(15)
print("=== N+1查询问题分析 ===")
print(s.getvalue())
# 观察:slow_query被调用了多少次?
# 优化思路:使用IN查询一次批量加载所有关联记录
# 或者使用懒加载+缓存减少重复查询