← 返回Python标准库精讲目录
← 返回学习笔记首页
专题: Python标准库精讲系统学习
关键词: Python, 标准库, queue, 队列, Queue, LifoQueue, PriorityQueue, 生产者消费者, 线程安全
一、queue模块概述
queue是Python标准库中提供的线程安全队列模块,适用于多线程编程中的安全数据交换。该模块实现了多种队列数据结构,支持阻塞操作和超时机制,是构建生产者消费者模式的核心工具。无论是在并发爬虫、任务调度,还是事件驱动系统中,queue模块都是多线程间传递数据的最佳选择。
核心特性
线程安全 :内部使用threading.Lock和threading.Condition实现,多个线程可安全地同时存取数据,无需额外加锁
阻塞操作 :队列为空时get()阻塞等待,队列满时put()阻塞等待,避免忙等待(busy waiting)浪费CPU资源
超时机制 :put()和get()均支持timeout参数,超过指定时间仍未满足条件则抛出异常,避免无限阻塞
容量控制 :通过maxsize参数限制队列大小,实现背压(backpressure)调节,防止系统被过快的生产者压垮
任务跟踪 :task_done()和join()配合使用,生产者可以准确地等待所有任务被消费者处理完毕
队列类型一览
类 说明 引入版本
Queue 先进先出(FIFO)队列,最常用的线程安全队列 2.0+
LifoQueue 后进先出(LIFO)栈,相当于线程安全的栈结构 2.0+
PriorityQueue 优先级队列,基于heapq实现,优先级最低的元素优先出队 2.0+
SimpleQueue 简化版FIFO队列,轻量级,不支持任务跟踪和容量限制 3.7+
与collections.deque的区别
collections.deque是双端队列,支持在两端高效地添加和删除元素,性能极佳。但二者有本质区别,适合不同的场景:
线程安全 :deque不是线程安全的,多线程并发存取时需要手动加锁,稍有不慎就会出现数据竞争;queue模块内置锁机制,开箱即用,从根本上避免了线程安全问题
阻塞语义 :deque没有阻塞操作——从空deque中pop会直接抛出IndexError,消费者需要自行实现轮询或等待逻辑;queue提供put()/get()阻塞调用,实现优雅的等待唤醒机制
容量限制 :deque没有容量上限概念,无法对生产者进行节流;Queue可设置maxsize控制最大容量,实现天然的生产者阻塞调节
任务跟踪 :queue提供task_done()/join()机制方便协调消费者进度,deque完全无此功能,需要自行实现
简单来说:单线程场景或需要手动精细控制锁时选择deque;多线程安全通信场景应优先选择queue模块,既简洁又安全。
二、Queue — 先进先出(FIFO)
Queue是queue模块中最核心的类,实现了标准的先进先出队列。第一个放入的元素会被第一个取出,如同现实生活中的排队队列。Queue的内部使用deque作为底层存储,结合threading.Condition实现高效的阻塞唤醒机制。
构造与容量控制
Queue(maxsize=0)创建一个FIFO队列实例。maxsize为0时表示队列容量无限,元素入队永不阻塞;maxsize为正整数时限制队列最多容纳的元素数量。当队列达到最大容量时,put()操作将阻塞直到有消费者取出元素腾出空间。合理设置maxsize是系统稳定性设计的重要环节。
核心方法详解
put(item, block=True, timeout=None) :向队列尾部放入一个元素。block=True(默认)时若队列已满则阻塞等待,直到其他线程取出元素腾出空间;timeout指定阻塞超时秒数,超时后抛出queue.Full异常。block=False时执行非阻塞尝试,若队列已满则立即抛出Full异常。
get(block=True, timeout=None) :从队列头部取出一个元素并移除。block=True时若队列为空则阻塞等待,直到有新元素入队;timeout指定超时时间,超时后抛出queue.Empty异常。
qsize() :返回队列当前大致长度。注意在多线程环境下返回值是近似值,因为调用返回之间队列状态可能已变化。
empty() / full() :判断队列是否为空或已满。同样是近似判断,仅作参考。
task_done() :消费者在处理完一个任务后调用,通知队列该任务已完成。每次get()后都应调用一次task_done()。
join() :阻塞直到队列中所有元素均已被处理——即task_done()的累计调用次数等于put()的总次数。这是生产者等待所有任务完成的关键方法。
基本使用示例
from queue import Queue
q = Queue(maxsize=10)
# 生产者放入数据
q.put("任务A")
q.put("任务B", block=True, timeout=5)
# 消费者取出数据
item = q.get()
print(f"处理: {item}")
# 标记任务完成
q.task_done()
# 等待所有任务处理完毕
q.join()
阻塞与超时机制
阻塞操作是Queue最重要的设计理念。当队列为空时,消费者线程调用get()会自动暂停执行,进入等待状态,直到有新的元素入队或超时被唤醒。这种机制相比轮询检查(polling)有两大优势:一是节省CPU资源,线程在无数据时自然休眠不消耗CPU;二是响应延迟低,新数据入队后等待线程立即被唤醒。这种"等待-通知"模式是高效并发编程的基础。
关键点 :put()和get()的block参数默认均为True,意味着默认就是阻塞行为。如果不希望阻塞,应该显式传入block=False并做好捕获queue.Full或queue.Empty异常的准备。实际工程中默认阻塞配合合理的timeout是最稳妥的使用方式。
三、LifoQueue — 后进先出(LIFO)
LifoQueue实现后进先出(Last In First Out)行为,即最后放入的元素最先被取出,功能等同于栈(Stack)数据结构。LifoQueue的接口与Queue完全一致,只是内部存取顺序不同——数据从同一端放入和取出,自然形成后进先出的效果。
基本使用示例
from queue import LifoQueue
stack = LifoQueue(maxsize=0)
# 压栈
stack.put("第一层")
stack.put("第二层")
stack.put("第三层")
# 弹栈(后进先出)
print(stack.get()) # 第三层
print(stack.get()) # 第二层
print(stack.get()) # 第一层
典型应用场景
深度优先搜索(DFS) :在图或树的DFS遍历中,使用LifoQueue保存待访问节点,保证每次优先探索最新发现的路径,天然符合DFS的特性
撤销操作(Undo) :文本编辑器或图形软件中的多级撤销功能,将每一步操作压入栈中,撤销时从栈顶依次弹出——最后执行的操作最先被撤销
表达式求值 :编译器和计算器中处理括号匹配和后缀表达式(RPN)转换和计算时,栈是最核心的数据结构
递归转非递归 :将递归算法改写为迭代算法时,常用栈来模拟系统调用栈,保存上下文信息
关键点 :LifoQueue和Queue共享同一套线程安全实现机制,因此栈操作同样是完全线程安全的。多个线程可以同时安全地执行压栈和弹栈操作,这是常规list实现的栈无法直接提供的特性——list栈在多线程环境下需要额外加锁保护。
四、PriorityQueue — 优先级队列
PriorityQueue基于heapq模块实现,内部使用最小堆(min-heap)数据结构维护元素顺序。每次取出元素时返回队列中优先级最高的元素——即值最小的元素总是最先出队。PriorityQueue完美融合了堆的高效操作(入队和出队均为O(log n)时间复杂度)和queue模块的线程安全特性。
基本使用示例
最常见的用法是将(priority, data)元组放入队列,PriorityQueue会根据元组的第一个元素排序。
from queue import PriorityQueue
pq = PriorityQueue()
# 放入(优先级, 数据)元组,数值越小优先级越高
pq.put((3, "低优先级任务"))
pq.put((1, "高优先级任务"))
pq.put((2, "中优先级任务"))
# 按优先级取出
print(pq.get()[1]) # 高优先级任务
print(pq.get()[1]) # 中优先级任务
print(pq.get()[1]) # 低优先级任务
优先级排序规则与注意事项
PriorityQueue使用元组的第一个元素确定优先级顺序。当第一个元素(优先级值)相同时,队列会比较元组的第二个元素来打破平局。这就引发了一个重要的注意事项:如果第二个元素是不可比较的类型(例如两个不同的自定义对象),队列会抛出TypeError。解决这个问题有以下几种方案:
使用数据类 :通过@dataclass(order=True)装饰自定义包装类,让对象本身可比较
使用递增计数器 :在优先级元组中插入一个递增序号作为辅助排序键,确保唯一性
包装为可比较结构 :使用(priority, index, data)三元组,其中index保证唯一性
from queue import PriorityQueue
from dataclasses import dataclass
@dataclass(order=True)
class Task:
priority: int
item: object = None
pq = PriorityQueue()
pq.put(Task(1, "紧急任务"))
pq.put(Task(3, "日常任务"))
pq.put(Task(2, "重要任务"))
task = pq.get()
print(task.item) # 紧急任务
典型应用场景
任务调度系统 :根据任务优先级决定执行顺序,高优先级任务优先被Worker线程取出并处理,确保重要任务不延迟
Dijkstra最短路径算法 :算法的核心步骤就是从优先队列中取出当前距离最短的未访问节点,保证每次贪心选择的正确性
事件驱动模拟 :离散事件仿真系统中,将事件按发生时间排序,时间最早的事件优先处理,推进仿真时钟前进
多路归并排序 :合并多个有序数据流时,利用优先队列每次选择当前最小的元素输出,实现高效的N路归并
五、SimpleQueue — 简化版FIFO(3.7+)
SimpleQueue是Python 3.7引入的轻量级FIFO队列,专为不需要任务跟踪的简单场景优化。相比Queue,它去掉了task_done()/join()任务追踪机制和maxsize容量限制,内部实现更简单直接,在不需要容量控制和任务跟踪的场景下性能略高于Queue。
接口对比
方法/特性 Queue SimpleQueue
put(item, block, timeout) 支持阻塞/超时/容量限制 不支持阻塞(始终不阻塞,无容量限制)
get(block, timeout) 支持阻塞/超时 支持阻塞/超时
qsize() 支持 支持
empty() 支持 支持
task_done() 支持 不支持
join() 支持 不支持
maxsize 支持 不支持(无容量限制)
使用示例
from queue import SimpleQueue
sq = SimpleQueue()
# 放入元素(不阻塞,也无容量限制)
sq.put("简单任务A")
sq.put("简单任务B")
# 取出元素(支持阻塞和超时)
print(sq.get()) # 简单任务A
print(sq.get()) # 简单任务B
print(sq.empty()) # True
适用场景
不需要任务跟踪的简单生产者-消费者通信场景,数据发出即完成
对性能要求较高但不需要容量控制的场景,SimpleQueue的轻量级实现可减少锁开销
无界(unbounded)FIFO数据传递,生产者和消费者速度基本匹配的情况下
作为底层基础组件被更高级的抽象包装使用
关键点 :SimpleQueue的put()不阻塞也不检查容量,适用于生产者速度远快于消费者时可以接受无限增长的场景。如果需要对生产速度进行节流控制,应选择Queue并设置合适的maxsize参数,否则无限制的入队可能导致内存耗尽。
六、实战案例 — 生产者消费者与多Worker分发
案例1:经典生产者消费者模式
生产者消费者模式是多线程编程中最经典的协作设计模式。生产者线程负责产生数据,消费者线程负责处理数据,中间通过队列解耦——生产者和消费者互不感知对方的存在,只与队列交互。这种解耦带来了极大的灵活性:生产者和消费者的数量可以独立调整,处理速度也可以各自独立变化。
import threading
import time
from queue import Queue
def producer(q, items):
for item in items:
time.sleep(0.5)
q.put(item)
print(f"[生产者] 放入: {item}, 当前队列: {q.qsize()}")
q.put(None) # 结束信号(毒丸)
def consumer(q):
while True:
item = q.get()
if item is None: # 收到结束信号,退出循环
q.task_done()
break
print(f"[消费者] 处理: {item}")
time.sleep(1)
q.task_done()
q = Queue(maxsize=3)
t1 = threading.Thread(target=producer, args=(q, ["任务1", "任务2", "任务3", "任务4", "任务5"]))
t2 = threading.Thread(target=consumer, args=(q,))
t1.start()
t2.start()
t1.join()
q.join()
t2.join()
print("所有任务处理完毕")
案例2:多Worker任务分发
将任务分发给多个Worker线程并行处理,是爬虫、文件处理、API请求等IO密集型任务的常见模式。使用多Worker可以充分利用IO等待时间,大幅提升系统吞吐量。下面的示例展示了如何启停多个Worker,以及如何将结果汇总到独立的结果队列中。
import threading
import time
from queue import Queue
def worker(worker_id, task_queue, result_queue):
while True:
task = task_queue.get()
if task is None:
result_queue.put(f"Worker-{worker_id} 结束")
task_queue.task_done()
break
# 模拟任务处理
time.sleep(0.5)
result = f"Worker-{worker_id} 完成: {task}"
result_queue.put(result)
task_queue.task_done()
NUM_WORKERS = 3
task_queue = Queue()
result_queue = Queue()
# 启动Worker线程
workers = []
for i in range(NUM_WORKERS):
t = threading.Thread(target=worker, args=(i + 1, task_queue, result_queue))
t.start()
workers.append(t)
# 分发20个任务
for j in range(20):
task_queue.put(f"任务{j + 1:02d}")
# 发送结束信号(每个Worker一个毒丸)
for _ in range(NUM_WORKERS):
task_queue.put(None)
# 等待所有任务完成
task_queue.join()
# 收集结果
results = []
while not result_queue.empty():
results.append(result_queue.get())
for r in results:
print(r)
completed = len([r for r in results if "完成" in r])
print(f"共完成 {completed} 个任务")
设计要点总结
队列容量控制 :设置合理的maxsize可以防止生产者过快压垮系统,实现自然的背压(backpressure)调节——系统忙时生产者自动阻塞减速,系统空闲时自动恢复速度
优雅终止 :使用None作为"毒丸"(poison pill)向消费者发送终止信号的方式,比强制关闭线程更安全可靠。每个消费者都需要收到一个毒丸才能正常退出
任务跟踪 :join()配合task_done()确保所有任务被确实处理完毕,而不是仅仅被取出队列。这在实际工程中至关重要——队列为空不代表所有任务已完成,可能还有任务正在被处理
异常处理 :实际工程中get()调用应包含异常处理,避免queue.Empty异常导致Worker线程意外退出。同时put()操作也应在循环中使用try/except保护,处理可能的Full异常
结果收集 :使用独立的队列收集处理结果,将任务分发和结果收集分离。这种职责分离的设计让系统更加清晰,也便于后续对结果进行统一处理
最佳实践 :对于大部分多线程数据交换场景,queue模块提供了正确且高效的实现。应避免自行使用list/dict加锁来实现线程安全队列——标准库的实现已经过充分测试和优化,直接使用更安全、更高效。在需要更高级的并发模式时(如异步队列、分布式队列),可以在queue模块的基础上进行扩展,而非重新造轮子。