并发编程常见陷阱与最佳实践

Python并发编程专题 · 避开并发编程的深坑,编写健壮的并行代码

专题:Python并发编程系统学习

关键词:Python, 并发编程, 并发陷阱, 竞态条件, 最佳实践, 线程安全, GIL误解

一、陷阱1:认为Python线程可以实现CPU并行

这是Python开发者进入并发领域时最容易犯的错误。许多人听说Python支持多线程编程,就自然地认为创建多个线程就能利用多核CPU同时执行计算密集型任务获得加速。但事实恰恰相反——由于全局解释器锁(GIL)的存在,Python线程在CPU密集型任务上不仅不能并行,甚至可能比串行更慢。

GIL的本质

CPython解释器的内存管理不是线程安全的,因此设计了一把大锁——GIL,确保同一时刻只有一个线程在执行Python字节码。这意味着无论你有多少CPU核心,Python线程永远无法真正并行执行CPU计算。GIL在每个线程执行一定数量的字节码指令后会主动释放(每100个ticks),但即便如此,多个线程仍然是在轮流占用一个核心,而不是分散到多个核心上。

import time import threading def cpu_heavy(n): """CPU密集型任务:计算斐波那契数列""" if n <= 1: return n return cpu_heavy(n - 1) + cpu_heavy(n - 2) # 串行执行 start = time.time() for _ in range(4): cpu_heavy(35) print(f"串行执行耗时: {time.time() - start:.2f}s") # 多线程执行 start = time.time() threads = [] for _ in range(4): t = threading.Thread(target=cpu_heavy, args=(35,)) threads.append(t) t.start() for t in threads: t.join() print(f"多线程执行耗时: {time.time() - start:.2f}s")

运行这段代码你会发现,多线程版本不仅没有加速,反而比串行更慢(线程切换带来了额外开销)。这就是GIL对CPU密集型任务造成的典型影响。

正确的做法:使用多进程

对于CPU密集型任务,应该使用multiprocessing模块或concurrent.futures.ProcessPoolExecutor。每个进程拥有独立的Python解释器和GIL,可以真正并行运行在多核上。

from concurrent.futures import ProcessPoolExecutor import time def cpu_heavy(n): if n <= 1: return n return cpu_heavy(n - 1) + cpu_heavy(n - 2) start = time.time() with ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(cpu_heavy, 35) for _ in range(4)] results = [f.result() for f in futures] print(f"多进程执行耗时: {time.time() - start:.2f}s")

多进程版本能够利用所有CPU核心,理论上可以获得接近核心数的线性加速比。但需要注意:进程间通信(IPC)的开销远大于线程间通信,不适合需要频繁共享数据的场景。

什么时候该用多线程

多线程在Python中并非一无是处。对于I/O密集型任务(网络请求、文件读写、数据库查询),线程在等待I/O时会释放GIL,因此多线程可以显著提升吞吐量。总结来说:

核心教训:不要用多线程加速CPU密集型计算。对于I/O密集型任务使用多线程或asyncio,对于CPU密集型任务使用多进程。理解GIL不是bug而是CPython的设计决策(为了保持C扩展的兼容性)。

二、陷阱2:忽略共享数据的同步

即使知道了GIL的存在,许多开发者仍然认为Python中的简单操作是线程安全的——毕竟GIL保证了一时刻只有一个线程执行字节码。但问题在于,Python中的一行代码(如count += 1)实际上对应了多条字节码指令,线程可能在执行中途被切换,导致数据竞争。

问题重现

import threading counter = 0 def increment(): global counter for _ in range(100000): # 问题所在:counter += 1 不是原子操作 # 它实际上对应:LOAD counter → ADD 1 → STORE counter # 线程可能在这三条指令之间被切换 counter += 1 threads = [threading.Thread(target=increment) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(f"期望结果: 1000000, 实际结果: {counter}") # 每次运行结果都可能不同,很可能远小于1000000

即使有GIL的存在,counter += 1也不是原子操作。两个线程可能同时读到相同的counter值,分别加1后写回,导致一次递增被覆盖。GIL只保证字节码级别的原子性,不保证Python语句级别的原子性。

正确的做法:使用锁保护共享数据

import threading counter = 0 lock = threading.Lock() def increment(): global counter for _ in range(100000): with lock: # 确保同一时刻只有一个线程修改counter counter += 1 threads = [threading.Thread(target=increment) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(f"加锁后的结果: {counter}") # 现在结果正确:1000000

更好的方案:使用队列和避免共享状态

锁虽然可以解决问题,但引入了死锁、性能下降等风险。更好的并发设计原则是"不要共享状态"。使用queue.Queue在线程间传递数据,而不是直接操作共享变量。

import threading import queue def worker(task_queue, result_queue): """工作线程从任务队列取任务,结果放入结果队列""" while True: item = task_queue.get() if item is None: # 哨兵值,表示结束 task_queue.task_done() break # 处理任务 result_queue.put(item * 2) task_queue.task_done() task_q = queue.Queue() result_q = queue.Queue() # 启动工作线程 threads = [threading.Thread(target=worker, args=(task_q, result_q)) for _ in range(4)] for t in threads: t.start() # 放入任务 for i in range(100): task_q.put(i) # 等待所有任务完成 task_q.join() # 结束工作线程 for _ in threads: task_q.put(None) for t in threads: t.join() # 收集结果 while not result_q.empty(): print(result_q.get())

核心教训:GIL不能替代显式同步。修改共享变量必须使用Lock保护。最佳实践是使用Queue等线程安全的数据结构传递数据而非共享状态。如果必须共享状态,使用threading模块提供的同步原语(Lock、RLock、Semaphore、Event等)。

三、陷阱3:锁的顺序不一致导致死锁

当多个线程需要同时持有多个锁时,如果每个线程获取锁的顺序不一致,就可能发生死锁——所有线程都在等待对方释放锁,程序永久停滞。这是并发编程中最经典也最难以调试的问题之一。

死锁的经典场景

import threading import time lock_a = threading.Lock() lock_b = threading.Lock() def thread_1(): with lock_a: print("线程1获取了锁A") time.sleep(0.1) # 模拟一些工作 with lock_b: print("线程1获取了锁B") def thread_2(): with lock_b: print("线程2获取了锁B") time.sleep(0.1) # 模拟一些工作 with lock_a: print("线程2获取了锁A") t1 = threading.Thread(target=thread_1) t2 = threading.Thread(target=thread_2) t1.start() t2.start() # 程序很可能永远卡在这里,两个线程互相等待

线程1持有锁A等待锁B,线程2持有锁B等待锁A,形成经典的"循环等待"死锁条件。程序将永久挂起,不会抛出异常,很难排查。

解决方案一:固定锁获取顺序

最简单的解决方案是确保所有线程以相同的顺序获取锁。如果所有线程都先获取锁A再获取锁B,则永远不会出现循环等待。

def thread_1(): with lock_a: print("线程1获取了锁A") time.sleep(0.1) with lock_b: print("线程1获取了锁B") def thread_2(): # 同样先获取锁A,再获取锁B with lock_a: # 与thread_1保持一致的顺序 print("线程2获取了锁A") time.sleep(0.1) with lock_b: print("线程2获取了锁B")

解决方案二:分级锁策略

为每个锁分配一个全局唯一的编号,要求所有线程按照编号递增的顺序获取锁。这是一种系统化的方法,适用于大量锁的场景。

import threading from contextlib import contextmanager class LockManager: """分级锁管理器,确保锁按全局顺序获取""" def __init__(self): self._locks = {} self._counter = 0 self._lock = threading.Lock() def register(self, name): """注册一个新锁,分配全局唯一编号""" with self._lock: self._counter += 1 lock_id = self._counter self._locks[name] = (lock_id, threading.Lock()) return name @contextmanager def acquire(self, *names): """按照编号顺序获取多个锁""" sorted_names = sorted(names, key=lambda n: self._locks[n][0]) locks = [self._locks[n][1] for n in sorted_names] # 依次获取(避免嵌套with的缩进地狱) for lock in locks: lock.acquire() try: yield finally: # 逆序释放 for lock in reversed(locks): lock.release() lm = LockManager() a = lm.register("A") b = lm.register("B") def safe_thread_1(): with lm.acquire(a, b): print("线程1安全地持有A和B") def safe_thread_2(): with lm.acquire(a, b): print("线程2安全地持有A和B")

解决方案三:使用超时和重试

使用lock.acquire(timeout=...)方法,在无法获取锁时主动放弃已持有的锁,避免永久等待。

def thread_with_timeout(): if not lock_a.acquire(timeout=1): print("获取锁A超时,放弃操作") return try: time.sleep(0.1) if not lock_b.acquire(timeout=1): print("获取锁B超时,释放锁A后重试") lock_a.release() return thread_with_timeout() # 重试 try: print("成功获取两把锁") finally: lock_b.release() finally: lock_a.release()

核心教训:避免死锁的三条黄金法则:(1)固定锁获取顺序——所有线程以相同顺序获取锁;(2)使用超时机制——不无限等待;(3)最小化锁的持有时间——只持有锁做必要操作,尽快释放。条件允许时,尽量使用Queue等无需显式锁定的抽象。

四、陷阱4:asyncio中的阻塞调用

asyncio的核心理念是单线程事件循环驱动的协作式并发。其高效的前提是所有任务都主动让出控制权(通过await挂起)。如果在协程中调用了同步阻塞的I/O操作(如time.sleep()requests.get()open().read()),整个事件循环会被阻塞,所有其他协程都无法运行。

错误的做法:在协程中调用同步阻塞函数

import asyncio import time import requests async def fetch_data(url): # 严重错误:requests.get() 是同步阻塞的 # 当它执行时,整个事件循环被阻塞 response = requests.get(url) return response.text async def main(): # 创建多个任务,但它们会顺序执行! # 因为每个协程中的requests.get都会阻塞事件循环 tasks = [ fetch_data("https://example.com/api/1"), fetch_data("https://example.com/api/2"), fetch_data("https://example.com/api/3"), ] results = await asyncio.gather(*tasks) return results asyncio.run(main()) # 结果是:三个请求依次执行,完全没有并发效果

上述代码中,requests.get()是一个同步阻塞调用。当第一个协程调用它时,事件循环被阻塞,其他协程无法运行。直到第一个请求完成,控制权才返回给事件循环。因此asyncio.gather()完全失去了并发优势,三个请求是顺序执行的。

解决方法一:使用异步库

首选方案是使用原生支持asyncio的HTTP库,如aiohttphttpx(支持async模式)。这些库在等待网络I/O时会主动await,让事件循环运行其他任务。

import asyncio import aiohttp async def fetch_data(session, url): # aiohttp 的请求是异步非阻塞的 async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: tasks = [ fetch_data(session, "https://example.com/api/1"), fetch_data(session, "https://example.com/api/2"), fetch_data(session, "https://example.com/api/3"), ] results = await asyncio.gather(*tasks) return results asyncio.run(main()) # 现在三个请求真正并发执行,总耗时接近最慢的那个请求

解决方法二:使用run_in_executor

如果无法替换同步库,可以使用loop.run_in_executor()将阻塞操作交给线程池处理,避免阻塞事件循环。

import asyncio import requests async def fetch_data(url): loop = asyncio.get_event_loop() # 将同步的requests.get交给线程池执行 response = await loop.run_in_executor( None, # None使用默认线程池 requests.get, url ) return response.text async def main(): tasks = [ fetch_data("https://example.com/api/1"), fetch_data("https://example.com/api/2"), fetch_data("https://example.com/api/3"), ] results = await asyncio.gather(*tasks) return results

需要注意的其他阻塞操作

核心教训:在asyncio代码中,任何阻塞调用都会破坏并发性。一律使用异步替代库;没有异步替代的同步操作使用run_in_executor托管到线程池或进程池。永远不要在协程中直接调用time.sleep()、同步I/O库或CPU密集型计算。

五、陷阱5:忘记处理CancelledError

在asyncio中,任务取消是通过向协程内注入CancelledError异常实现的。许多开发者在编写协程时没有正确处理这个异常,导致资源泄漏、状态不一致等问题。更严重的是,某些代码错误地捕获并屏蔽了CancelledError,导致任务无法被取消。

问题场景:资源清理失败

import asyncio async def resource_intensive_task(): # 获取资源(如打开连接) conn = await acquire_connection() try: # 执行耗时操作 result = await conn.query("SELECT ...") return result except: # 错误的做法:捕获所有异常但不重新抛出 # 这会吞噬CancelledError,导致任务无法取消 print(f"发生错误: {e}") finally: # 如果任务被取消,finally块仍然会执行 # 但conn.close()本身也可能是异步的 await conn.close() # 问题:如果close也抛出异常怎么办?

正确的做法:catch特定的异常

import asyncio async def resource_intensive_task(): conn = None try: conn = await acquire_connection() result = await conn.query("SELECT ...") return result except asyncio.CancelledError: # 任务被取消时的特殊处理 # 可以执行额外的清理逻辑 logger.info("任务被取消,正在清理资源") raise # 重要:必须重新抛出CancelledError except Exception as e: # 只捕获非取消的异常 logger.error(f"执行出错: {e}") raise finally: if conn is not None: try: await conn.close() except Exception: pass # 关闭连接时的异常不应影响主流程

使用cancel_shielded_scope保护关键操作

有时我们需要确保某些关键操作(如清理、提交事务)不被取消。可以使用asyncio.shield()保护这些操作。

import asyncio async def safe_task(): conn = await acquire_connection() try: result = await conn.query("SELECT ...") return result finally: # 确保连接关闭不被取消中断 try: await asyncio.shield(conn.close()) except asyncio.CancelledError: # shield只能保护第一层取消 # 如果close本身耗时过长,内层仍可能被取消 # 这里需要处理第一层取消后,确保close最终被执行 await conn.close() # 再次尝试

Python 3.9+ 的TaskGroup中的取消处理

Python 3.11引入的TaskGroupTimeout使得取消语义更加严格。在TaskGroup中,如果任何一个子任务抛出异常,所有其他子任务都会自动被取消。

import asyncio async def worker(name, sleep_time): try: await asyncio.sleep(sleep_time) return f"{name}完成" except asyncio.CancelledError: print(f"{name}被取消了") raise # 必须重新抛出,否则TaskGroup无法感知 async def main(): try: async with asyncio.TaskGroup() as tg: t1 = tg.create_task(worker("A", 1)) t2 = tg.create_task(worker("B", 2)) t3 = tg.create_task(worker("C", 3)) # 如果t1失败,t2和t3会被自动取消 except ExceptionGroup: print("一组任务发生了错误") asyncio.run(main())

核心教训:(1)永远不要用except:except Exception:捕获所有异常——这会屏蔽CancelledError;(2)捕获CancelledError后必须重新抛出,除非你有非常充分的理由;(3)用asyncio.shield()保护关键的清理操作;(4)在finally块中做资源清理,确保无论如何都能释放资源。

六、陷阱6:协程未await导致资源泄漏

这是一个非常隐蔽的错误。当你调用一个异步函数但不await它时,你得到的不是一个执行结果,而是一个协程对象。这个协程对象如果未被await或包装为Task,在垃圾回收时会触发RuntimeWarning。更严重的是,协程内部持有的资源永远不会被释放。

问题重现

import asyncio async def open_connection(): # 模拟打开一个网络连接 print("连接已打开") # 这里的代码永远不会执行,因为协程没有被await await asyncio.sleep(1) print("连接已关闭") async def main(): # 错误:忘记await open_connection() # 协程对象在函数返回时被垃圾回收 # 你会看到:RuntimeWarning: coroutine 'open_connection' was never awaited # 并且"连接已打开"实际上仍然会打印(协程开始执行了), # 但后续的代码不会执行,连接资源泄漏 asyncio.run(main())

运行上述代码,Python会发出警告:

RuntimeWarning: coroutine 'open_connection' was never awaited

更隐蔽的变体:创建Task但不保存引用

import asyncio async def background_task(): while True: print("后台任务运行中...") await asyncio.sleep(1) async def main(): # 错误:创建的Task没有保存引用 # Task被创建后立即被垃圾回收 asyncio.create_task(background_task()) # 等待一会儿 await asyncio.sleep(3) print("main完成") # 注意:background_task可能不会执行完 # 因为它的Task对象被GC了 asyncio.run(main())

正确的做法:始终保存Task引用

import asyncio async def background_task(): try: while True: print("后台任务运行中...") await asyncio.sleep(1) except asyncio.CancelledError: print("后台任务被取消") raise async def main(): # 正确:保存Task引用 task = asyncio.create_task(background_task()) await asyncio.sleep(3) print("main完成") # 显式取消后台任务 task.cancel() try: await task except asyncio.CancelledError: pass # 取消是预期的行为 asyncio.run(main())

使用TaskGroup自动管理任务生命周期

import asyncio async def background_task(name): try: while True: print(f"{name}运行中...") await asyncio.sleep(1) except asyncio.CancelledError: print(f"{name}被取消") raise async def main(): async with asyncio.TaskGroup() as tg: # TaskGroup自动管理任务的引用 # 当退出async with块时,所有任务自动取消 tg.create_task(background_task("A")) tg.create_task(background_task("B")) await asyncio.sleep(3) asyncio.run(main()) # 退出TaskGroup时,A和B自动被取消并等待完成

核心教训:(1)永远不要忘记await协程调用——在调用异步函数的地方检查是否有await;(2)asyncio.create_task()返回的Task对象必须保存引用,否则会被垃圾回收;(3)使用asyncio.TaskGroup自动管理任务生命周期;(4)在开发环境中启用-W error::RuntimeWarning将未await的警告转为错误,防止遗漏。

七、陷阱7:进程池中的不可pickle数据

multiprocessingconcurrent.futures.ProcessPoolExecutor的底层依赖于pickle序列化来跨进程传输数据。这意味着所有传递给工作函数和从工作函数返回的数据都必须是可pickle的。许多Python对象(如lambda函数、闭包、实例方法、生成器、某些特殊对象)默认不可pickle,尝试使用它们会导致神秘的错误。

错误示例:lambda函数和闭包

from concurrent.futures import ProcessPoolExecutor # 错误:lambda函数不可pickle with ProcessPoolExecutor() as executor: future = executor.submit(lambda x: x * 2, 42) # 抛出:AttributeError: Can't pickle local object '<lambda>' # 错误:闭包也不可pickle def make_multiplier(factor): return lambda x: x * factor with ProcessPoolExecutor() as executor: fn = make_multiplier(2) future = executor.submit(fn, 42) # 同样抛出AttributeError

错误示例:实例方法

from concurrent.futures import ProcessPoolExecutor class Worker: def __init__(self, value): self.value = value def compute(self, x): return x * self.value w = Worker(10) with ProcessPoolExecutor() as executor: # 错误:绑定方法(实例方法)不可pickle future = executor.submit(w.compute, 42) # 抛出:AttributeError: Can't pickle local object 'Worker.compute'

正确的做法:使用模块级别的函数

from concurrent.futures import ProcessPoolExecutor # 模块级别的函数——可以pickle def multiply(x, factor): return x * factor with ProcessPoolExecutor() as executor: future = executor.submit(multiply, 42, 10) print(future.result()) # 420

使用静态方法

from concurrent.futures import ProcessPoolExecutor class Worker: def __init__(self, value): self.value = value @staticmethod def compute(x, factor): """静态方法——可以pickle""" return x * factor def compute_with_state(self, x): """如果确实需要实例状态,将状态作为参数传递""" return Worker.compute(x, self.value) with ProcessPoolExecutor() as executor: # 使用静态方法,显式传递所有参数 future = executor.submit(Worker.compute, 42, 10) print(future.result()) # 420

使用concurrent.futures的替代方案

有些现代多进程库解决了pickle的问题:

import cloudpickle from concurrent.futures import ProcessPoolExecutor # 注册cloudpickle作为序列化器 # 或者更简单:直接用loky的序列化 # 使用cloudpickle包装闭包 def make_multiplier(factor): def multiply(x): return x * factor return cloudpickle.dumps(multiply) # 在实际项目中,推荐使用loky或者分布式框架(Ray、Dask) # 它们已经内置了对更广泛序列化的支持

哪些对象不可pickle?

不可pickle的对象原因解决方案
lambda函数没有全局名称,无法重建使用模块级函数的def
闭包(包含自由变量的函数)捕获的环境不可序列化显式传递所有参数
实例方法(绑定方法)包含对实例的引用(不可序列化)使用静态方法或模块级函数
生成器内部状态复杂提前收集结果为列表
某些自定义类的实例缺少__reduce__/__getstate__实现序列化协议方法
数据库连接、文件句柄操作系统资源不可序列化传递连接参数,在每个进程中重新创建

核心教训:(1)传递给ProcessPoolExecutor的函数必须模块级别可访问——使用def而不是lambda;(2)所有参数和返回值必须是可pickle的;(3)如果要传递复杂对象,先考虑能否简化设计,不行则使用cloudpickledill扩展pickle;(4)使用ThreadPoolExecutor时不存在此问题(共享内存无需序列化)。

八、最佳实践总结

以上我们讨论了Python并发编程中最常见的七个陷阱。下面将这些教训提炼为系统化的最佳实践,帮助你在项目中做出正确的并发设计决策。

1. 选择合适的并发模型

正确选择并发模型是避免陷阱的第一步。以下是决策指南:

场景推荐方案不推荐方案
I/O密集型(网络请求、文件读写)asyncio > 多线程 > 多进程单线程串行
CPU密集型(计算、数据处理)多进程 > asyncio+run_in_executor多线程(受GIL限制)
大量短连接并发asyncio(事件循环开销最小)多线程(线程创建开销大)
有状态的复杂工作流多线程(共享状态较方便)多进程(IPC开销大)
需要真正并行计算多进程 > 使用C扩展(NumPy等)线程(GIL限制)

2. 最小化共享状态

并发编程中大多数问题的根源是共享的可变状态。遵循以下原则:

3. 使用concurrent.futures简化并发

concurrent.futures模块提供了统一的并发接口,可以在线程池和进程池之间无缝切换:

from concurrent.futures import ( ThreadPoolExecutor, ProcessPoolExecutor, as_completed, wait, FIRST_COMPLETED ) def process_item(item): # 处理单个项目 return item * 2 # 只需修改这一行,就能在线程和进程间切换 Executor = ThreadPoolExecutor # 或 ProcessPoolExecutor with Executor(max_workers=4) as executor: # 提交所有任务 futures = {executor.submit(process_item, i): i for i in range(100)} # 按完成顺序处理结果 for future in as_completed(futures): try: result = future.result(timeout=5) print(f"处理完成: {result}") except TimeoutError: print(f"任务超时: {futures[future]}") except Exception as e: print(f"任务失败: {e}")

4. 做好异常处理和资源清理

并发代码中的异常处理比串行代码更加重要,因为异常发生在独立的执行单元中,可能被吞没或传播到意想不到的地方:

5. 编写确定性测试

并发bug是最难重现和调试的。以下测试策略可以显著提高并发代码的质量:

import threading import time import unittest class TestConcurrentCounter(unittest.TestCase): def test_counter_without_lock_should_fail(self): """无锁的计数器在多线程下应该失败(演示)""" counter = 0 def add(): nonlocal counter for _ in range(10000): counter += 1 threads = [threading.Thread(target=add) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() # 有很大概率不是100000 self.assertNotEqual(counter, 100000, "无同步时计数器可能偶然正确,但这不是可靠的行为") def test_counter_with_lock_should_succeed(self): """加锁后的计数器应该总是正确的""" counter = 0 lock = threading.Lock() def add(): nonlocal counter for _ in range(10000): with lock: counter += 1 threads = [threading.Thread(target=add) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() self.assertEqual(counter, 100000) def test_asyncio_cancellation_cleanup(self): """验证取消时资源被正确清理""" import asyncio cleanup_called = False async def task(): nonlocal cleanup_called try: await asyncio.sleep(10) finally: cleanup_called = True async def run(): t = asyncio.create_task(task()) await asyncio.sleep(0.1) t.cancel() try: await t except asyncio.CancelledError: pass self.assertTrue(cleanup_called) asyncio.run(run())

6. 并发级别控制

不要无限制地创建线程或进程。过高的并发度会导致上下文切换开销激增,甚至耗尽系统资源。使用信号量控制并发级别是一个好习惯:

import asyncio import aiohttp class ConcurrencyLimiter: """控制并发请求数量的信号量包装器""" def __init__(self, limit=10): self.semaphore = asyncio.Semaphore(limit) async def fetch(self, session, url): async with self.semaphore: # 超过限制时自动排队 async with session.get(url) as response: return await response.text() async def main(): limiter = ConcurrencyLimiter(limit=20) # 最多20个并发 async with aiohttp.ClientSession() as session: tasks = [ limiter.fetch(session, f"https://api.example.com/item/{i}") for i in range(1000) ] results = await asyncio.gather(*tasks) return results

7. 监控和调试

并发问题难以调试,善用以下工具:

"并发编程很难。不是因为它复杂,而是因为非确定性的交错执行让bug难以重现。最好的防御不是更好的调试工具,而是更好的设计——最小化共享状态,使用高层次的抽象,让并发模型的选择与问题的本质匹配。"

总结对照表:陷阱与对策

陷阱症状对策
认为GIL允许CPU并行多线程CPU任务反而更慢使用多进程(ProcessPoolExecutor)
忽略共享数据同步数据结果不一致、难以重现的bug使用Lock或Queue,避免共享可变状态
锁顺序不一致程序永久卡死固定锁获取顺序、加锁超时
asyncio中阻塞调用协程无并发效果使用异步库或run_in_executor
未处理CancelledError任务无法取消、资源泄漏捕获后重新抛出,finally块做清理
协程未awaitRuntimeWarning、资源泄漏检查所有协程调用是否await,保存Task引用
不可pickle数据序列化错误使用模块级函数,避免lambda/闭包