heapq模块 — 堆队列算法

Python标准库精讲专题 · 数据结构与算法工具篇 · 掌握堆队列算法与优先队列

专题:Python标准库精讲系统学习

关键词:Python, 标准库, heapq, 堆, 优先队列, 堆排序, heappush, heappop, heapify, nlargest, nsmallest

一、堆数据结构概述

堆(Heap)是一种特殊的完全二叉树数据结构。Python的heapq模块实现的是最小堆(Min-Heap),即任何一个父节点的值都小于或等于其子节点的值。这意味着堆顶(heap[0])始终是整个堆中最小的元素。

1.1 堆的核心性质

1.2 最小堆与最大堆

heapq模块原生仅支持最小堆。如果需要最大堆效果,可以通过取负值变通实现(将在后续章节详述)。堆的高度为 O(log n),所有核心操作的复杂度均为 O(log n) 或 O(n)。

堆结构广泛应用于优先队列、任务调度、Top-K问题、中位数查找等场景,是算法面试中的高频数据结构。

二、heapq核心函数

heapq模块提供了完整的堆操作函数集。以下逐一介绍每个核心函数的用法。

2.1 heappush(heap, item) — 入堆

将元素 item 添加到堆中,并保持堆的不变性。时间复杂度 O(log n)。

import heapq heap = [] heapq.heappush(heap, 5) heapq.heappush(heap, 3) heapq.heappush(heap, 7) heapq.heappush(heap, 1) heapq.heappush(heap, 9) print(heap) # 输出: [1, 3, 7, 5, 9]

2.2 heappop(heap) — 出堆

弹出并返回堆中最小的元素(即 heap[0]),同时保持堆的不变性。时间复杂度 O(log n)。

smallest = heapq.heappop(heap) print(smallest) # 输出: 1 print(heap) # 输出: [3, 5, 7, 9]

2.3 heapify(x) — 建堆

将普通列表 x 原地转换为一个合法的堆。时间复杂度 O(n)(Floyd建堆算法,比逐个插入的 O(n log n) 更优)。

arr = [9, 7, 3, 5, 1, 8, 6] heapq.heapify(arr) print(arr) # 输出: [1, 5, 3, 9, 7, 8, 6] # 堆顶 1 为最小值

2.4 heappushpop(heap, item) — 推入并弹出

先将 item 推入堆,然后弹出并返回堆中的最小值。该函数组合操作比先 heappush 再 heappop 更高效。时间复杂度 O(log n)。

# 当新元素比堆顶大时,等价于替换堆顶 result = heapq.heappushpop(heap, 2) print(result) # 输出: 2(原堆中最小值 1 已被弹出,2 入堆后成为新的最小值需重新调整) # 实际上会先推入再弹出,结果取决于堆的当前状态

2.5 heapreplace(heap, item) — 弹出并推入

先弹出堆中最小值,再将 item 推入堆。与 heappushpop 的区别在于操作顺序不同。时间复杂度 O(log n)。

# heapreplace 先 pop 再 push # 适用于"替换堆顶元素"的场景 heap = [1, 5, 3, 9, 7] heapq.heapify(heap) old_min = heapq.heapreplace(heap, 4) print(old_min) # 输出: 1 print(heap) # 输出: [3, 4, 5, 9, 7]

2.6 merge(*iterables) — 合并有序序列

将多个已排序的输入归并为一个惰性求值的迭代器,内部基于堆实现。时间复杂度取决于输入规模。

import heapq list1 = [1, 3, 5, 7] list2 = [2, 4, 6, 8] list3 = [0, 9, 10] merged = heapq.merge(list1, list2, list3) print(list(merged)) # 输出: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

2.7 函数汇总

函数描述时间复杂度
heappush(heap, item)将元素入堆O(log n)
heappop(heap)弹出最小元素O(log n)
heapify(x)列表原地建堆O(n)
heappushpop(heap, item)推入后再弹出最小值O(log n)
heapreplace(heap, item)弹出最小值后再推入O(log n)
merge(*iterables)合并多个有序序列O(总元素数)
nlargest(n, iterable, key=None)获取前n个最大元素O(n log k)
nsmallest(n, iterable, key=None)获取前n个最小元素O(n log k)

三、nlargest与nsmallest

nlargest 和 nsmallest 是 heapq 中两个非常实用的高阶函数,专门用于从大型数据集中高效获取前 n 个最大或最小元素。

3.1 基本语法

heapq.nlargest(n, iterable, key=None) heapq.nsmallest(n, iterable, key=None)

3.2 基础用法

import heapq numbers = [3, 7, 1, 9, 4, 6, 8, 2, 5] # 获取最大的3个元素 top3 = heapq.nlargest(3, numbers) print(top3) # 输出: [9, 8, 7] # 获取最小的3个元素 bottom3 = heapq.nsmallest(3, numbers) print(bottom3) # 输出: [1, 2, 3]

3.3 使用 key 参数

key 参数允许基于特定的属性或函数进行排序,适用于处理复杂数据结构。

students = [ {'name': 'Alice', 'score': 88}, {'name': 'Bob', 'score': 95}, {'name': 'Charlie', 'score': 72}, {'name': 'David', 'score': 91}, {'name': 'Eve', 'score': 85}, ] # 获取成绩最高的3名学生 top_students = heapq.nlargest(3, students, key=lambda s: s['score']) for s in top_students: print(f"{s['name']}: {s['score']}") # 输出: # Bob: 95 # David: 91 # Alice: 88 # 获取成绩最低的2名学生 bottom_students = heapq.nsmallest(2, students, key=lambda s: s['score'])

3.4 性能分析

nlargest 和 nsmallest 内部使用大小为 n 的堆,时间复杂度为 O(m log n)(m 为总数据量,n 为需要返回的元素个数)。

方法时间复杂度空间复杂度适用场景
sorted(iterable)[:n]O(m log m)O(m)数据量小,需要全排序
heapq.nlargest(n, iter)O(m log n)O(n)数据量大,n 远小于 m
heapq.nsmallest(n, iter)O(m log n)O(n)数据量大,n 远小于 m
max(iterable) / min(iterable)O(m)O(1)只需最大值或最小值 (n=1)

经验法则:当 n=1 时,直接使用 min() 或 max() 效率最高;当 n 较小时(相对于总数据量),使用 nlargest/nsmallest;当需要完整排序时,直接使用 sorted()。

四、优先队列实现

堆是实现优先队列(Priority Queue)最经典的数据结构。利用 heapq 可以轻松构建高效的优先队列。

4.1 基础优先队列

最简单的优先队列使用 (priority, item) 元组作为堆元素,heapq 会自动按元组的第一个元素(优先级数值)排序。

import heapq class PriorityQueue: def __init__(self): self._heap = [] def push(self, item, priority): """将元素以指定优先级入队(数值越小优先级越高)""" heapq.heappush(self._heap, (priority, item)) def pop(self): """弹出优先级最高的元素""" if self._heap: priority, item = heapq.heappop(self._heap) return item raise IndexError("pop from empty priority queue") def __len__(self): return len(self._heap) def is_empty(self): return len(self._heap) == 0 # 使用示例 pq = PriorityQueue() pq.push('Task A', 3) pq.push('Task B', 1) # 优先级最高 pq.push('Task C', 2) print(pq.pop()) # 输出: Task B (优先级1) print(pq.pop()) # 输出: Task C (优先级2) print(pq.pop()) # 输出: Task A (优先级3)

4.2 大值优先队列

如果需要"优先级数值越大越优先",可以通过对优先级取负值实现。

class MaxPriorityQueue: def __init__(self): self._heap = [] def push(self, item, priority): """将元素以指定优先级入队(数值越大优先级越高)""" heapq.heappush(self._heap, (-priority, item)) def pop(self): priority, item = heapq.heappop(self._heap) return item # 使用示例 mpq = MaxPriorityQueue() mpq.push('Low', 1) mpq.push('High', 10) mpq.push('Medium', 5) print(mpq.pop()) # 输出: High (优先级10) print(mpq.pop()) # 输出: Medium (优先级5) print(mpq.pop()) # 输出: Low (优先级1)

4.3 支持优先级更新的优先队列

在实际应用中(如Dijkstra最短路径算法),经常需要更新已有元素的优先级。可以通过惰性删除(lazy deletion)策略实现。

import heapq from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any = field(compare=False) class UpdatablePriorityQueue: def __init__(self): self._heap = [] self._entry_finder = {} # item -> 当前的优先级 self._REMOVED = object() # 标记已删除的条目 self._counter = 0 def add_or_update(self, item, priority): """添加新元素或更新已有元素的优先级""" if item in self._entry_finder: # 标记旧条目为已删除(惰性删除) old_priority = self._entry_finder[item] self._entry_finder[item] = (old_priority, self._REMOVED) self._entry_finder[item] = priority heapq.heappush(self._heap, (priority, self._counter, item)) self._counter += 1 def pop(self): """弹出优先级最高的有效元素""" while self._heap: priority, counter, item = heapq.heappop(self._heap) if self._entry_finder.get(item) == priority: # 确认条目仍然有效 del self._entry_finder[item] return item raise IndexError("pop from empty priority queue") def __len__(self): return len(self._entry_finder)

惰性删除策略的核心思想是:不直接从堆中移除旧条目(这需要 O(n) 查找),而是将其标记为无效,在弹出时忽略。这种策略在 Dijikstra 算法和 A* 搜索中广泛使用。

五、堆排序算法

利用堆的数据结构特性,可以实现一种高效且稳定的排序算法——堆排序(Heap Sort)。堆排序的时间复杂度为 O(n log n),空间复杂度为 O(1)(原地排序)。

5.1 使用 heapq 实现堆排序

利用 heapq 模块,可以极其简洁地实现堆排序。

import heapq def heapsort(iterable): heap = list(iterable) heapq.heapify(heap) # 建堆 O(n) return [heapq.heappop(heap) for _ in range(len(heap))] # 逐一弹出 O(n log n) # 测试 data = [6, 3, 8, 2, 9, 1, 5, 7, 4] sorted_data = heapsort(data) print(sorted_data) # 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9] # 降序排序(取负值) def heapsort_desc(iterable): heap = [-x for x in iterable] heapq.heapify(heap) return [-heapq.heappop(heap) for _ in range(len(heap))] print(heapsort_desc(data)) # 输出: [9, 8, 7, 6, 5, 4, 3, 2, 1]

5.2 堆排序完整实现(理解原理)

为了深入理解堆排序,下面给出不借助 heapq 模块、从零实现的堆排序。

def heapify_down(arr, n, i): """下沉操作:维护以 i 为根的子树的堆性质""" smallest = i # 当前节点 left = 2 * i + 1 # 左子节点 right = 2 * i + 2 # 右子节点 # 找到当前节点、左子节点、右子节点中的最小值 if left < n and arr[left] < arr[smallest]: smallest = left if right < n and arr[right] < arr[smallest]: smallest = right # 如果最小值不是当前节点,则交换并继续下沉 if smallest != i: arr[i], arr[smallest] = arr[smallest], arr[i] heapify_down(arr, n, smallest) def heap_sort(arr): n = len(arr) # 第一步:建堆(从最后一个非叶子节点开始下沉) for i in range(n // 2 - 1, -1, -1): heapify_down(arr, n, i) # 第二步:逐个提取堆顶(最小值),放到数组末尾 for i in range(n - 1, 0, -1): arr[0], arr[i] = arr[i], arr[0] # 将堆顶交换到已排序区 heapify_down(arr, i, 0) # 维护剩余部分的堆性质 return arr # 测试 data = [6, 3, 8, 2, 9, 1, 5, 7, 4] print(heap_sort(data)) # 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9]

5.3 时间复杂度分析

阶段时间复杂度说明
建堆O(n)Floyd算法,从 n/2 开始逐个下沉
排序O(n log n)n 次弹出,每次 O(log n)
总复杂度O(n log n)平均/最好/最坏情况均为 O(n log n)
空间复杂度O(1)原地排序(仅需常数级额外空间)

特点总结:堆排序不是稳定排序(相同元素的相对顺序可能改变),但具有始终如一的 O(n log n) 性能保证,不存在快速排序的退化问题,也不需要归并排序的额外空间。

六、实战应用

heapq 模块在实际工程和算法竞赛中有着广泛的应用场景。以下是几种经典实战问题的解法。

6.1 合并K个有序序列

经典面试题:给定 k 个已排序的链表/数组,将它们合并为一个有序序列。

import heapq def merge_k_sorted_arrays(arrays): """ 合并K个有序数组 使用堆维护每个数组的当前指针位置 """ result = [] heap = [] # 初始化堆:将每个数组的第一个元素连同数组索引和元素索引入堆 for arr_idx, arr in enumerate(arrays): if arr: # 数组非空 heapq.heappush(heap, (arr[0], arr_idx, 0)) while heap: val, arr_idx, elem_idx = heapq.heappop(heap) result.append(val) # 如果该数组还有下一个元素,将其入堆 if elem_idx + 1 < len(arrays[arr_idx]): next_val = arrays[arr_idx][elem_idx + 1] heapq.heappush(heap, (next_val, arr_idx, elem_idx + 1)) return result # 测试 arrays = [ [1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12] ] merged = merge_k_sorted_arrays(arrays) print(merged) # 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] # 时间复杂度: O(N log K),其中 N 为总元素数,K 为数组个数 # 空间复杂度: O(K),用于堆存储

6.2 Top-K 问题

从海量数据中找出最大/最小的 K 个元素,是最常见的堆应用场景之一。

import heapq import random def find_top_k_largest(nums, k): """从数组中找出最大的 K 个元素""" # 维护一个大小为 K 的最小堆 heap = nums[:k] heapq.heapify(heap) for num in nums[k:]: if num > heap[0]: # 比堆顶大,替换堆顶 heapq.heapreplace(heap, num) return heap # 堆中的 K 个元素即为最大的 K 个 def find_top_k_smallest(nums, k): """从数组中找出最小的 K 个元素(取负值法)""" # 维护一个大小为 K 的最大堆(通过取负值实现) heap = [-x for x in nums[:k]] heapq.heapify(heap) for num in nums[k:]: if -num > heap[0]: # 当前元素比"最大堆"的堆顶(实际是最小值)小 heapq.heapreplace(heap, -num) return [-x for x in heap] # 测试 data = random.sample(range(1, 1001), 1000) top10 = find_top_k_largest(data, 10) print("Top 10 最大:", sorted(top10, reverse=True)) bottom5 = find_top_k_smallest(data, 5) print("Top 5 最小:", sorted(bottom5)) # 时间复杂度: O(N log K),远优于 O(N log N) 的全排序

6.3 数据流中位数查找

使用两个堆(一个最大堆、一个最小堆)可以在 O(log n) 时间内从数据流中获取中位数。

import heapq class MedianFinder: """数据流中位数查找器""" def __init__(self): self.small = [] # 最大堆(存较小的一半),Python heapq 不支持最大堆,取负存储 self.large = [] # 最小堆(存较大的一半) def add_num(self, num): """添加一个数字到数据流中""" # 先将元素推入 small(最大堆) heapq.heappush(self.small, -num) # 保证 small 中的所有元素 ≤ large 中的所有元素 if self.small and self.large and (-self.small[0] > self.large[0]): val = -heapq.heappop(self.small) heapq.heappush(self.large, val) # 平衡两个堆的大小 if len(self.small) > len(self.large) + 1: val = -heapq.heappop(self.small) heapq.heappush(self.large, val) elif len(self.large) > len(self.small) + 1: val = heapq.heappop(self.large) heapq.heappush(self.small, -val) def find_median(self): """返回当前数据流的中位数""" if len(self.small) > len(self.large): return -self.small[0] elif len(self.large) > len(self.small): return self.large[0] else: return (-self.small[0] + self.large[0]) / 2.0 # 测试 mf = MedianFinder() for num in [5, 2, 8, 1, 9, 3, 7]: mf.add_num(num) print(f"添加 {num} 后中位数: {mf.find_median()}") # 输出: # 添加 5 后中位数: 5 # 添加 2 后中位数: 3.5 # 添加 8 后中位数: 5 # 添加 1 后中位数: 3.5 # 添加 9 后中位数: 5 # 添加 3 后中位数: 4 # 添加 7 后中位数: 5

6.4 任务调度器

假设有一组任务和它们的优先级,模拟基于优先级的任务调度执行过程。

import heapq import time class TaskScheduler: """基于优先级的任务调度器""" def __init__(self): self._task_queue = [] self._task_id = 0 def add_task(self, name, priority, duration=1): """ 添加任务 priority: 优先级(数值越低越优先执行) duration: 任务执行时长 """ heapq.heappush(self._task_queue, (priority, self._task_id, name, duration)) self._task_id += 1 def run(self): """按优先级顺序执行所有任务""" print("任务调度开始:") print("-" * 40) while self._task_queue: priority, tid, name, duration = heapq.heappop(self._task_queue) print(f"执行任务: {name} (优先级={priority}, 耗时={duration})") print("-" * 40) print("所有任务执行完毕") def peek_next(self): """查看下一个待执行的任务(不移除)""" if self._task_queue: priority, tid, name, duration = self._task_queue[0] return name, priority return None # 使用示例 scheduler = TaskScheduler() scheduler.add_task("系统监控", 1, 2) scheduler.add_task("用户请求C", 5, 1) scheduler.add_task("用户请求A", 3, 1) scheduler.add_task("日志写入", 4, 3) scheduler.add_task("紧急告警", 0, 1) # 最高优先级 print(f"下一个任务: {scheduler.peek_next()}") # 输出: ('紧急告警', 0) scheduler.run()

6.5 最大堆的变通实现

Python 的 heapq 只支持最小堆,但通过以下技巧可以实现最大堆:

import heapq class MaxHeap: """通过取负值实现的最大堆""" def __init__(self): self._heap = [] def push(self, val): heapq.heappush(self._heap, -val) def pop(self): return -heapq.heappop(self._heap) def peek(self): return -self._heap[0] if self._heap else None def __len__(self): return len(self._heap) # 使用示例 max_heap = MaxHeap() for v in [3, 1, 7, 5, 9]: max_heap.push(v) print(max_heap.pop()) # 输出: 9 print(max_heap.pop()) # 输出: 7 print(max_heap.peek()) # 输出: 5 # 对于自定义对象,可以在包装类中重写 __lt__ import heapq @functools.total_ordering class ReverseCompare: def __init__(self, obj): self.obj = obj def __lt__(self, other): return self.obj > other.obj # 反转比较 def __eq__(self, other): return self.obj == other.obj # 使用 ReverseCompare 包装的对象在堆中表现为最大堆

七、核心要点总结

7.1 性能复杂度速查表

操作函数时间复杂度
建堆heapify()O(n)
入堆heappush()O(log n)
出堆heappop()O(log n)
替换堆顶heapreplace()O(log n)
推入后弹出heappushpop()O(log n)
取前K个nlargest() / nsmallest()O(m log k)
合并有序序列merge()O(总元素数)
访问堆顶(不弹出)heap[0]O(1)

7.2 heapq 使用限制与变通

7.3 最佳实践

1. 使用 heapify() 而非逐个 heappush 建堆:O(n) vs O(n log n)

2. 需要替换堆顶时优先使用 heapreplace() 而非先 pop 再 push

3. 处理 Top-K 问题时维护大小为 K 的堆,空间效率优于全排序

4. 优先队列的优先级更新使用惰性删除策略,避免 O(n) 删除操作

5. 数据流中位数使用最大堆 + 最小堆的双堆策略

6. 对于自定义类型的堆排序,利用 key 参数比修改 __lt__ 更简洁

7.4 适用场景一览

场景推荐方案示例
Top-K 最大元素nlargest() 或小根堆排行榜、热门推荐
Top-K 最小元素nsmallest() 或大根堆最低价格、最短路径
优先队列调度heappush + heappop任务调度、 Dijkstra
合并K个有序序列merge() 或手动堆实现多路归并、外排序
数据流中位数最大堆 + 最小堆实时统计、监控系统
动态维护窗口最值懒删除堆滑动窗口最大值

heapq 模块是 Python 标准库中被低估的宝藏之一。它虽然只提供了最基础的堆操作函数,却足以构建从任务调度到图算法、从数据流统计到海量数据 Top-K 的各类解决方案。深入理解 heapq,是掌握 Python 高效编程的重要一步。