asyncio队列:asyncio.Queue与协程间数据传输

Python并发编程专题 · 协程间数据交换的推荐方案

专题:Python并发编程系统学习

关键词:Python, 并发编程, asyncio.Queue, 异步队列, 协程通信, 生产者消费者, 背压

一、协程安全队列的需求

在基于 asyncio 的异步编程中,经常需要在多个协程之间传递数据。例如,一个协程负责从网络或文件系统读取数据,另一个协程负责对数据进行处理,第三个协程负责将结果写入数据库。这种多阶段流水线处理在异步应用中非常常见。

与多线程编程不同,协程不共享内存地址空间中的可变数据,因此不能简单地依赖线程安全的 queue.Queue 来交换数据。我们需要一种专门为协程设计的队列——它在 put 和 get 操作上都是异步的,能够在队列满或空时自动挂起当前协程,从而避免忙等待和轮询。

asyncio.Queue 就是为此场景设计的协程安全队列。它的核心特点包括:

核心思想:asyncio.Queue 将线程安全的 queue.Queue 的阻塞调用替换为 await 挂起,在异步世界中实现了同样的生产者-消费者解耦模式,同时保留了背压(backpressure)能力。

二、asyncio.Queue基础用法

asyncio.Queue 的使用模式非常清晰:创建一个队列实例,生产者协程向队列中 put 元素,消费者协程从队列中 get 元素。下面展示一个最简单的生产者-消费者示例:

import asyncio async def producer(q): for i in range(10): await q.put(i) print(f"生产: {i}") async def consumer(q): while True: item = await q.get() print(f"消费: {item}") q.task_done() async def main(): q = asyncio.Queue(maxsize=5) producers = [asyncio.create_task(producer(q))] consumers = [asyncio.create_task(consumer(q))] await asyncio.gather(*producers) await q.join() # 等待所有项目被处理 for c in consumers: c.cancel()

在这个示例中:

注意 consumer 中的 while True 循环。当所有生产者完成且队列为空时,消费者会永远阻塞在 await q.get() 上。因此我们需要在 join() 返回后手动取消消费者任务。后面的章节会介绍更优雅的退出方式。

三、队列容量与背压控制

在异步系统中,背压(backpressure)是一个关键概念。当生产者生产数据的速度快于消费者处理数据的速度时,如果不加控制,缓冲区会无限增长,最终导致内存耗尽。asyncio.Queue 的 maxsize 参数天然提供了背压控制能力。

当设定了 maxsize 后,队列满时的 put 操作会自动挂起生产者,直到消费者消费了元素腾出空间。这种阻塞会反向传递:生产者被阻塞意味着它的上游(如网络读取、数据库查询)也会暂停,从而形成一条完整的背压链。

import asyncio async def fast_producer(q): for i in range(20): await q.put(i) print(f"[{i}] 已放入, 队列大小: {q.qsize()}") print("生产者完成") async def slow_consumer(q): while True: item = await q.get() await asyncio.sleep(0.5) # 模拟慢速处理 print(f"处理了: {item}") q.task_done() async def main(): q = asyncio.Queue(maxsize=3) # 队列最多容纳3个元素 prod = asyncio.create_task(fast_producer(q)) cons = asyncio.create_task(slow_consumer(q)) await prod await q.join() cons.cancel() asyncio.run(main())

尝试运行上面的代码,你会观察到:尽管生产者尝试快速放入 20 个元素,但由于 maxsize=3,它会在队列满时被阻塞,等待消费者处理完元素腾出空间。这就是背压的实际效果——慢消费者的速度决定了整体系统的吞吐量。

最佳实践:队列 maxsize 的设定没有绝对标准,但可以参考以下策略:

四、asyncio.LifoQueue与PriorityQueue

asyncio 模块提供了 Queue 的两种变体,用于不同的需求场景:

asyncio.LifoQueue(后进先出队列)

LifoQueue 即栈结构——最后放入的元素最先被取出。适用于"最新的数据最有价值"的场景,例如实时行情处理、最近的日志条目优先分析等。

import asyncio async def demo_lifo(): q = asyncio.LifoQueue() for i in range(5): q.put_nowait(i) # 非阻塞放入 while not q.empty(): item = q.get_nowait() # 非阻塞取出 print(f"取出: {item}") # 输出: 4, 3, 2, 1, 0 asyncio.run(demo_lifo())

asyncio.PriorityQueue(优先级队列)

PriorityQueue 按元素的优先级顺序输出——优先级最低(值最小)的元素最先被取出。元素可以是元组 (priority, data) 的形式。

import asyncio async def demo_priority(): q = asyncio.PriorityQueue() q.put_nowait((3, "低优先级")) q.put_nowait((1, "高优先级")) q.put_nowait((2, "中优先级")) while not q.empty(): pri, msg = await q.get() print(f"[优先级 {pri}] {msg}") # 输出顺序: 高优先级 -> 中优先级 -> 低优先级 q.task_done() asyncio.run(demo_priority())

PriorityQueue 的典型应用场景包括:任务调度系统(紧急任务优先处理)、网络请求队列(超时时间短的请求优先)、事件处理(重要事件优先消费)。

五、队列迭代与shutdown

在早期的 asyncio 版本中,优雅地关闭队列并通知消费者退出是一个常见的设计挑战。传统的做法是使用 sentinel 哨兵值:在生产完成后,向队列中放入一个特殊标记,消费者检测到这个标记后退出。

# 使用 sentinel 哨兵值优雅退出 SENTINEL = "STOP" async def producer(q): for i in range(5): await q.put(i) await q.put(SENTINEL) # 发送哨兵信号 async def consumer(q): while True: item = await q.get() if item is SENTINEL: q.task_done() break print(f"处理: {item}") q.task_done()

单个消费者时 sentinel 模式工作良好,但对于多个消费者,需要放入与消费者数量相同的 sentinel 值,否则某些消费者可能永远不会收到信号。这种模式虽然可行但略显笨拙。

Python 3.13+ 的 Queue.shutdown()

Python 3.13 引入了 Queue.shutdown() 方法,大大简化了队列的关闭逻辑。调用 shutdown() 后:

import asyncio async def consumer(q): while True: try: item = await q.get() print(f"处理: {item}") q.task_done() except asyncio.QueueShutDown: print("队列已关闭,退出") break async def main(): q = asyncio.Queue() for i in range(10): q.put_nowait(i) q.shutdown() # 关闭队列,不可再 put await consumer(q) asyncio.run(main())

shutdown() 尤其适用于多消费者场景:只需调用一次,所有等待 get() 的消费者都会被唤醒并获得 QueueShutDown 异常,无需手动管理 sentinel 计数。

六、多生产者-多消费者模式

现实中的异步应用往往涉及多个生产者和多个消费者协同工作。asyncio.Queue 天然支持这种模式——多个协程可以安全地并发 put 和 get。

import asyncio import random async def producer(pid, q, count): for i in range(count): item = f"P{pid}-项目{i}" await q.put(item) print(f"生产者{pid} 生产: {item}") await asyncio.sleep(random.uniform(0.1, 0.3)) async def consumer(cid, q): while True: try: item = await asyncio.wait_for(q.get(), timeout=2.0) print(f"消费者{cid} 消费: {item}") q.task_done() except asyncio.TimeoutError: print(f"消费者{cid} 超时退出") break async def main(): q = asyncio.Queue(maxsize=10) # 创建 3 个生产者,各生产 5 个项目 producers = [asyncio.create_task(producer(i, q, 5)) for i in range(3)] # 创建 2 个消费者 consumers = [asyncio.create_task(consumer(i, q)) for i in range(2)] await asyncio.gather(*producers) await q.join() # 等待所有生产的数据被消费 # 用超时机制让消费者自然退出 await asyncio.gather(*consumers, return_exceptions=True) asyncio.run(main())

在这个示例中:

多生产者-多消费者模式的核心优势在于:

优雅关闭策略总结:

七、queue.Queue vs asyncio.Queue对比

Python 标准库提供了两个 Queue 实现:threading/queue 模块中的 queue.Queue 和 asyncio 模块中的 asyncio.Queue。二者的核心差异在于 同步阻塞异步挂起 的区别。

对比维度 queue.Queue asyncio.Queue
所属模块 queue(标准库) asyncio
安全类型 线程安全(thread-safe) 协程安全(coroutine-safe)
阻塞方式 阻塞调用线程(blocking) 挂起当前协程(await)
非阻塞接口 put_nowait() / get_nowait() put_nowait() / get_nowait()
超时支持 put(..., timeout=N), get(..., timeout=N) asyncio.wait_for(q.get(), timeout=N)
完成跟踪 task_done() / join() task_done() / join()
变体 LifoQueue, PriorityQueue LifoQueue, PriorityQueue
关闭机制 无原生关闭 shutdown()(Python 3.13+)
适用场景 多线程生产者-消费者 协程生产者-消费者

在异步代码中使用 queue.Queue 是 错误的

经验法则:在 asyncio 代码中始终使用 asyncio.Queue,在 threading 代码中始终使用 queue.Queue,在 multiprocessing 代码中使用 multiprocessing.Queue。三者的 API 设计非常相似,但底层的同步机制完全不同,不可混用。