专题:Python并发编程系统学习
关键词:Python, 并发编程, asyncio, Lock, Semaphore, Condition, 同步原语, 协程同步
许多开发者误以为协程在同一线程中运行、不存在真正的并行执行,因此不需要同步机制。这个观点是错误的。协程虽然是单线程执行,但在 await 切换点处,事件循环可能会暂停当前协程并切换到另一个协程运行。如果多个协程共享同一份可变数据(如全局变量、对象属性、文件句柄),在 await 切换的间隙中就可能发生竞态条件(race condition)。
考虑一个简单的例子:两个协程同时对一个共享计数器执行 counter += 1。这个操作在 Python 字节码层面不是原子的——它包含读取、计算和写入三步。如果一个协程在读取之后、写入之前被切换出去,另一个协程也读取到同样的旧值,最终两个协程各自加一的结果仅仅是递增了一次,导致数据丢失。
asyncio 提供了一套完整的同步原语(synchronization primitives),它们与 threading 模块中的同步原语名称相同、语义相似,但行为有本质区别:asyncio 原语在等待时不会阻塞线程,而是让出控制权回到事件循环。这使得我们可以在不引入线程阻塞的前提下保护共享资源。
核心认知:单线程下仍然需要同步原语——同步保护的是"协程切换点"上的数据完整性,而非真正的并行执行。
asyncio.Lock 是最基础的同步原语,用于保护临界区,确保同一时刻只有一个协程访问共享资源。它的接口与 threading.Lock 非常相似,但 acquire() 方法是一个协程函数,在锁被持有时会 await 让出控制权而非阻塞线程。
推荐的使用方式是 async with lock 上下文管理器,它会在进入时等待获取锁,退出时自动释放锁。也可以使用底层的 await lock.acquire() 和 lock.release() 手动管理,但后者更容易遗漏 release 调用。
Lock 对象有一个重要的注意事项:asyncio.Lock 不是"可重入"的(reentrant),默认不支持嵌套获取。如果一个协程在已经持有锁的情况下再次 await lock.acquire(),会产生死锁。如果需要可重入行为,应使用 asyncio.Lock 的替代方案或自行设计标识位。
Lock 还支持 locked() 方法来查询当前是否被持有,这在日志和调试中非常有用。
asyncio.Event 实现了一个简单的信号量通知机制,用于让一个或多个协程等待某个事件的发生。它内部维护了一个布尔标志位(初始为 False),通过 set() 设置为 True,clear() 重置为 False,wait() 在标志位为 False 时阻塞(挂起协程,而非阻塞线程)。
与 threading.Event 的关键区别在于:await event.wait() 不会阻塞线程——挂起的协程会回到事件循环调度其他任务,当事件被 set 时,等待的协程被唤醒继续执行。
Event 的典型应用场景包括:服务启动就绪通知、工作协程优雅关闭信号、多阶段任务之间的阶段切换协调。需要注意的是,Event 是一次性的——如果需要反复通知,需要在每次 wait() 返回后调用 clear(),但在多协程环境中由于竞态条件,这种反复使用需要格外小心。
asyncio.Semaphore 管理着一个内部计数器,控制同时访问某资源的协程数量。当计数器大于零时 acquire() 立即成功并将计数器减一;计数器为零时 acquire() 阻塞等待有协程 release() 释放。
信号量最常见的用途是实现限流器(rate limiter),控制并发网络请求的数量,防止对远程服务造成过大压力。例如,采集网页数据时需要限制同时打开的连接数,Semaphore 是理想的工具。
asyncio.Semaphore 有两个变体值得注意:
asyncio.Semaphore(value):标准信号量,value 为最大并发数,默认为 1(此时等价于 Lock)。asyncio.BoundedSemaphore(value):有界信号量,确保 release() 的次数不会超过 acquire() 的次数,防止计数器越界。在大多数应用中推荐使用 BoundedSemaphore,它能在编程错误时提前暴露问题。使用信号量时要注意:应尽量让临界区代码简短快速,避免在持有信号量时执行长时间的阻塞操作(如同步 I/O),因为这样会降低整体并发效率。
asyncio.Condition 是更高级的同步原语,实现了"等待/通知"(wait/notify)模式。它内部封装了一个 Lock,并提供 wait()、notify()、notify_all() 方法。在调用 wait() 时,协程会释放锁并挂起等待;被 notify() 唤醒后重新获取锁再继续执行。
Condition 是实现异步生产者-消费者模式的标准工具。生产者生成数据后调用 notify() 唤醒等待的消费者;消费者在队列为空时调用 wait() 挂起自己。
使用 Condition 时需要牢记的关键点:
wait() 必须在持有锁的情况下调用(即在 async with cond: 块内)。wait() 返回后不能假定条件已满足——建议在循环中检查条件,因为可能存在虚假唤醒(spurious wakeup)或通知被多个竞争协程截获。notify() 只唤醒一个等待协程,notify_all() 唤醒所有等待协程。在多个同类消费者场景中 notify() 更高效;在广播状态变更时使用 notify_all()。在 Python 3.9+ 中,asyncio.Condition 的性能有显著优化。在大多数实际场景中,如果只是简单的生产者-消费者模式,也可以考虑使用 asyncio.Queue 替代 Condition,因为 Queue 内部已经封装了完整的同步逻辑。
asyncio.Barrier 是 Python 3.11 新增的同步原语,实现了一个同步屏障(sync barrier)机制。它创建一个屏障,指定需要等待的协程数量(parties)。当足够数量的协程都到达屏障点时,它们才会被同时释放继续执行。
Barrier 的主要特性包括:
wait(timeout) 支持超时参数,如果超时仍未集齐所有协程,屏障会进入 broken 状态,抛出 BrokenBarrierError。action 参数指定一个回调,当所有协程到达屏障时由最后一个到达的协程执行此回调。barrier.broken 检查屏障是否已损坏,barrier.n_waiting 查询当前等待的协程数,barrier.parties 获取总参与数。Barrier 非常适合分布式计算中的同步阶段、机器学习训练中的梯度同步、以及多阶段任务编排等场景。
虽然 asyncio 同步原语与 threading 同步原语的名字和基本语义高度一致,但它们有本质区别。以下是关键对比:
| 对比维度 | threading 同步原语 | asyncio 同步原语 |
|---|---|---|
| 等待行为 | 阻塞线程(OS 线程上下文切换,重量级) | await 挂起协程(事件循环内切换,轻量级) |
| 适用并发模型 | 多线程并行 | 单线程协程并发 |
| 锁传递 | 支持锁在不同线程间传递(需谨慎) | 不需要锁占有传递(协程在同一线程) |
| 上下文管理器 | with lock: |
async with lock: |
| GIL 影响 | GIL 保护原子操作,但 I/O 时释放 GIL | 不存在 GIL 问题(单线程) |
| 锁重入 | threading.RLock 支持可重入 | asyncio.Lock 不支持重入(需自行防范死锁) |
| 超时机制 | acquire(timeout) 支持超时 | 可配合 asyncio.wait_for() 实现超时 |
| 适用场景 | CPU 密集型 + I/O 密集型(多核利用) | 高并发 I/O 密集型(大量连接、请求) |
核心区别总结:threading 原语等待时阻塞线程(OS 调度器介入),asyncio 原语等待时让出控制权给事件循环(协程调度器介入)。前者是抢占式多任务,后者是协作式多任务。在 asyncio 中没有"锁传递"的概念,因为所有协程在同一个线程中运行,锁的持有者天然就是当前协程,不存在另一个线程强行获取锁的情况。
另一个重要的实践区别:在 threading 中我们通常避免在持有锁时执行 I/O 操作(因为会阻塞整个线程),但在 asyncio 中,持有锁时 await 进行 I/O 操作是完全安全且常见的——await 会挂起当前协程但不会阻塞事件循环,因此不会导致死锁(前提是临界区内的 await 操作不会再次尝试获取同一个锁)。
最佳实践:
async with 上下文管理器,避免手动 acquire/release 遗漏导致的死锁。asyncio.Queue,它内置了完整的同步逻辑,比 Condition 更容易使用。loop.call_soon_threadsafe()。常见陷阱:
wait() 返回后条件可能尚未满足,必须用 while 循环重新检查。核心要点回顾:
1. asyncio 同步原语保护的是协程切换点上的数据完整性,而非并行执行。
2. Lock 保护临界区,确保互斥访问共享资源。
3. Event 实现一对多的信号通知,一次性使用。
4. Semaphore/BoundedSemaphore 控制并发访问数量,是实现限流器的核心工具。
5. Condition 实现等待/通知模式,适用于异步生产者-消费者。
6. Barrier(3.11+)实现多阶段同步屏障,适用于分阶段并行计算。
7. asyncio 原语本质是协作式多任务的同步工具,与 threading 的抢占式同步有根本区别。
8. 合理选择同步原语可以显著降低代码复杂度,提升可维护性。