asyncio同步原语:Lock/Event/Semaphore/Condition

Python并发编程专题 · 协程间的同步与互斥

专题:Python并发编程系统学习

关键词:Python, 并发编程, asyncio, Lock, Semaphore, Condition, 同步原语, 协程同步

一、协程间同步的必要性

许多开发者误以为协程在同一线程中运行、不存在真正的并行执行,因此不需要同步机制。这个观点是错误的。协程虽然是单线程执行,但在 await 切换点处,事件循环可能会暂停当前协程并切换到另一个协程运行。如果多个协程共享同一份可变数据(如全局变量、对象属性、文件句柄),在 await 切换的间隙中就可能发生竞态条件(race condition)。

考虑一个简单的例子:两个协程同时对一个共享计数器执行 counter += 1。这个操作在 Python 字节码层面不是原子的——它包含读取、计算和写入三步。如果一个协程在读取之后、写入之前被切换出去,另一个协程也读取到同样的旧值,最终两个协程各自加一的结果仅仅是递增了一次,导致数据丢失。

asyncio 提供了一套完整的同步原语(synchronization primitives),它们与 threading 模块中的同步原语名称相同、语义相似,但行为有本质区别:asyncio 原语在等待时不会阻塞线程,而是让出控制权回到事件循环。这使得我们可以在不引入线程阻塞的前提下保护共享资源。

核心认知:单线程下仍然需要同步原语——同步保护的是"协程切换点"上的数据完整性,而非真正的并行执行。

二、asyncio.Lock

asyncio.Lock 是最基础的同步原语,用于保护临界区,确保同一时刻只有一个协程访问共享资源。它的接口与 threading.Lock 非常相似,但 acquire() 方法是一个协程函数,在锁被持有时会 await 让出控制权而非阻塞线程。

推荐的使用方式是 async with lock 上下文管理器,它会在进入时等待获取锁,退出时自动释放锁。也可以使用底层的 await lock.acquire()lock.release() 手动管理,但后者更容易遗漏 release 调用。

import asyncio lock = asyncio.Lock() async def access_shared(): async with lock: # 临界区:同一时刻只有一个协程能执行这段代码 await asyncio.sleep(0.1) # 操作共享资源... shared_counter += 1

Lock 对象有一个重要的注意事项:asyncio.Lock 不是"可重入"的(reentrant),默认不支持嵌套获取。如果一个协程在已经持有锁的情况下再次 await lock.acquire(),会产生死锁。如果需要可重入行为,应使用 asyncio.Lock 的替代方案或自行设计标识位。

Lock 还支持 locked() 方法来查询当前是否被持有,这在日志和调试中非常有用。

三、asyncio.Event

asyncio.Event 实现了一个简单的信号量通知机制,用于让一个或多个协程等待某个事件的发生。它内部维护了一个布尔标志位(初始为 False),通过 set() 设置为 Trueclear() 重置为 Falsewait() 在标志位为 False 时阻塞(挂起协程,而非阻塞线程)。

threading.Event 的关键区别在于:await event.wait() 不会阻塞线程——挂起的协程会回到事件循环调度其他任务,当事件被 set 时,等待的协程被唤醒继续执行。

async def waiter(event, name): print(f"{name} 等待事件...") await event.wait() print(f"{name} 收到事件,继续执行") async def setter(event): await asyncio.sleep(2) event.set() print("事件已触发")

Event 的典型应用场景包括:服务启动就绪通知、工作协程优雅关闭信号、多阶段任务之间的阶段切换协调。需要注意的是,Event 是一次性的——如果需要反复通知,需要在每次 wait() 返回后调用 clear(),但在多协程环境中由于竞态条件,这种反复使用需要格外小心。

四、asyncio.Semaphore

asyncio.Semaphore 管理着一个内部计数器,控制同时访问某资源的协程数量。当计数器大于零时 acquire() 立即成功并将计数器减一;计数器为零时 acquire() 阻塞等待有协程 release() 释放。

信号量最常见的用途是实现限流器(rate limiter),控制并发网络请求的数量,防止对远程服务造成过大压力。例如,采集网页数据时需要限制同时打开的连接数,Semaphore 是理想的工具。

sem = asyncio.Semaphore(10) async def fetch_url(url): async with sem: return await fetch(url) # 同时启动 100 个任务,但最多 10 个并发 tasks = [fetch_url(url) for url in urls] results = await asyncio.gather(*tasks)

asyncio.Semaphore 有两个变体值得注意:

使用信号量时要注意:应尽量让临界区代码简短快速,避免在持有信号量时执行长时间的阻塞操作(如同步 I/O),因为这样会降低整体并发效率。

五、asyncio.Condition

asyncio.Condition 是更高级的同步原语,实现了"等待/通知"(wait/notify)模式。它内部封装了一个 Lock,并提供 wait()notify()notify_all() 方法。在调用 wait() 时,协程会释放锁并挂起等待;被 notify() 唤醒后重新获取锁再继续执行。

Condition 是实现异步生产者-消费者模式的标准工具。生产者生成数据后调用 notify() 唤醒等待的消费者;消费者在队列为空时调用 wait() 挂起自己。

async def producer(cond, queue): async with cond: await asyncio.sleep(1) await queue.put("data") cond.notify() # 通知一个等待的消费者 async def consumer(cond, queue): async with cond: while queue.empty(): await cond.wait() # 释放锁并等待 data = await queue.get() return data

使用 Condition 时需要牢记的关键点:

在 Python 3.9+ 中,asyncio.Condition 的性能有显著优化。在大多数实际场景中,如果只是简单的生产者-消费者模式,也可以考虑使用 asyncio.Queue 替代 Condition,因为 Queue 内部已经封装了完整的同步逻辑。

六、asyncio.Barrier(Python 3.11+)

asyncio.Barrier 是 Python 3.11 新增的同步原语,实现了一个同步屏障(sync barrier)机制。它创建一个屏障,指定需要等待的协程数量(parties)。当足够数量的协程都到达屏障点时,它们才会被同时释放继续执行。

async def worker(barrier, name): print(f"{name} 开始第一阶段工作") await asyncio.sleep(random.uniform(0.5, 2.0)) print(f"{name} 到达屏障") await barrier.wait() # 等待所有 worker 到达 print(f"{name} 通过屏障,开始第二阶段")

Barrier 的主要特性包括:

Barrier 非常适合分布式计算中的同步阶段、机器学习训练中的梯度同步、以及多阶段任务编排等场景。

七、与 threading 同步原语的关键区别

虽然 asyncio 同步原语与 threading 同步原语的名字和基本语义高度一致,但它们有本质区别。以下是关键对比:

对比维度 threading 同步原语 asyncio 同步原语
等待行为 阻塞线程(OS 线程上下文切换,重量级) await 挂起协程(事件循环内切换,轻量级)
适用并发模型 多线程并行 单线程协程并发
锁传递 支持锁在不同线程间传递(需谨慎) 不需要锁占有传递(协程在同一线程)
上下文管理器 with lock: async with lock:
GIL 影响 GIL 保护原子操作,但 I/O 时释放 GIL 不存在 GIL 问题(单线程)
锁重入 threading.RLock 支持可重入 asyncio.Lock 不支持重入(需自行防范死锁)
超时机制 acquire(timeout) 支持超时 可配合 asyncio.wait_for() 实现超时
适用场景 CPU 密集型 + I/O 密集型(多核利用) 高并发 I/O 密集型(大量连接、请求)

核心区别总结:threading 原语等待时阻塞线程(OS 调度器介入),asyncio 原语等待时让出控制权给事件循环(协程调度器介入)。前者是抢占式多任务,后者是协作式多任务。在 asyncio 中没有"锁传递"的概念,因为所有协程在同一个线程中运行,锁的持有者天然就是当前协程,不存在另一个线程强行获取锁的情况。

另一个重要的实践区别:在 threading 中我们通常避免在持有锁时执行 I/O 操作(因为会阻塞整个线程),但在 asyncio 中,持有锁时 await 进行 I/O 操作是完全安全且常见的——await 会挂起当前协程但不会阻塞事件循环,因此不会导致死锁(前提是临界区内的 await 操作不会再次尝试获取同一个锁)。

八、最佳实践与常见陷阱

最佳实践:

常见陷阱:

九、学习要点总结

核心要点回顾:

1. asyncio 同步原语保护的是协程切换点上的数据完整性,而非并行执行。

2. Lock 保护临界区,确保互斥访问共享资源。

3. Event 实现一对多的信号通知,一次性使用。

4. Semaphore/BoundedSemaphore 控制并发访问数量,是实现限流器的核心工具。

5. Condition 实现等待/通知模式,适用于异步生产者-消费者。

6. Barrier(3.11+)实现多阶段同步屏障,适用于分阶段并行计算。

7. asyncio 原语本质是协作式多任务的同步工具,与 threading 的抢占式同步有根本区别。

8. 合理选择同步原语可以显著降低代码复杂度,提升可维护性。