← 返回Python标准库精讲目录
← 返回学习笔记首页
专题: Python标准库精讲系统学习
关键词: Python, 标准库, heapq, 堆, 优先队列, 堆排序, heappush, heappop, heapify, nlargest, nsmallest
一、堆数据结构概述
堆(Heap)是一种特殊的完全二叉树数据结构。Python的heapq模块实现的是最小堆(Min-Heap) ,即任何一个父节点的值都小于或等于其子节点的值。这意味着堆顶(heap[0])始终是整个堆中最小的元素。
1.1 堆的核心性质
完全二叉树 :除最后一层外,其他层节点都是满的,最后一层节点靠左排列
堆序性(Heap Property) :对于最小堆,任意父节点 ≤ 子节点;对于最大堆,任意父节点 ≥ 子节点
隐式表示 :heapq使用普通Python列表存储堆,通过索引关系维护父子关系——索引 i 的父节点为 (i-1)//2,左子节点为 2*i+1,右子节点为 2*i+2
1.2 最小堆与最大堆
heapq模块原生仅支持最小堆 。如果需要最大堆效果,可以通过取负值变通实现(将在后续章节详述)。堆的高度为 O(log n),所有核心操作的复杂度均为 O(log n) 或 O(n)。
堆结构广泛应用于优先队列、任务调度、Top-K问题、中位数查找等场景,是算法面试中的高频数据结构。
二、heapq核心函数
heapq模块提供了完整的堆操作函数集。以下逐一介绍每个核心函数的用法。
2.1 heappush(heap, item) — 入堆
将元素 item 添加到堆中,并保持堆的不变性。时间复杂度 O(log n)。
import heapq
heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 3)
heapq.heappush(heap, 7)
heapq.heappush(heap, 1)
heapq.heappush(heap, 9)
print(heap) # 输出: [1, 3, 7, 5, 9]
2.2 heappop(heap) — 出堆
弹出并返回堆中最小的元素(即 heap[0]),同时保持堆的不变性。时间复杂度 O(log n)。
smallest = heapq.heappop(heap)
print(smallest) # 输出: 1
print(heap) # 输出: [3, 5, 7, 9]
2.3 heapify(x) — 建堆
将普通列表 x 原地转换为一个合法的堆。时间复杂度 O(n)(Floyd建堆算法,比逐个插入的 O(n log n) 更优)。
arr = [9, 7, 3, 5, 1, 8, 6]
heapq.heapify(arr)
print(arr) # 输出: [1, 5, 3, 9, 7, 8, 6]
# 堆顶 1 为最小值
2.4 heappushpop(heap, item) — 推入并弹出
先将 item 推入堆,然后弹出并返回堆中的最小值。该函数组合操作比先 heappush 再 heappop 更高效。时间复杂度 O(log n)。
# 当新元素比堆顶大时,等价于替换堆顶
result = heapq.heappushpop(heap, 2)
print(result) # 输出: 2(原堆中最小值 1 已被弹出,2 入堆后成为新的最小值需重新调整)
# 实际上会先推入再弹出,结果取决于堆的当前状态
2.5 heapreplace(heap, item) — 弹出并推入
先弹出堆中最小值,再将 item 推入堆。与 heappushpop 的区别在于操作顺序不同。时间复杂度 O(log n)。
# heapreplace 先 pop 再 push
# 适用于"替换堆顶元素"的场景
heap = [1, 5, 3, 9, 7]
heapq.heapify(heap)
old_min = heapq.heapreplace(heap, 4)
print(old_min) # 输出: 1
print(heap) # 输出: [3, 4, 5, 9, 7]
2.6 merge(*iterables) — 合并有序序列
将多个已排序的输入归并为一个惰性求值的迭代器,内部基于堆实现。时间复杂度取决于输入规模。
import heapq
list1 = [1, 3, 5, 7]
list2 = [2, 4, 6, 8]
list3 = [0, 9, 10]
merged = heapq.merge(list1, list2, list3)
print(list(merged)) # 输出: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2.7 函数汇总
函数 描述 时间复杂度
heappush(heap, item) 将元素入堆 O(log n)
heappop(heap) 弹出最小元素 O(log n)
heapify(x) 列表原地建堆 O(n)
heappushpop(heap, item) 推入后再弹出最小值 O(log n)
heapreplace(heap, item) 弹出最小值后再推入 O(log n)
merge(*iterables) 合并多个有序序列 O(总元素数)
nlargest(n, iterable, key=None) 获取前n个最大元素 O(n log k)
nsmallest(n, iterable, key=None) 获取前n个最小元素 O(n log k)
三、nlargest与nsmallest
nlargest 和 nsmallest 是 heapq 中两个非常实用的高阶函数,专门用于从大型数据集中高效获取前 n 个最大或最小元素。
3.1 基本语法
heapq.nlargest(n, iterable, key=None)
heapq.nsmallest(n, iterable, key=None)
n :要返回的元素个数
iterable :输入的可迭代对象
key :可选参数,指定排序键函数(与 sorted 的 key 类似)
3.2 基础用法
import heapq
numbers = [3, 7, 1, 9, 4, 6, 8, 2, 5]
# 获取最大的3个元素
top3 = heapq.nlargest(3, numbers)
print(top3) # 输出: [9, 8, 7]
# 获取最小的3个元素
bottom3 = heapq.nsmallest(3, numbers)
print(bottom3) # 输出: [1, 2, 3]
3.3 使用 key 参数
key 参数允许基于特定的属性或函数进行排序,适用于处理复杂数据结构。
students = [
{'name': 'Alice', 'score': 88},
{'name': 'Bob', 'score': 95},
{'name': 'Charlie', 'score': 72},
{'name': 'David', 'score': 91},
{'name': 'Eve', 'score': 85},
]
# 获取成绩最高的3名学生
top_students = heapq.nlargest(3, students, key=lambda s: s['score'])
for s in top_students:
print(f"{s['name']}: {s['score']}")
# 输出:
# Bob: 95
# David: 91
# Alice: 88
# 获取成绩最低的2名学生
bottom_students = heapq.nsmallest(2, students, key=lambda s: s['score'])
3.4 性能分析
nlargest 和 nsmallest 内部使用大小为 n 的堆,时间复杂度为 O(m log n)(m 为总数据量,n 为需要返回的元素个数)。
方法 时间复杂度 空间复杂度 适用场景
sorted(iterable)[:n] O(m log m) O(m) 数据量小,需要全排序
heapq.nlargest(n, iter) O(m log n) O(n) 数据量大,n 远小于 m
heapq.nsmallest(n, iter) O(m log n) O(n) 数据量大,n 远小于 m
max(iterable) / min(iterable) O(m) O(1) 只需最大值或最小值 (n=1)
经验法则 :当 n=1 时,直接使用 min() 或 max() 效率最高;当 n 较小时(相对于总数据量),使用 nlargest/nsmallest;当需要完整排序时,直接使用 sorted()。
四、优先队列实现
堆是实现优先队列(Priority Queue)最经典的数据结构。利用 heapq 可以轻松构建高效的优先队列。
4.1 基础优先队列
最简单的优先队列使用 (priority, item) 元组作为堆元素,heapq 会自动按元组的第一个元素(优先级数值)排序。
import heapq
class PriorityQueue:
def __init__(self):
self._heap = []
def push(self, item, priority):
"""将元素以指定优先级入队(数值越小优先级越高)"""
heapq.heappush(self._heap, (priority, item))
def pop(self):
"""弹出优先级最高的元素"""
if self._heap:
priority, item = heapq.heappop(self._heap)
return item
raise IndexError("pop from empty priority queue")
def __len__(self):
return len(self._heap)
def is_empty(self):
return len(self._heap) == 0
# 使用示例
pq = PriorityQueue()
pq.push('Task A', 3)
pq.push('Task B', 1) # 优先级最高
pq.push('Task C', 2)
print(pq.pop()) # 输出: Task B (优先级1)
print(pq.pop()) # 输出: Task C (优先级2)
print(pq.pop()) # 输出: Task A (优先级3)
4.2 大值优先队列
如果需要"优先级数值越大越优先",可以通过对优先级取负值实现。
class MaxPriorityQueue:
def __init__(self):
self._heap = []
def push(self, item, priority):
"""将元素以指定优先级入队(数值越大优先级越高)"""
heapq.heappush(self._heap, (-priority, item))
def pop(self):
priority, item = heapq.heappop(self._heap)
return item
# 使用示例
mpq = MaxPriorityQueue()
mpq.push('Low', 1)
mpq.push('High', 10)
mpq.push('Medium', 5)
print(mpq.pop()) # 输出: High (优先级10)
print(mpq.pop()) # 输出: Medium (优先级5)
print(mpq.pop()) # 输出: Low (优先级1)
4.3 支持优先级更新的优先队列
在实际应用中(如Dijkstra最短路径算法),经常需要更新已有元素的优先级。可以通过惰性删除(lazy deletion)策略实现。
import heapq
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any = field(compare=False)
class UpdatablePriorityQueue:
def __init__(self):
self._heap = []
self._entry_finder = {} # item -> 当前的优先级
self._REMOVED = object() # 标记已删除的条目
self._counter = 0
def add_or_update(self, item, priority):
"""添加新元素或更新已有元素的优先级"""
if item in self._entry_finder:
# 标记旧条目为已删除(惰性删除)
old_priority = self._entry_finder[item]
self._entry_finder[item] = (old_priority, self._REMOVED)
self._entry_finder[item] = priority
heapq.heappush(self._heap, (priority, self._counter, item))
self._counter += 1
def pop(self):
"""弹出优先级最高的有效元素"""
while self._heap:
priority, counter, item = heapq.heappop(self._heap)
if self._entry_finder.get(item) == priority:
# 确认条目仍然有效
del self._entry_finder[item]
return item
raise IndexError("pop from empty priority queue")
def __len__(self):
return len(self._entry_finder)
惰性删除策略的核心思想是:不直接从堆中移除旧条目(这需要 O(n) 查找),而是将其标记为无效,在弹出时忽略。这种策略在 Dijikstra 算法和 A* 搜索中广泛使用。
五、堆排序算法
利用堆的数据结构特性,可以实现一种高效且稳定的排序算法——堆排序(Heap Sort)。堆排序的时间复杂度为 O(n log n),空间复杂度为 O(1)(原地排序)。
5.1 使用 heapq 实现堆排序
利用 heapq 模块,可以极其简洁地实现堆排序。
import heapq
def heapsort(iterable):
heap = list(iterable)
heapq.heapify(heap) # 建堆 O(n)
return [heapq.heappop(heap) for _ in range(len(heap))] # 逐一弹出 O(n log n)
# 测试
data = [6, 3, 8, 2, 9, 1, 5, 7, 4]
sorted_data = heapsort(data)
print(sorted_data) # 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 降序排序(取负值)
def heapsort_desc(iterable):
heap = [-x for x in iterable]
heapq.heapify(heap)
return [-heapq.heappop(heap) for _ in range(len(heap))]
print(heapsort_desc(data)) # 输出: [9, 8, 7, 6, 5, 4, 3, 2, 1]
5.2 堆排序完整实现(理解原理)
为了深入理解堆排序,下面给出不借助 heapq 模块、从零实现的堆排序。
def heapify_down(arr, n, i):
"""下沉操作:维护以 i 为根的子树的堆性质"""
smallest = i # 当前节点
left = 2 * i + 1 # 左子节点
right = 2 * i + 2 # 右子节点
# 找到当前节点、左子节点、右子节点中的最小值
if left < n and arr[left] < arr[smallest]:
smallest = left
if right < n and arr[right] < arr[smallest]:
smallest = right
# 如果最小值不是当前节点,则交换并继续下沉
if smallest != i:
arr[i], arr[smallest] = arr[smallest], arr[i]
heapify_down(arr, n, smallest)
def heap_sort(arr):
n = len(arr)
# 第一步:建堆(从最后一个非叶子节点开始下沉)
for i in range(n // 2 - 1, -1, -1):
heapify_down(arr, n, i)
# 第二步:逐个提取堆顶(最小值),放到数组末尾
for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0] # 将堆顶交换到已排序区
heapify_down(arr, i, 0) # 维护剩余部分的堆性质
return arr
# 测试
data = [6, 3, 8, 2, 9, 1, 5, 7, 4]
print(heap_sort(data)) # 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9]
5.3 时间复杂度分析
阶段 时间复杂度 说明
建堆 O(n) Floyd算法,从 n/2 开始逐个下沉
排序 O(n log n) n 次弹出,每次 O(log n)
总复杂度 O(n log n) 平均/最好/最坏情况均为 O(n log n)
空间复杂度 O(1) 原地排序(仅需常数级额外空间)
特点总结 :堆排序不是稳定排序(相同元素的相对顺序可能改变),但具有始终如一的 O(n log n) 性能保证,不存在快速排序的退化问题,也不需要归并排序的额外空间。
六、实战应用
heapq 模块在实际工程和算法竞赛中有着广泛的应用场景。以下是几种经典实战问题的解法。
6.1 合并K个有序序列
经典面试题:给定 k 个已排序的链表/数组,将它们合并为一个有序序列。
import heapq
def merge_k_sorted_arrays(arrays):
"""
合并K个有序数组
使用堆维护每个数组的当前指针位置
"""
result = []
heap = []
# 初始化堆:将每个数组的第一个元素连同数组索引和元素索引入堆
for arr_idx, arr in enumerate(arrays):
if arr: # 数组非空
heapq.heappush(heap, (arr[0], arr_idx, 0))
while heap:
val, arr_idx, elem_idx = heapq.heappop(heap)
result.append(val)
# 如果该数组还有下一个元素,将其入堆
if elem_idx + 1 < len(arrays[arr_idx]):
next_val = arrays[arr_idx][elem_idx + 1]
heapq.heappush(heap, (next_val, arr_idx, elem_idx + 1))
return result
# 测试
arrays = [
[1, 4, 7, 10],
[2, 5, 8, 11],
[3, 6, 9, 12]
]
merged = merge_k_sorted_arrays(arrays)
print(merged)
# 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
# 时间复杂度: O(N log K),其中 N 为总元素数,K 为数组个数
# 空间复杂度: O(K),用于堆存储
6.2 Top-K 问题
从海量数据中找出最大/最小的 K 个元素,是最常见的堆应用场景之一。
import heapq
import random
def find_top_k_largest(nums, k):
"""从数组中找出最大的 K 个元素"""
# 维护一个大小为 K 的最小堆
heap = nums[:k]
heapq.heapify(heap)
for num in nums[k:]:
if num > heap[0]: # 比堆顶大,替换堆顶
heapq.heapreplace(heap, num)
return heap # 堆中的 K 个元素即为最大的 K 个
def find_top_k_smallest(nums, k):
"""从数组中找出最小的 K 个元素(取负值法)"""
# 维护一个大小为 K 的最大堆(通过取负值实现)
heap = [-x for x in nums[:k]]
heapq.heapify(heap)
for num in nums[k:]:
if -num > heap[0]: # 当前元素比"最大堆"的堆顶(实际是最小值)小
heapq.heapreplace(heap, -num)
return [-x for x in heap]
# 测试
data = random.sample(range(1, 1001), 1000)
top10 = find_top_k_largest(data, 10)
print("Top 10 最大:", sorted(top10, reverse=True))
bottom5 = find_top_k_smallest(data, 5)
print("Top 5 最小:", sorted(bottom5))
# 时间复杂度: O(N log K),远优于 O(N log N) 的全排序
6.3 数据流中位数查找
使用两个堆(一个最大堆、一个最小堆)可以在 O(log n) 时间内从数据流中获取中位数。
import heapq
class MedianFinder:
"""数据流中位数查找器"""
def __init__(self):
self.small = [] # 最大堆(存较小的一半),Python heapq 不支持最大堆,取负存储
self.large = [] # 最小堆(存较大的一半)
def add_num(self, num):
"""添加一个数字到数据流中"""
# 先将元素推入 small(最大堆)
heapq.heappush(self.small, -num)
# 保证 small 中的所有元素 ≤ large 中的所有元素
if self.small and self.large and (-self.small[0] > self.large[0]):
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
# 平衡两个堆的大小
if len(self.small) > len(self.large) + 1:
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
elif len(self.large) > len(self.small) + 1:
val = heapq.heappop(self.large)
heapq.heappush(self.small, -val)
def find_median(self):
"""返回当前数据流的中位数"""
if len(self.small) > len(self.large):
return -self.small[0]
elif len(self.large) > len(self.small):
return self.large[0]
else:
return (-self.small[0] + self.large[0]) / 2.0
# 测试
mf = MedianFinder()
for num in [5, 2, 8, 1, 9, 3, 7]:
mf.add_num(num)
print(f"添加 {num} 后中位数: {mf.find_median()}")
# 输出:
# 添加 5 后中位数: 5
# 添加 2 后中位数: 3.5
# 添加 8 后中位数: 5
# 添加 1 后中位数: 3.5
# 添加 9 后中位数: 5
# 添加 3 后中位数: 4
# 添加 7 后中位数: 5
6.4 任务调度器
假设有一组任务和它们的优先级,模拟基于优先级的任务调度执行过程。
import heapq
import time
class TaskScheduler:
"""基于优先级的任务调度器"""
def __init__(self):
self._task_queue = []
self._task_id = 0
def add_task(self, name, priority, duration=1):
"""
添加任务
priority: 优先级(数值越低越优先执行)
duration: 任务执行时长
"""
heapq.heappush(self._task_queue,
(priority, self._task_id, name, duration))
self._task_id += 1
def run(self):
"""按优先级顺序执行所有任务"""
print("任务调度开始:")
print("-" * 40)
while self._task_queue:
priority, tid, name, duration = heapq.heappop(self._task_queue)
print(f"执行任务: {name} (优先级={priority}, 耗时={duration})")
print("-" * 40)
print("所有任务执行完毕")
def peek_next(self):
"""查看下一个待执行的任务(不移除)"""
if self._task_queue:
priority, tid, name, duration = self._task_queue[0]
return name, priority
return None
# 使用示例
scheduler = TaskScheduler()
scheduler.add_task("系统监控", 1, 2)
scheduler.add_task("用户请求C", 5, 1)
scheduler.add_task("用户请求A", 3, 1)
scheduler.add_task("日志写入", 4, 3)
scheduler.add_task("紧急告警", 0, 1) # 最高优先级
print(f"下一个任务: {scheduler.peek_next()}") # 输出: ('紧急告警', 0)
scheduler.run()
6.5 最大堆的变通实现
Python 的 heapq 只支持最小堆,但通过以下技巧可以实现最大堆:
import heapq
class MaxHeap:
"""通过取负值实现的最大堆"""
def __init__(self):
self._heap = []
def push(self, val):
heapq.heappush(self._heap, -val)
def pop(self):
return -heapq.heappop(self._heap)
def peek(self):
return -self._heap[0] if self._heap else None
def __len__(self):
return len(self._heap)
# 使用示例
max_heap = MaxHeap()
for v in [3, 1, 7, 5, 9]:
max_heap.push(v)
print(max_heap.pop()) # 输出: 9
print(max_heap.pop()) # 输出: 7
print(max_heap.peek()) # 输出: 5
# 对于自定义对象,可以在包装类中重写 __lt__
import heapq
@functools.total_ordering
class ReverseCompare:
def __init__(self, obj):
self.obj = obj
def __lt__(self, other):
return self.obj > other.obj # 反转比较
def __eq__(self, other):
return self.obj == other.obj
# 使用 ReverseCompare 包装的对象在堆中表现为最大堆
七、核心要点总结
7.1 性能复杂度速查表
操作 函数 时间复杂度
建堆 heapify() O(n)
入堆 heappush() O(log n)
出堆 heappop() O(log n)
替换堆顶 heapreplace() O(log n)
推入后弹出 heappushpop() O(log n)
取前K个 nlargest() / nsmallest() O(m log k)
合并有序序列 merge() O(总元素数)
访问堆顶(不弹出) heap[0] O(1)
7.2 heapq 使用限制与变通
仅支持最小堆 :heapq 原生只实现最小堆。需要最大堆时,可以通过取负值或自定义比较类实现
线程不安全 :heapq 操作不是原子的,多线程场景需加锁保护
无内置查找操作 :堆不提供 O(1) 查找任意元素的能力,查找需 O(n) 遍历
无内置删除操作 :删除任意元素需 O(n) 查找位置 + O(log n) 调整,建议使用惰性删除策略
自定义对象排序 :堆中存储自定义对象时,需要实现 __lt__ 方法以支持比较
7.3 最佳实践
1. 使用 heapify() 而非逐个 heappush 建堆:O(n) vs O(n log n)
2. 需要替换堆顶时优先使用 heapreplace() 而非先 pop 再 push
3. 处理 Top-K 问题时维护大小为 K 的堆,空间效率优于全排序
4. 优先队列的优先级更新使用惰性删除策略,避免 O(n) 删除操作
5. 数据流中位数使用最大堆 + 最小堆的双堆策略
6. 对于自定义类型的堆排序,利用 key 参数比修改 __lt__ 更简洁
7.4 适用场景一览
场景 推荐方案 示例
Top-K 最大元素 nlargest() 或小根堆 排行榜、热门推荐
Top-K 最小元素 nsmallest() 或大根堆 最低价格、最短路径
优先队列调度 heappush + heappop 任务调度、 Dijkstra
合并K个有序序列 merge() 或手动堆实现 多路归并、外排序
数据流中位数 最大堆 + 最小堆 实时统计、监控系统
动态维护窗口最值 懒删除堆 滑动窗口最大值
heapq 模块是 Python 标准库中被低估的宝藏之一。它虽然只提供了最基础的堆操作函数,却足以构建从任务调度到图算法、从数据流统计到海量数据 Top-K 的各类解决方案。深入理解 heapq,是掌握 Python 高效编程的重要一步。