专题:Python并发编程系统学习
关键词:Python, 并发编程, asyncio.Queue, 异步队列, 协程通信, 生产者消费者, 背压
一、协程安全队列的需求
在基于 asyncio 的异步编程中,经常需要在多个协程之间传递数据。例如,一个协程负责从网络或文件系统读取数据,另一个协程负责对数据进行处理,第三个协程负责将结果写入数据库。这种多阶段流水线处理在异步应用中非常常见。
与多线程编程不同,协程不共享内存地址空间中的可变数据,因此不能简单地依赖线程安全的 queue.Queue 来交换数据。我们需要一种专门为协程设计的队列——它在 put 和 get 操作上都是异步的,能够在队列满或空时自动挂起当前协程,从而避免忙等待和轮询。
asyncio.Queue 就是为此场景设计的协程安全队列。它的核心特点包括:
- 协程安全:多个生产者协程可以安全地并发 put,多个消费者协程可以安全地并发 get,无需额外的锁保护。
- 异步阻塞:当队列为空时,get() 会挂起等待;当队列满时(设定了 maxsize),put() 会挂起等待——不会浪费 CPU 周期。
- 协作通知:通过 task_done() 和 join() 实现生产者-消费者之间的完成通知机制,生产者可以等待所有已放入的元素被消费完毕。
核心思想:asyncio.Queue 将线程安全的 queue.Queue 的阻塞调用替换为 await 挂起,在异步世界中实现了同样的生产者-消费者解耦模式,同时保留了背压(backpressure)能力。
二、asyncio.Queue基础用法
asyncio.Queue 的使用模式非常清晰:创建一个队列实例,生产者协程向队列中 put 元素,消费者协程从队列中 get 元素。下面展示一个最简单的生产者-消费者示例:
import asyncio
async def producer(q):
for i in range(10):
await q.put(i)
print(f"生产: {i}")
async def consumer(q):
while True:
item = await q.get()
print(f"消费: {item}")
q.task_done()
async def main():
q = asyncio.Queue(maxsize=5)
producers = [asyncio.create_task(producer(q))]
consumers = [asyncio.create_task(consumer(q))]
await asyncio.gather(*producers)
await q.join() # 等待所有项目被处理
for c in consumers:
c.cancel()
在这个示例中:
- q.put(i):生产者协程将元素放入队列。如果队列已满(达到 maxsize=5),此操作会挂起直到有空位。
- q.get():消费者协程从队列取出元素。如果队列为空,此操作会挂起直到有元素可用。
- q.task_done():消费者在处理完每个元素后调用,通知队列一个 put 操作已完成。这是 join() 能够工作的基础。
- q.join():挂起直到队列中所有已 put 的元素都通过 task_done() 标记为完成。这里我们在所有生产者完成后调用 join(),确保所有数据都被消费者处理完毕,然后再取消消费者任务。
注意 consumer 中的 while True 循环。当所有生产者完成且队列为空时,消费者会永远阻塞在 await q.get() 上。因此我们需要在 join() 返回后手动取消消费者任务。后面的章节会介绍更优雅的退出方式。
三、队列容量与背压控制
在异步系统中,背压(backpressure)是一个关键概念。当生产者生产数据的速度快于消费者处理数据的速度时,如果不加控制,缓冲区会无限增长,最终导致内存耗尽。asyncio.Queue 的 maxsize 参数天然提供了背压控制能力。
当设定了 maxsize 后,队列满时的 put 操作会自动挂起生产者,直到消费者消费了元素腾出空间。这种阻塞会反向传递:生产者被阻塞意味着它的上游(如网络读取、数据库查询)也会暂停,从而形成一条完整的背压链。
import asyncio
async def fast_producer(q):
for i in range(20):
await q.put(i)
print(f"[{i}] 已放入, 队列大小: {q.qsize()}")
print("生产者完成")
async def slow_consumer(q):
while True:
item = await q.get()
await asyncio.sleep(0.5) # 模拟慢速处理
print(f"处理了: {item}")
q.task_done()
async def main():
q = asyncio.Queue(maxsize=3) # 队列最多容纳3个元素
prod = asyncio.create_task(fast_producer(q))
cons = asyncio.create_task(slow_consumer(q))
await prod
await q.join()
cons.cancel()
asyncio.run(main())
尝试运行上面的代码,你会观察到:尽管生产者尝试快速放入 20 个元素,但由于 maxsize=3,它会在队列满时被阻塞,等待消费者处理完元素腾出空间。这就是背压的实际效果——慢消费者的速度决定了整体系统的吞吐量。
最佳实践:队列 maxsize 的设定没有绝对标准,但可以参考以下策略:
- CPU 密集型消费者:maxsize 设为 1 或 2,避免生产者过度预热导致消费者追赶压力。
- IO 密集型消费者:maxsize 可以设大一些(如 100-1000),充分利用 IO 等待时间进行批量处理。
- 内存敏感场景:根据单个元素的内存占用估算,确保最坏情况下队列占用的内存是可接受的。
四、asyncio.LifoQueue与PriorityQueue
asyncio 模块提供了 Queue 的两种变体,用于不同的需求场景:
asyncio.LifoQueue(后进先出队列)
LifoQueue 即栈结构——最后放入的元素最先被取出。适用于"最新的数据最有价值"的场景,例如实时行情处理、最近的日志条目优先分析等。
import asyncio
async def demo_lifo():
q = asyncio.LifoQueue()
for i in range(5):
q.put_nowait(i) # 非阻塞放入
while not q.empty():
item = q.get_nowait() # 非阻塞取出
print(f"取出: {item}") # 输出: 4, 3, 2, 1, 0
asyncio.run(demo_lifo())
asyncio.PriorityQueue(优先级队列)
PriorityQueue 按元素的优先级顺序输出——优先级最低(值最小)的元素最先被取出。元素可以是元组 (priority, data) 的形式。
import asyncio
async def demo_priority():
q = asyncio.PriorityQueue()
q.put_nowait((3, "低优先级"))
q.put_nowait((1, "高优先级"))
q.put_nowait((2, "中优先级"))
while not q.empty():
pri, msg = await q.get()
print(f"[优先级 {pri}] {msg}")
# 输出顺序: 高优先级 -> 中优先级 -> 低优先级
q.task_done()
asyncio.run(demo_priority())
PriorityQueue 的典型应用场景包括:任务调度系统(紧急任务优先处理)、网络请求队列(超时时间短的请求优先)、事件处理(重要事件优先消费)。
五、队列迭代与shutdown
在早期的 asyncio 版本中,优雅地关闭队列并通知消费者退出是一个常见的设计挑战。传统的做法是使用 sentinel 哨兵值:在生产完成后,向队列中放入一个特殊标记,消费者检测到这个标记后退出。
# 使用 sentinel 哨兵值优雅退出
SENTINEL = "STOP"
async def producer(q):
for i in range(5):
await q.put(i)
await q.put(SENTINEL) # 发送哨兵信号
async def consumer(q):
while True:
item = await q.get()
if item is SENTINEL:
q.task_done()
break
print(f"处理: {item}")
q.task_done()
单个消费者时 sentinel 模式工作良好,但对于多个消费者,需要放入与消费者数量相同的 sentinel 值,否则某些消费者可能永远不会收到信号。这种模式虽然可行但略显笨拙。
Python 3.13+ 的 Queue.shutdown()
Python 3.13 引入了 Queue.shutdown() 方法,大大简化了队列的关闭逻辑。调用 shutdown() 后:
- 队列不再接受新的 put 操作(立即抛出 QueueShutDown 异常)。
- 队列中已有的元素仍然可以通过 get() 取出。
- 所有已阻塞的 put 操作会被唤醒并抛出异常。
- 当队列已空且已关闭时,get() 也会抛出异常。
import asyncio
async def consumer(q):
while True:
try:
item = await q.get()
print(f"处理: {item}")
q.task_done()
except asyncio.QueueShutDown:
print("队列已关闭,退出")
break
async def main():
q = asyncio.Queue()
for i in range(10):
q.put_nowait(i)
q.shutdown() # 关闭队列,不可再 put
await consumer(q)
asyncio.run(main())
shutdown() 尤其适用于多消费者场景:只需调用一次,所有等待 get() 的消费者都会被唤醒并获得 QueueShutDown 异常,无需手动管理 sentinel 计数。
六、多生产者-多消费者模式
现实中的异步应用往往涉及多个生产者和多个消费者协同工作。asyncio.Queue 天然支持这种模式——多个协程可以安全地并发 put 和 get。
import asyncio
import random
async def producer(pid, q, count):
for i in range(count):
item = f"P{pid}-项目{i}"
await q.put(item)
print(f"生产者{pid} 生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.3))
async def consumer(cid, q):
while True:
try:
item = await asyncio.wait_for(q.get(), timeout=2.0)
print(f"消费者{cid} 消费: {item}")
q.task_done()
except asyncio.TimeoutError:
print(f"消费者{cid} 超时退出")
break
async def main():
q = asyncio.Queue(maxsize=10)
# 创建 3 个生产者,各生产 5 个项目
producers = [asyncio.create_task(producer(i, q, 5)) for i in range(3)]
# 创建 2 个消费者
consumers = [asyncio.create_task(consumer(i, q)) for i in range(2)]
await asyncio.gather(*producers)
await q.join() # 等待所有生产的数据被消费
# 用超时机制让消费者自然退出
await asyncio.gather(*consumers, return_exceptions=True)
asyncio.run(main())
在这个示例中:
- 3 个生产者协程并发地向队列中放入元素,每个生产 5 个项目,共 15 个项目。
- 2 个消费者协程并发地从队列取出并处理元素。
- 所有生产者完成后,调用 join() 等待队列中所有元素被消费。
- 消费者使用 wait_for(q.get(), timeout=2.0) 实现超时退出,避免了无限阻塞的问题。
多生产者-多消费者模式的核心优势在于:
- 解耦:生产者和消费者之间通过队列间接通信,互不知道对方的存在。
- 弹性:可以独立调整生产者和消费者的数量,无需修改业务逻辑。
- 流量控制:队列的 maxsize 自动均衡生产速率和消费速率。
优雅关闭策略总结:
- 方案一(通用):join() + cancel() —— 适用于消费者是无限循环且无需清理的场景。
- 方案二(通用):sentinel 哨兵 —— 适用于精确控制消费者退出时机的场景,多消费者需放入 N 个哨兵。
- 方案三(Python 3.13+):shutdown() —— 最简洁的方式,一行代码唤醒所有消费者。
- 方案四(超时兜底):wait_for(q.get(), timeout=N) —— 防止消费者无限阻塞的安全网。
七、queue.Queue vs asyncio.Queue对比
Python 标准库提供了两个 Queue 实现:threading/queue 模块中的 queue.Queue 和 asyncio 模块中的 asyncio.Queue。二者的核心差异在于 同步阻塞 与 异步挂起 的区别。
| 对比维度 |
queue.Queue |
asyncio.Queue |
| 所属模块 |
queue(标准库) |
asyncio |
| 安全类型 |
线程安全(thread-safe) |
协程安全(coroutine-safe) |
| 阻塞方式 |
阻塞调用线程(blocking) |
挂起当前协程(await) |
| 非阻塞接口 |
put_nowait() / get_nowait() |
put_nowait() / get_nowait() |
| 超时支持 |
put(..., timeout=N), get(..., timeout=N) |
asyncio.wait_for(q.get(), timeout=N) |
| 完成跟踪 |
task_done() / join() |
task_done() / join() |
| 变体 |
LifoQueue, PriorityQueue |
LifoQueue, PriorityQueue |
| 关闭机制 |
无原生关闭 |
shutdown()(Python 3.13+) |
| 适用场景 |
多线程生产者-消费者 |
协程生产者-消费者 |
在异步代码中使用 queue.Queue 是 错误的:
- queue.Queue 的 get() 会阻塞调用线程,而 asyncio 运行在单线程事件循环中——这意味着整个事件循环都会被阻塞,所有协程都无法运行。
- 反之,在 threading 线程代码中使用 asyncio.Queue 也是不适合的——asyncio.Queue 要求其调用者位于同一事件循环中。
经验法则:在 asyncio 代码中始终使用 asyncio.Queue,在 threading 代码中始终使用 queue.Queue,在 multiprocessing 代码中使用 multiprocessing.Queue。三者的 API 设计非常相似,但底层的同步机制完全不同,不可混用。