← 返回并发编程目录
← 返回学习笔记首页
专题: Python并发编程系统学习
关键词: Python, 并发编程, 并发陷阱, 竞态条件, 最佳实践, 线程安全, GIL误解
一、陷阱1:认为Python线程可以实现CPU并行
这是Python开发者进入并发领域时最容易犯的错误。许多人听说Python支持多线程编程,就自然地认为创建多个线程就能利用多核CPU同时执行计算密集型任务获得加速。但事实恰恰相反——由于全局解释器锁(GIL)的存在,Python线程在CPU密集型任务上不仅不能并行,甚至可能比串行更慢。
GIL的本质
CPython解释器的内存管理不是线程安全的,因此设计了一把大锁——GIL,确保同一时刻只有一个线程在执行Python字节码。这意味着无论你有多少CPU核心,Python线程永远无法真正并行执行CPU计算。GIL在每个线程执行一定数量的字节码指令后会主动释放(每100个ticks),但即便如此,多个线程仍然是在轮流占用一个核心,而不是分散到多个核心上。
import time
import threading
def cpu_heavy(n):
"""CPU密集型任务:计算斐波那契数列"""
if n <= 1:
return n
return cpu_heavy(n - 1) + cpu_heavy(n - 2)
# 串行执行
start = time.time()
for _ in range(4):
cpu_heavy(35)
print(f"串行执行耗时: {time.time() - start:.2f}s")
# 多线程执行
start = time.time()
threads = []
for _ in range(4):
t = threading.Thread(target=cpu_heavy, args=(35,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"多线程执行耗时: {time.time() - start:.2f}s")
运行这段代码你会发现,多线程版本不仅没有加速,反而比串行更慢(线程切换带来了额外开销)。这就是GIL对CPU密集型任务造成的典型影响。
正确的做法:使用多进程
对于CPU密集型任务,应该使用multiprocessing模块或concurrent.futures.ProcessPoolExecutor。每个进程拥有独立的Python解释器和GIL,可以真正并行运行在多核上。
from concurrent.futures import ProcessPoolExecutor
import time
def cpu_heavy(n):
if n <= 1:
return n
return cpu_heavy(n - 1) + cpu_heavy(n - 2)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_heavy, 35) for _ in range(4)]
results = [f.result() for f in futures]
print(f"多进程执行耗时: {time.time() - start:.2f}s")
多进程版本能够利用所有CPU核心,理论上可以获得接近核心数的线性加速比。但需要注意:进程间通信(IPC)的开销远大于线程间通信,不适合需要频繁共享数据的场景。
什么时候该用多线程
多线程在Python中并非一无是处。对于I/O密集型任务(网络请求、文件读写、数据库查询),线程在等待I/O时会释放GIL,因此多线程可以显著提升吞吐量。总结来说:
I/O密集型 :多线程有效,因为I/O等待时GIL被释放
CPU密集型 :使用多进程,绕过GIL限制
混合型 :使用多进程+异步I/O的组合模式
核心教训: 不要用多线程加速CPU密集型计算。对于I/O密集型任务使用多线程或asyncio,对于CPU密集型任务使用多进程。理解GIL不是bug而是CPython的设计决策(为了保持C扩展的兼容性)。
二、陷阱2:忽略共享数据的同步
即使知道了GIL的存在,许多开发者仍然认为Python中的简单操作是线程安全的——毕竟GIL保证了一时刻只有一个线程执行字节码。但问题在于,Python中的一行代码(如count += 1)实际上对应了多条字节码指令,线程可能在执行中途被切换,导致数据竞争。
问题重现
import threading
counter = 0
def increment():
global counter
for _ in range(100000):
# 问题所在:counter += 1 不是原子操作
# 它实际上对应:LOAD counter → ADD 1 → STORE counter
# 线程可能在这三条指令之间被切换
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"期望结果: 1000000, 实际结果: {counter}")
# 每次运行结果都可能不同,很可能远小于1000000
即使有GIL的存在,counter += 1也不是原子操作。两个线程可能同时读到相同的counter值,分别加1后写回,导致一次递增被覆盖。GIL只保证字节码级别的原子性,不保证Python语句级别的原子性。
正确的做法:使用锁保护共享数据
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 确保同一时刻只有一个线程修改counter
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"加锁后的结果: {counter}")
# 现在结果正确:1000000
更好的方案:使用队列和避免共享状态
锁虽然可以解决问题,但引入了死锁、性能下降等风险。更好的并发设计原则是"不要共享状态"。使用queue.Queue在线程间传递数据,而不是直接操作共享变量。
import threading
import queue
def worker(task_queue, result_queue):
"""工作线程从任务队列取任务,结果放入结果队列"""
while True:
item = task_queue.get()
if item is None: # 哨兵值,表示结束
task_queue.task_done()
break
# 处理任务
result_queue.put(item * 2)
task_queue.task_done()
task_q = queue.Queue()
result_q = queue.Queue()
# 启动工作线程
threads = [threading.Thread(target=worker, args=(task_q, result_q)) for _ in range(4)]
for t in threads:
t.start()
# 放入任务
for i in range(100):
task_q.put(i)
# 等待所有任务完成
task_q.join()
# 结束工作线程
for _ in threads:
task_q.put(None)
for t in threads:
t.join()
# 收集结果
while not result_q.empty():
print(result_q.get())
核心教训: GIL不能替代显式同步。修改共享变量必须使用Lock保护。最佳实践是使用Queue等线程安全的数据结构传递数据而非共享状态。如果必须共享状态,使用threading模块提供的同步原语(Lock、RLock、Semaphore、Event等)。
三、陷阱3:锁的顺序不一致导致死锁
当多个线程需要同时持有多个锁时,如果每个线程获取锁的顺序不一致,就可能发生死锁——所有线程都在等待对方释放锁,程序永久停滞。这是并发编程中最经典也最难以调试的问题之一。
死锁的经典场景
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
with lock_a:
print("线程1获取了锁A")
time.sleep(0.1) # 模拟一些工作
with lock_b:
print("线程1获取了锁B")
def thread_2():
with lock_b:
print("线程2获取了锁B")
time.sleep(0.1) # 模拟一些工作
with lock_a:
print("线程2获取了锁A")
t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start()
t2.start()
# 程序很可能永远卡在这里,两个线程互相等待
线程1持有锁A等待锁B,线程2持有锁B等待锁A,形成经典的"循环等待"死锁条件。程序将永久挂起,不会抛出异常,很难排查。
解决方案一:固定锁获取顺序
最简单的解决方案是确保所有线程以相同的顺序获取锁。如果所有线程都先获取锁A再获取锁B,则永远不会出现循环等待。
def thread_1():
with lock_a:
print("线程1获取了锁A")
time.sleep(0.1)
with lock_b:
print("线程1获取了锁B")
def thread_2():
# 同样先获取锁A,再获取锁B
with lock_a: # 与thread_1保持一致的顺序
print("线程2获取了锁A")
time.sleep(0.1)
with lock_b:
print("线程2获取了锁B")
解决方案二:分级锁策略
为每个锁分配一个全局唯一的编号,要求所有线程按照编号递增的顺序获取锁。这是一种系统化的方法,适用于大量锁的场景。
import threading
from contextlib import contextmanager
class LockManager:
"""分级锁管理器,确保锁按全局顺序获取"""
def __init__(self):
self._locks = {}
self._counter = 0
self._lock = threading.Lock()
def register(self, name):
"""注册一个新锁,分配全局唯一编号"""
with self._lock:
self._counter += 1
lock_id = self._counter
self._locks[name] = (lock_id, threading.Lock())
return name
@contextmanager
def acquire(self, *names):
"""按照编号顺序获取多个锁"""
sorted_names = sorted(names,
key=lambda n: self._locks[n][0])
locks = [self._locks[n][1] for n in sorted_names]
# 依次获取(避免嵌套with的缩进地狱)
for lock in locks:
lock.acquire()
try:
yield
finally:
# 逆序释放
for lock in reversed(locks):
lock.release()
lm = LockManager()
a = lm.register("A")
b = lm.register("B")
def safe_thread_1():
with lm.acquire(a, b):
print("线程1安全地持有A和B")
def safe_thread_2():
with lm.acquire(a, b):
print("线程2安全地持有A和B")
解决方案三:使用超时和重试
使用lock.acquire(timeout=...)方法,在无法获取锁时主动放弃已持有的锁,避免永久等待。
def thread_with_timeout():
if not lock_a.acquire(timeout=1):
print("获取锁A超时,放弃操作")
return
try:
time.sleep(0.1)
if not lock_b.acquire(timeout=1):
print("获取锁B超时,释放锁A后重试")
lock_a.release()
return thread_with_timeout() # 重试
try:
print("成功获取两把锁")
finally:
lock_b.release()
finally:
lock_a.release()
核心教训: 避免死锁的三条黄金法则:(1)固定锁获取顺序——所有线程以相同顺序获取锁;(2)使用超时机制——不无限等待;(3)最小化锁的持有时间——只持有锁做必要操作,尽快释放。条件允许时,尽量使用Queue等无需显式锁定的抽象。
四、陷阱4:asyncio中的阻塞调用
asyncio的核心理念是单线程事件循环驱动的协作式并发。其高效的前提是所有任务都主动让出控制权(通过await挂起)。如果在协程中调用了同步阻塞的I/O操作(如time.sleep()、requests.get()、open().read()),整个事件循环会被阻塞,所有其他协程都无法运行。
错误的做法:在协程中调用同步阻塞函数
import asyncio
import time
import requests
async def fetch_data(url):
# 严重错误:requests.get() 是同步阻塞的
# 当它执行时,整个事件循环被阻塞
response = requests.get(url)
return response.text
async def main():
# 创建多个任务,但它们会顺序执行!
# 因为每个协程中的requests.get都会阻塞事件循环
tasks = [
fetch_data("https://example.com/api/1"),
fetch_data("https://example.com/api/2"),
fetch_data("https://example.com/api/3"),
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main())
# 结果是:三个请求依次执行,完全没有并发效果
上述代码中,requests.get()是一个同步阻塞调用。当第一个协程调用它时,事件循环被阻塞,其他协程无法运行。直到第一个请求完成,控制权才返回给事件循环。因此asyncio.gather()完全失去了并发优势,三个请求是顺序执行的。
解决方法一:使用异步库
首选方案是使用原生支持asyncio的HTTP库,如aiohttp、httpx(支持async模式)。这些库在等待网络I/O时会主动await,让事件循环运行其他任务。
import asyncio
import aiohttp
async def fetch_data(session, url):
# aiohttp 的请求是异步非阻塞的
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [
fetch_data(session, "https://example.com/api/1"),
fetch_data(session, "https://example.com/api/2"),
fetch_data(session, "https://example.com/api/3"),
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main())
# 现在三个请求真正并发执行,总耗时接近最慢的那个请求
解决方法二:使用run_in_executor
如果无法替换同步库,可以使用loop.run_in_executor()将阻塞操作交给线程池处理,避免阻塞事件循环。
import asyncio
import requests
async def fetch_data(url):
loop = asyncio.get_event_loop()
# 将同步的requests.get交给线程池执行
response = await loop.run_in_executor(
None, # None使用默认线程池
requests.get,
url
)
return response.text
async def main():
tasks = [
fetch_data("https://example.com/api/1"),
fetch_data("https://example.com/api/2"),
fetch_data("https://example.com/api/3"),
]
results = await asyncio.gather(*tasks)
return results
需要注意的其他阻塞操作
time.sleep() → 使用asyncio.sleep()
subprocess.run() → 使用asyncio.create_subprocess_exec()
同步文件读写 → 使用aiofiles或在run_in_executor中执行
数据库查询(同步驱动) → 使用异步驱动(aiomysql、asyncpg、motor等)
CPU密集型计算 → 在run_in_executor中交给进程池
核心教训: 在asyncio代码中,任何阻塞调用都会破坏并发性。一律使用异步替代库;没有异步替代的同步操作使用run_in_executor托管到线程池或进程池。永远不要在协程中直接调用time.sleep()、同步I/O库或CPU密集型计算。
五、陷阱5:忘记处理CancelledError
在asyncio中,任务取消是通过向协程内注入CancelledError异常实现的。许多开发者在编写协程时没有正确处理这个异常,导致资源泄漏、状态不一致等问题。更严重的是,某些代码错误地捕获并屏蔽了CancelledError,导致任务无法被取消。
问题场景:资源清理失败
import asyncio
async def resource_intensive_task():
# 获取资源(如打开连接)
conn = await acquire_connection()
try:
# 执行耗时操作
result = await conn.query("SELECT ...")
return result
except:
# 错误的做法:捕获所有异常但不重新抛出
# 这会吞噬CancelledError,导致任务无法取消
print(f"发生错误: {e}")
finally:
# 如果任务被取消,finally块仍然会执行
# 但conn.close()本身也可能是异步的
await conn.close()
# 问题:如果close也抛出异常怎么办?
正确的做法:catch特定的异常
import asyncio
async def resource_intensive_task():
conn = None
try:
conn = await acquire_connection()
result = await conn.query("SELECT ...")
return result
except asyncio.CancelledError:
# 任务被取消时的特殊处理
# 可以执行额外的清理逻辑
logger.info("任务被取消,正在清理资源")
raise # 重要:必须重新抛出CancelledError
except Exception as e:
# 只捕获非取消的异常
logger.error(f"执行出错: {e}")
raise
finally:
if conn is not None:
try:
await conn.close()
except Exception:
pass # 关闭连接时的异常不应影响主流程
使用cancel_shielded_scope保护关键操作
有时我们需要确保某些关键操作(如清理、提交事务)不被取消。可以使用asyncio.shield()保护这些操作。
import asyncio
async def safe_task():
conn = await acquire_connection()
try:
result = await conn.query("SELECT ...")
return result
finally:
# 确保连接关闭不被取消中断
try:
await asyncio.shield(conn.close())
except asyncio.CancelledError:
# shield只能保护第一层取消
# 如果close本身耗时过长,内层仍可能被取消
# 这里需要处理第一层取消后,确保close最终被执行
await conn.close() # 再次尝试
Python 3.9+ 的TaskGroup中的取消处理
Python 3.11引入的TaskGroup和Timeout使得取消语义更加严格。在TaskGroup中,如果任何一个子任务抛出异常,所有其他子任务都会自动被取消。
import asyncio
async def worker(name, sleep_time):
try:
await asyncio.sleep(sleep_time)
return f"{name}完成"
except asyncio.CancelledError:
print(f"{name}被取消了")
raise # 必须重新抛出,否则TaskGroup无法感知
async def main():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(worker("A", 1))
t2 = tg.create_task(worker("B", 2))
t3 = tg.create_task(worker("C", 3))
# 如果t1失败,t2和t3会被自动取消
except ExceptionGroup:
print("一组任务发生了错误")
asyncio.run(main())
核心教训: (1)永远不要用except:或except Exception:捕获所有异常——这会屏蔽CancelledError;(2)捕获CancelledError后必须重新抛出,除非你有非常充分的理由;(3)用asyncio.shield()保护关键的清理操作;(4)在finally块中做资源清理,确保无论如何都能释放资源。
六、陷阱6:协程未await导致资源泄漏
这是一个非常隐蔽的错误。当你调用一个异步函数但不await它时,你得到的不是一个执行结果,而是一个协程对象。这个协程对象如果未被await或包装为Task,在垃圾回收时会触发RuntimeWarning。更严重的是,协程内部持有的资源永远不会被释放。
问题重现
import asyncio
async def open_connection():
# 模拟打开一个网络连接
print("连接已打开")
# 这里的代码永远不会执行,因为协程没有被await
await asyncio.sleep(1)
print("连接已关闭")
async def main():
# 错误:忘记await
open_connection()
# 协程对象在函数返回时被垃圾回收
# 你会看到:RuntimeWarning: coroutine 'open_connection' was never awaited
# 并且"连接已打开"实际上仍然会打印(协程开始执行了),
# 但后续的代码不会执行,连接资源泄漏
asyncio.run(main())
运行上述代码,Python会发出警告:
RuntimeWarning: coroutine 'open_connection' was never awaited
更隐蔽的变体:创建Task但不保存引用
import asyncio
async def background_task():
while True:
print("后台任务运行中...")
await asyncio.sleep(1)
async def main():
# 错误:创建的Task没有保存引用
# Task被创建后立即被垃圾回收
asyncio.create_task(background_task())
# 等待一会儿
await asyncio.sleep(3)
print("main完成")
# 注意:background_task可能不会执行完
# 因为它的Task对象被GC了
asyncio.run(main())
正确的做法:始终保存Task引用
import asyncio
async def background_task():
try:
while True:
print("后台任务运行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("后台任务被取消")
raise
async def main():
# 正确:保存Task引用
task = asyncio.create_task(background_task())
await asyncio.sleep(3)
print("main完成")
# 显式取消后台任务
task.cancel()
try:
await task
except asyncio.CancelledError:
pass # 取消是预期的行为
asyncio.run(main())
使用TaskGroup自动管理任务生命周期
import asyncio
async def background_task(name):
try:
while True:
print(f"{name}运行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"{name}被取消")
raise
async def main():
async with asyncio.TaskGroup() as tg:
# TaskGroup自动管理任务的引用
# 当退出async with块时,所有任务自动取消
tg.create_task(background_task("A"))
tg.create_task(background_task("B"))
await asyncio.sleep(3)
asyncio.run(main())
# 退出TaskGroup时,A和B自动被取消并等待完成
核心教训: (1)永远不要忘记await协程调用——在调用异步函数的地方检查是否有await;(2)asyncio.create_task()返回的Task对象必须保存引用,否则会被垃圾回收;(3)使用asyncio.TaskGroup自动管理任务生命周期;(4)在开发环境中启用-W error::RuntimeWarning将未await的警告转为错误,防止遗漏。
七、陷阱7:进程池中的不可pickle数据
multiprocessing和concurrent.futures.ProcessPoolExecutor的底层依赖于pickle序列化来跨进程传输数据。这意味着所有传递给工作函数和从工作函数返回的数据都必须是可pickle的。许多Python对象(如lambda函数、闭包、实例方法、生成器、某些特殊对象)默认不可pickle,尝试使用它们会导致神秘的错误。
错误示例:lambda函数和闭包
from concurrent.futures import ProcessPoolExecutor
# 错误:lambda函数不可pickle
with ProcessPoolExecutor() as executor:
future = executor.submit(lambda x: x * 2, 42)
# 抛出:AttributeError: Can't pickle local object '<lambda>'
# 错误:闭包也不可pickle
def make_multiplier(factor):
return lambda x: x * factor
with ProcessPoolExecutor() as executor:
fn = make_multiplier(2)
future = executor.submit(fn, 42)
# 同样抛出AttributeError
错误示例:实例方法
from concurrent.futures import ProcessPoolExecutor
class Worker:
def __init__(self, value):
self.value = value
def compute(self, x):
return x * self.value
w = Worker(10)
with ProcessPoolExecutor() as executor:
# 错误:绑定方法(实例方法)不可pickle
future = executor.submit(w.compute, 42)
# 抛出:AttributeError: Can't pickle local object 'Worker.compute'
正确的做法:使用模块级别的函数
from concurrent.futures import ProcessPoolExecutor
# 模块级别的函数——可以pickle
def multiply(x, factor):
return x * factor
with ProcessPoolExecutor() as executor:
future = executor.submit(multiply, 42, 10)
print(future.result()) # 420
使用静态方法
from concurrent.futures import ProcessPoolExecutor
class Worker:
def __init__(self, value):
self.value = value
@staticmethod
def compute(x, factor):
"""静态方法——可以pickle"""
return x * factor
def compute_with_state(self, x):
"""如果确实需要实例状态,将状态作为参数传递"""
return Worker.compute(x, self.value)
with ProcessPoolExecutor() as executor:
# 使用静态方法,显式传递所有参数
future = executor.submit(Worker.compute, 42, 10)
print(future.result()) # 420
使用concurrent.futures的替代方案
有些现代多进程库解决了pickle的问题:
joblib :支持更多类型的序列化,包括闭包和lambda
loky :concurrent.futures的替代实现,改进了序列化
cloudpickle :扩展pickle,支持更多Python对象类型
import cloudpickle
from concurrent.futures import ProcessPoolExecutor
# 注册cloudpickle作为序列化器
# 或者更简单:直接用loky的序列化
# 使用cloudpickle包装闭包
def make_multiplier(factor):
def multiply(x):
return x * factor
return cloudpickle.dumps(multiply)
# 在实际项目中,推荐使用loky或者分布式框架(Ray、Dask)
# 它们已经内置了对更广泛序列化的支持
哪些对象不可pickle?
不可pickle的对象 原因 解决方案
lambda函数 没有全局名称,无法重建 使用模块级函数的def
闭包(包含自由变量的函数) 捕获的环境不可序列化 显式传递所有参数
实例方法(绑定方法) 包含对实例的引用(不可序列化) 使用静态方法或模块级函数
生成器 内部状态复杂 提前收集结果为列表
某些自定义类的实例 缺少__reduce__/__getstate__ 实现序列化协议方法
数据库连接、文件句柄 操作系统资源不可序列化 传递连接参数,在每个进程中重新创建
核心教训: (1)传递给ProcessPoolExecutor的函数必须模块级别可访问——使用def而不是lambda;(2)所有参数和返回值必须是可pickle的;(3)如果要传递复杂对象,先考虑能否简化设计,不行则使用cloudpickle或dill扩展pickle;(4)使用ThreadPoolExecutor时不存在此问题(共享内存无需序列化)。
八、最佳实践总结
以上我们讨论了Python并发编程中最常见的七个陷阱。下面将这些教训提炼为系统化的最佳实践,帮助你在项目中做出正确的并发设计决策。
1. 选择合适的并发模型
正确选择并发模型是避免陷阱的第一步。以下是决策指南:
场景 推荐方案 不推荐方案
I/O密集型(网络请求、文件读写) asyncio > 多线程 > 多进程 单线程串行
CPU密集型(计算、数据处理) 多进程 > asyncio+run_in_executor 多线程(受GIL限制)
大量短连接并发 asyncio(事件循环开销最小) 多线程(线程创建开销大)
有状态的复杂工作流 多线程(共享状态较方便) 多进程(IPC开销大)
需要真正并行计算 多进程 > 使用C扩展(NumPy等) 线程(GIL限制)
2. 最小化共享状态
并发编程中大多数问题的根源是共享的可变状态。遵循以下原则:
无共享架构 :每个并发单元持有自己的数据,通过消息传递(Queue、Channel)通信
不可变数据 :尽量使用不可变数据结构,天然线程安全
锁的范围最小化 :持有锁的时间越短越好,只保护关键代码
使用高级抽象 :优先使用Queue、Future、Barrier等高级同步原语,而不是手动管理锁
3. 使用concurrent.futures简化并发
concurrent.futures模块提供了统一的并发接口,可以在线程池和进程池之间无缝切换:
from concurrent.futures import (
ThreadPoolExecutor,
ProcessPoolExecutor,
as_completed,
wait,
FIRST_COMPLETED
)
def process_item(item):
# 处理单个项目
return item * 2
# 只需修改这一行,就能在线程和进程间切换
Executor = ThreadPoolExecutor # 或 ProcessPoolExecutor
with Executor(max_workers=4) as executor:
# 提交所有任务
futures = {executor.submit(process_item, i): i for i in range(100)}
# 按完成顺序处理结果
for future in as_completed(futures):
try:
result = future.result(timeout=5)
print(f"处理完成: {result}")
except TimeoutError:
print(f"任务超时: {futures[future]}")
except Exception as e:
print(f"任务失败: {e}")
4. 做好异常处理和资源清理
并发代码中的异常处理比串行代码更加重要,因为异常发生在独立的执行单元中,可能被吞没或传播到意想不到的地方:
总是调用future.result() :提交到Executor的Future必须调用result(),否则异常不会传播
使用try/finally或上下文管理器 :确保锁、连接、文件等资源被正确释放
处理好CancelledError :在asyncio中不要吞噬CancelledError
设置超时 :所有并发操作都应该设置合理的超时,防止永久挂起
5. 编写确定性测试
并发bug是最难重现和调试的。以下测试策略可以显著提高并发代码的质量:
import threading
import time
import unittest
class TestConcurrentCounter(unittest.TestCase):
def test_counter_without_lock_should_fail(self):
"""无锁的计数器在多线程下应该失败(演示)"""
counter = 0
def add():
nonlocal counter
for _ in range(10000):
counter += 1
threads = [threading.Thread(target=add) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# 有很大概率不是100000
self.assertNotEqual(counter, 100000,
"无同步时计数器可能偶然正确,但这不是可靠的行为")
def test_counter_with_lock_should_succeed(self):
"""加锁后的计数器应该总是正确的"""
counter = 0
lock = threading.Lock()
def add():
nonlocal counter
for _ in range(10000):
with lock:
counter += 1
threads = [threading.Thread(target=add) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(counter, 100000)
def test_asyncio_cancellation_cleanup(self):
"""验证取消时资源被正确清理"""
import asyncio
cleanup_called = False
async def task():
nonlocal cleanup_called
try:
await asyncio.sleep(10)
finally:
cleanup_called = True
async def run():
t = asyncio.create_task(task())
await asyncio.sleep(0.1)
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
self.assertTrue(cleanup_called)
asyncio.run(run())
6. 并发级别控制
不要无限制地创建线程或进程。过高的并发度会导致上下文切换开销激增,甚至耗尽系统资源。使用信号量控制并发级别是一个好习惯:
import asyncio
import aiohttp
class ConcurrencyLimiter:
"""控制并发请求数量的信号量包装器"""
def __init__(self, limit=10):
self.semaphore = asyncio.Semaphore(limit)
async def fetch(self, session, url):
async with self.semaphore: # 超过限制时自动排队
async with session.get(url) as response:
return await response.text()
async def main():
limiter = ConcurrencyLimiter(limit=20) # 最多20个并发
async with aiohttp.ClientSession() as session:
tasks = [
limiter.fetch(session, f"https://api.example.com/item/{i}")
for i in range(1000)
]
results = await asyncio.gather(*tasks)
return results
7. 监控和调试
并发问题难以调试,善用以下工具:
logging模块:添加线程/进程ID到日志,使用logging.handlers.QueueHandler实现线程安全的日志
faulthandler:在信号超时时打印所有线程的堆栈
objgraph:检测GIL相关的锁持有情况
strace/ltrace:追踪系统调用,排查I/O阻塞问题
asyncio.get_event_loop().slow_callback_duration:设置asyncio慢回调阈值,帮助发现阻塞
"并发编程很难。不是因为它复杂,而是因为非确定性的交错执行让bug难以重现。最好的防御不是更好的调试工具,而是更好的设计——最小化共享状态,使用高层次的抽象,让并发模型的选择与问题的本质匹配。"
总结对照表:陷阱与对策
陷阱 症状 对策
认为GIL允许CPU并行 多线程CPU任务反而更慢 使用多进程(ProcessPoolExecutor)
忽略共享数据同步 数据结果不一致、难以重现的bug 使用Lock或Queue,避免共享可变状态
锁顺序不一致 程序永久卡死 固定锁获取顺序、加锁超时
asyncio中阻塞调用 协程无并发效果 使用异步库或run_in_executor
未处理CancelledError 任务无法取消、资源泄漏 捕获后重新抛出,finally块做清理
协程未await RuntimeWarning、资源泄漏 检查所有协程调用是否await,保存Task引用
不可pickle数据 序列化错误 使用模块级函数,避免lambda/闭包