queue模块 — 同步队列

Python标准库精讲专题 · 并发编程篇 · 掌握线程安全队列

专题:Python标准库精讲系统学习

关键词:Python, 标准库, queue, 队列, Queue, LifoQueue, PriorityQueue, 生产者消费者, 线程安全

一、queue模块概述

queue是Python标准库中提供的线程安全队列模块,适用于多线程编程中的安全数据交换。该模块实现了多种队列数据结构,支持阻塞操作和超时机制,是构建生产者消费者模式的核心工具。无论是在并发爬虫、任务调度,还是事件驱动系统中,queue模块都是多线程间传递数据的最佳选择。

核心特性

队列类型一览

说明引入版本
Queue先进先出(FIFO)队列,最常用的线程安全队列2.0+
LifoQueue后进先出(LIFO)栈,相当于线程安全的栈结构2.0+
PriorityQueue优先级队列,基于heapq实现,优先级最低的元素优先出队2.0+
SimpleQueue简化版FIFO队列,轻量级,不支持任务跟踪和容量限制3.7+

与collections.deque的区别

collections.deque是双端队列,支持在两端高效地添加和删除元素,性能极佳。但二者有本质区别,适合不同的场景:

简单来说:单线程场景或需要手动精细控制锁时选择deque;多线程安全通信场景应优先选择queue模块,既简洁又安全。

二、Queue — 先进先出(FIFO)

Queue是queue模块中最核心的类,实现了标准的先进先出队列。第一个放入的元素会被第一个取出,如同现实生活中的排队队列。Queue的内部使用deque作为底层存储,结合threading.Condition实现高效的阻塞唤醒机制。

构造与容量控制

Queue(maxsize=0)创建一个FIFO队列实例。maxsize为0时表示队列容量无限,元素入队永不阻塞;maxsize为正整数时限制队列最多容纳的元素数量。当队列达到最大容量时,put()操作将阻塞直到有消费者取出元素腾出空间。合理设置maxsize是系统稳定性设计的重要环节。

核心方法详解

基本使用示例

from queue import Queue q = Queue(maxsize=10) # 生产者放入数据 q.put("任务A") q.put("任务B", block=True, timeout=5) # 消费者取出数据 item = q.get() print(f"处理: {item}") # 标记任务完成 q.task_done() # 等待所有任务处理完毕 q.join()

阻塞与超时机制

阻塞操作是Queue最重要的设计理念。当队列为空时,消费者线程调用get()会自动暂停执行,进入等待状态,直到有新的元素入队或超时被唤醒。这种机制相比轮询检查(polling)有两大优势:一是节省CPU资源,线程在无数据时自然休眠不消耗CPU;二是响应延迟低,新数据入队后等待线程立即被唤醒。这种"等待-通知"模式是高效并发编程的基础。

关键点:put()和get()的block参数默认均为True,意味着默认就是阻塞行为。如果不希望阻塞,应该显式传入block=False并做好捕获queue.Full或queue.Empty异常的准备。实际工程中默认阻塞配合合理的timeout是最稳妥的使用方式。

三、LifoQueue — 后进先出(LIFO)

LifoQueue实现后进先出(Last In First Out)行为,即最后放入的元素最先被取出,功能等同于栈(Stack)数据结构。LifoQueue的接口与Queue完全一致,只是内部存取顺序不同——数据从同一端放入和取出,自然形成后进先出的效果。

基本使用示例

from queue import LifoQueue stack = LifoQueue(maxsize=0) # 压栈 stack.put("第一层") stack.put("第二层") stack.put("第三层") # 弹栈(后进先出) print(stack.get()) # 第三层 print(stack.get()) # 第二层 print(stack.get()) # 第一层

典型应用场景

关键点:LifoQueue和Queue共享同一套线程安全实现机制,因此栈操作同样是完全线程安全的。多个线程可以同时安全地执行压栈和弹栈操作,这是常规list实现的栈无法直接提供的特性——list栈在多线程环境下需要额外加锁保护。

四、PriorityQueue — 优先级队列

PriorityQueue基于heapq模块实现,内部使用最小堆(min-heap)数据结构维护元素顺序。每次取出元素时返回队列中优先级最高的元素——即值最小的元素总是最先出队。PriorityQueue完美融合了堆的高效操作(入队和出队均为O(log n)时间复杂度)和queue模块的线程安全特性。

基本使用示例

最常见的用法是将(priority, data)元组放入队列,PriorityQueue会根据元组的第一个元素排序。

from queue import PriorityQueue pq = PriorityQueue() # 放入(优先级, 数据)元组,数值越小优先级越高 pq.put((3, "低优先级任务")) pq.put((1, "高优先级任务")) pq.put((2, "中优先级任务")) # 按优先级取出 print(pq.get()[1]) # 高优先级任务 print(pq.get()[1]) # 中优先级任务 print(pq.get()[1]) # 低优先级任务

优先级排序规则与注意事项

PriorityQueue使用元组的第一个元素确定优先级顺序。当第一个元素(优先级值)相同时,队列会比较元组的第二个元素来打破平局。这就引发了一个重要的注意事项:如果第二个元素是不可比较的类型(例如两个不同的自定义对象),队列会抛出TypeError。解决这个问题有以下几种方案:

from queue import PriorityQueue from dataclasses import dataclass @dataclass(order=True) class Task: priority: int item: object = None pq = PriorityQueue() pq.put(Task(1, "紧急任务")) pq.put(Task(3, "日常任务")) pq.put(Task(2, "重要任务")) task = pq.get() print(task.item) # 紧急任务

典型应用场景

五、SimpleQueue — 简化版FIFO(3.7+)

SimpleQueue是Python 3.7引入的轻量级FIFO队列,专为不需要任务跟踪的简单场景优化。相比Queue,它去掉了task_done()/join()任务追踪机制和maxsize容量限制,内部实现更简单直接,在不需要容量控制和任务跟踪的场景下性能略高于Queue。

接口对比

方法/特性QueueSimpleQueue
put(item, block, timeout)支持阻塞/超时/容量限制不支持阻塞(始终不阻塞,无容量限制)
get(block, timeout)支持阻塞/超时支持阻塞/超时
qsize()支持支持
empty()支持支持
task_done()支持不支持
join()支持不支持
maxsize支持不支持(无容量限制)

使用示例

from queue import SimpleQueue sq = SimpleQueue() # 放入元素(不阻塞,也无容量限制) sq.put("简单任务A") sq.put("简单任务B") # 取出元素(支持阻塞和超时) print(sq.get()) # 简单任务A print(sq.get()) # 简单任务B print(sq.empty()) # True

适用场景

关键点:SimpleQueue的put()不阻塞也不检查容量,适用于生产者速度远快于消费者时可以接受无限增长的场景。如果需要对生产速度进行节流控制,应选择Queue并设置合适的maxsize参数,否则无限制的入队可能导致内存耗尽。

六、实战案例 — 生产者消费者与多Worker分发

案例1:经典生产者消费者模式

生产者消费者模式是多线程编程中最经典的协作设计模式。生产者线程负责产生数据,消费者线程负责处理数据,中间通过队列解耦——生产者和消费者互不感知对方的存在,只与队列交互。这种解耦带来了极大的灵活性:生产者和消费者的数量可以独立调整,处理速度也可以各自独立变化。

import threading import time from queue import Queue def producer(q, items): for item in items: time.sleep(0.5) q.put(item) print(f"[生产者] 放入: {item}, 当前队列: {q.qsize()}") q.put(None) # 结束信号(毒丸) def consumer(q): while True: item = q.get() if item is None: # 收到结束信号,退出循环 q.task_done() break print(f"[消费者] 处理: {item}") time.sleep(1) q.task_done() q = Queue(maxsize=3) t1 = threading.Thread(target=producer, args=(q, ["任务1", "任务2", "任务3", "任务4", "任务5"])) t2 = threading.Thread(target=consumer, args=(q,)) t1.start() t2.start() t1.join() q.join() t2.join() print("所有任务处理完毕")

案例2:多Worker任务分发

将任务分发给多个Worker线程并行处理,是爬虫、文件处理、API请求等IO密集型任务的常见模式。使用多Worker可以充分利用IO等待时间,大幅提升系统吞吐量。下面的示例展示了如何启停多个Worker,以及如何将结果汇总到独立的结果队列中。

import threading import time from queue import Queue def worker(worker_id, task_queue, result_queue): while True: task = task_queue.get() if task is None: result_queue.put(f"Worker-{worker_id} 结束") task_queue.task_done() break # 模拟任务处理 time.sleep(0.5) result = f"Worker-{worker_id} 完成: {task}" result_queue.put(result) task_queue.task_done() NUM_WORKERS = 3 task_queue = Queue() result_queue = Queue() # 启动Worker线程 workers = [] for i in range(NUM_WORKERS): t = threading.Thread(target=worker, args=(i + 1, task_queue, result_queue)) t.start() workers.append(t) # 分发20个任务 for j in range(20): task_queue.put(f"任务{j + 1:02d}") # 发送结束信号(每个Worker一个毒丸) for _ in range(NUM_WORKERS): task_queue.put(None) # 等待所有任务完成 task_queue.join() # 收集结果 results = [] while not result_queue.empty(): results.append(result_queue.get()) for r in results: print(r) completed = len([r for r in results if "完成" in r]) print(f"共完成 {completed} 个任务")

设计要点总结

最佳实践:对于大部分多线程数据交换场景,queue模块提供了正确且高效的实现。应避免自行使用list/dict加锁来实现线程安全队列——标准库的实现已经过充分测试和优化,直接使用更安全、更高效。在需要更高级的并发模式时(如异步队列、分布式队列),可以在queue模块的基础上进行扩展,而非重新造轮子。