← 返回数据结构与算法目录
← 返回学习笔记首页
堆(Heap)与优先队列
数据结构与算法专题 · 二叉堆实现与堆操作详解
专题: 数据结构与算法系统学习
关键词: 数据结构, 算法, 堆, Heap, 二叉堆, 优先队列, heapq, 堆排序, TopK, sift_up, sift_down
一、堆概述
堆(Heap)是一种特殊的完全二叉树数据结构,它满足"堆属性":每个节点的值都大于等于(最大堆)或小于等于(最小堆)其子节点的值。堆是优先队列最常见的实现方式,也是堆排序算法的核心数据结构。堆的重要特性在于,它能够在O(log n)时间内完成插入和删除操作,同时又能在O(1)时间内获取最值,这种高效性使其成为解决许多实际问题的首选数据结构。
堆分为两种基本类型:最大堆(Max-Heap) 和最小堆(Min-Heap) 。在最大堆中,根节点是堆中最大的元素,每个父节点的值都大于或等于其子节点的值;在最小堆中,根节点是最小的元素,每个父节点的值都小于或等于其子节点的值。这两种堆结构在逻辑上是对称的,实现时只需调整比较符号即可相互转换。
核心堆属性:
最大堆:arr[parent] >= arr[child](每个父节点不小于其子节点)
最小堆:arr[parent] <= arr[child](每个父节点不大于其子节点)
完全二叉树性质:除最后一层外,其他层节点数均为最大值;最后一层节点从左向右排列。
堆的两种类型
最大堆(Max-Heap)
根节点值最大,常用于:
堆排序升序排列
获取前k个最小元素
任务调度(优先级最高优先)
最小堆(Min-Heap)
根节点值最小,常用于:
堆排序降序排列
获取前k个最大元素
Dijkstra最短路径算法
堆的应用场景: 堆广泛应用于优先队列、堆排序、TopK问题(求数据流中第k大/小的元素)、中位数查找、任务调度、Dijkstra最短路径算法、Huffman编码、合并K个有序链表等场景。凡是需要频繁获取最值且动态增删数据的场景,堆都是最合适的结构。
二、二叉堆的存储结构
二叉堆(Binary Heap)使用数组 进行存储,不需要显式的节点对象和指针链接。这种实现方式利用了完全二叉树的特性:对于一个长度为n的数组,我们可以通过下标运算快速定位任意节点的父节点和子节点。数组实现堆的最大优势在于空间效率高(不需要存储指针)和缓存友好(连续内存访问)。
父子下标关系
假设堆的根节点存储在数组索引0 的位置,对于任意索引i处的节点:
父节点索引: parent(i) = (i - 1) // 2
左子节点索引: left_child(i) = 2 * i + 1
右子节点索引: right_child(i) = 2 * i + 2
def parent (i):
return (i - 1 ) // 2
def left_child (i):
return 2 * i + 1
def right_child (i):
return 2 * i + 2
索引方式对比: 如果根节点从索引1开始存储(索引0留空),公式变为:parent(i)=i//2,left(i)=2*i,right(i)=2*i+1。两种方式各有优劣,本文采用0-based索引,与Python列表和heapq模块保持一致。
数组表示的直观示例
以下是一个最大堆的数组表示,堆中元素为 [50, 30, 40, 10, 20, 35]:
根节点(索引0):50
索引1(左子):30,其左右子节点为 10(索引3)和 20(索引4)
索引2(右子):40,其左子节点为 35(索引5)
索引3-5:叶子节点 10, 20, 35
对应的完全二叉树结构为:根50下面挂30和40;30下面挂10和20;40下面挂35(左子),无右子。每个父节点均大于等于其子节点,满足最大堆属性。
# 数组表示的最大堆示例
# 树形结构:
# 50
# / \
# 30 40
# / \ /
# 10 20 35
#
# 数组表示: [50, 30, 40, 10, 20, 35]
max_heap = [50 , 30 , 40 , 10 , 20 , 35 ]
# 验证父节点 >= 子节点
assert 50 >= 30 and 50 >= 40
assert 30 >= 10 and 30 >= 20
assert 40 >= 35
三、核心操作详解
二叉堆的核心操作包括sift_up(上浮)、sift_down(下沉)、heapify(建堆)、insert(插入)和extract(提取最值)。这些操作共同构成了堆数据结构的基本行为。理解sift_up和sift_down是实现所有其他操作的基础,它们分别对应堆属性的向上恢复和向下恢复过程。
1. sift_up(上浮操作)
sift_up用于在堆的末尾插入新元素后恢复堆属性。新元素从底部向上"冒泡":如果它大于(最大堆)或小于(最小堆)其父节点,就与父节点交换位置,重复此过程直到堆属性恢复。sift_up操作的时间复杂度为O(log n),因为它最多从叶子节点上升到根节点,路径长度等于树的高度。
def sift_up (heap, i):
"""最大堆的上浮操作:将索引i处的元素上浮到正确位置"""
while i > 0 :
p = (i - 1 ) // 2 # 父节点索引
if heap[i] <= heap[p]:
break
heap[i], heap[p] = heap[p], heap[i] # 与父节点交换
i = p
def sift_up_min (heap, i):
"""最小堆的上浮操作"""
while i > 0 :
p = (i - 1 ) // 2
if heap[i] >= heap[p]:
break
heap[i], heap[p] = heap[p], heap[i]
i = p
2. sift_down(下沉操作)
sift_down用于从堆顶移除元素后或建堆过程中恢复堆属性。当前节点从顶部向"下沉":比较当前节点与左右子节点,如果小于子节点中的较大者(最大堆),则与较大子节点交换,重复直到堆属性恢复。sift_down的时间复杂度同样为O(log n)。这是堆中最重要的操作之一,因为heapify和extract操作都依赖于它。
def sift_down (heap, n, i):
"""最大堆的下沉操作:将索引i处的元素下沉到正确位置
参数:
heap: 堆数组
n: 堆的有效大小(实际可用的元素个数)
i: 当前需要下沉的节点索引
"""
while True :
largest = i
left = 2 * i + 1
right = 2 * i + 2
if left < n and heap[left] > heap[largest]:
largest = left
if right < n and heap[right] > heap[largest]:
largest = right
if largest == i:
break # 已到位
heap[i], heap[largest] = heap[largest], heap[i]
i = largest
3. heapify(建堆操作)
heapify是将一个无序数组原地转换为堆的过程。Floyd提出的线性建堆算法从最后一个非叶子节点开始向前遍历,对每个节点执行sift_down操作。该算法的时间复杂度为O(n),而非直观的O(n log n),因为大多数节点位于靠近底层的层次,它们的下沉深度较浅。
def heapify (arr):
"""将无序数组原地转换为最大堆,O(n)时间复杂度"""
n = len (arr)
# 从最后一个非叶子节点开始下沉
# 最后一个非叶子节点的索引为 n//2 - 1
for i in range (n // 2 - 1 , -1 , -1 ):
sift_down (arr, n, i)
return arr
# 建堆过程演示
arr = [10 , 20 , 15 , 30 , 40 , 5 , 25 ]
heapify (arr)
print (arr) # 输出: [40, 30, 25, 20, 10, 5, 15]
建堆时间复杂度O(n)的数学证明: 设树高为h(根为第0层),第k层有2^k个节点,每个节点下沉的最大深度为h-k。总比较次数为sum(k=0 to h) 2^k * (h-k) = 2^(h+1) - h - 2 = O(n)。这意味着我们可以在线性时间内将任意无序数组转换为堆,这是堆数据结构的重要优势。
4. insert(插入操作)
插入操作是堆的动态使用中最常见的操作之一。新元素首先被追加到数组末尾(保证完全二叉树性质不变),然后通过sift_up操作将其"上浮"到正确位置,恢复堆属性。单次插入的时间复杂度为O(log n)。
def heappush_max (heap, val):
"""向最大堆中插入新元素"""
heap.append(val)
sift_up (heap, len (heap) - 1 )
# 使用示例
max_heap = [40 , 30 , 25 , 20 , 10 , 5 , 15 ]
heappush_max (max_heap, 35 )
print (max_heap) # 输出: [40, 35, 25, 30, 10, 5, 15, 20]
# 插入过程: 35追加到末尾,然后与父节点20比较 → 交换,再与父节点30比较 → 交换,到位
5. extract(提取最值操作)
提取最值操作(extract_max / extract_min)返回并移除堆中的根节点(最大值或最小值)。具体过程为:将根节点与最后一个叶子节点交换,删除最后一个叶子节点(即原来的根节点),然后从新的根节点开始执行sift_down恢复堆属性。该操作时间复杂度为O(log n)。
def heappop_max (heap):
"""从最大堆中取出并移除最大值"""
if not heap:
raise IndexError ("堆为空" )
n = len (heap)
heap[0 ], heap[n - 1 ] = heap[n - 1 ], heap[0 ] # 根与最后一个叶子交换
max_val = heap.pop() # 移除最大值
sift_down (heap, len (heap), 0 ) # 新根下沉
return max_val
# 使用示例
max_heap = [40 , 30 , 25 , 20 , 10 , 5 , 15 ]
max_val = heappop_max (max_heap)
print (max_val) # 输出: 40
print (max_heap) # 输出: [30, 20, 25, 15, 10, 5]
6. peek(查看最值)
peek操作返回根节点的值但不删除它。这是堆最基本的查询操作,时间复杂度为O(1),只需返回数组的第一个元素即可。
def peek_max (heap):
return heap[0 ] if heap else None
四、Python heapq模块
Python标准库提供了heapq模块,实现了最小堆 的所有基本操作。heapq模块用列表作为底层存储,所有操作都是原地进行的(in-place)。heapq不提供最大堆的直接支持,但可以通过存储负值或自定义比较器来实现最大堆的效果。
heapq四大核心函数
import heapq
# 1. heapify:将列表原地转换为最小堆
arr = [10 , 3 , 5 , 1 , 7 , 9 ]
heapq.heapify(arr)
print (arr) # 输出: [1, 3, 5, 10, 7, 9]
# 2. heappush:向堆中插入元素
heapq.heappush(arr, 2 )
print (arr) # 输出: [1, 3, 2, 10, 7, 9, 5]
# 3. heappop:弹出堆顶最小值
min_val = heapq.heappop(arr)
print (min_val) # 输出: 1
print (arr) # 输出: [2, 3, 5, 10, 7, 9]
# 4. heappushpop:先push再pop,比单独调用更高效
result = heapq.heappushpop(arr, 4 )
print (result) # 输出: 2(原来的最小值)
print (arr) # 输出: [3, 4, 5, 10, 7, 9]
最大堆的实现技巧
Python的heapq只支持最小堆,但实现最大堆有三种常用方式:存储负值、存储元组自定义比较、或封装自定义类。其中存储负值是最简单直接的方式。
# 方式一:存储负值实现最大堆
arr = [10 , 3 , 5 , 1 , 7 ]
max_heap = [-x for x in arr]
heapq.heapify(max_heap)
print (-max_heap[0 ]) # 输出: 10(最大值)
# 方式二:存储元组(优先级, 元素)实现带优先级的最大堆
# 优先级取负值即可将最小堆变为按优先级降序
tasks = []
heapq.heappush(tasks, (-3 , "低优先级任务" ))
heapq.heappush(tasks, (-10 , "高优先级任务" ))
heapq.heappush(tasks, (-5 , "中优先级任务" ))
while tasks:
priority, task = heapq.heappop(tasks)
print (f"执行任务: {task}, 优先级: {-priority}" )
实用建议: 在算法竞赛和面试中,优先使用heapq模块而非手写堆实现,因为heapq经过高度优化,性能远超手写版本。只有在需要自定义堆操作(如修改堆中任意元素的优先级)时才考虑手写实现。
五、堆排序
堆排序(Heap Sort)是一种基于堆数据结构的比较排序算法。它的核心思想分为两个阶段:首先利用heapify将无序数组构建为最大堆(O(n)),然后反复将堆顶元素(最大值)与末尾元素交换,堆大小减1并对新堆顶执行sift_down恢复堆属性(O(log n)),重复n-1次即可得到升序排列的数组。总时间复杂度为O(n log n),空间复杂度为O(1)。
def heap_sort (arr):
"""堆排序(原地排序,升序)"""
n = len (arr)
# 阶段1:建堆 O(n)
for i in range (n // 2 - 1 , -1 , -1 ):
sift_down (arr, n, i)
# 阶段2:排序 O(n log n)
for i in range (n - 1 , 0 , -1 ):
# 将堆顶(最大值)交换到数组末尾
arr[0 ], arr[i] = arr[i], arr[0 ]
# 对剩余元素重建堆
sift_down (arr, i, 0 )
return arr
# 利用Python内置heapq实现堆排序(更简洁)
def heap_sort_pythonic (arr):
"""使用heapq实现堆排序(非原地)"""
heapq.heapify(arr) # 建最小堆
return [heapq.heappop(arr) for _ in range (len (arr))]
# 测试
test_arr = [64 , 34 , 25 , 12 , 22 , 11 , 90 ]
heap_sort (test_arr)
print (test_arr) # 输出: [11, 12, 22, 25, 34, 64, 90]
排序算法对比: 堆排序在三种O(n log n)排序算法中有独特优势。与快速排序相比,堆排序的最坏时间复杂度有保证(始终为O(n log n),而快排在极端情况下退化为O(n^2))。与归并排序相比,堆排序是原地排序,空间复杂度O(1)远优于归并排序的O(n)。但堆排序的缺点是缓存不友好(数组访问模式不连续),实际运行速度通常慢于优化后的快速排序。
六、优先队列
优先队列(Priority Queue)是一种特殊的队列,其中每个元素都有一个"优先级",出队时优先级最高的元素最先被移除。堆是实现优先队列最高效的数据结构之一。在Python中,可以直接使用heapq模块来构建优先队列。优先队列的核心操作包括push(入队)、pop(出队)、peek(查看队首)和update(更新优先级)。
基础优先队列实现
class PriorityQueue :
"""基于最小堆的优先队列实现(优先级值越小,优先级越高)"""
def __init__ (self):
self._heap = []
self._index = 0 # 用于相同优先级时保持FIFO顺序
def push (self, item, priority):
"""入队:插入元素 item,优先级为 priority"""
heapq.heappush(self._heap, (priority, self._index, item))
self._index += 1
def pop (self):
"""出队:返回并移除优先级最高的元素"""
if not self._heap:
raise IndexError ("队列为空" )
priority, _, item = heapq.heappop(self._heap)
return item, priority
def peek (self):
if not self._heap:
return None
return self._heap[0 ][2 ]
def is_empty (self):
return len (self._heap) == 0
def __len__ (self):
return len (self._heap)
支持优先级更新的优先队列
在实际应用中(如Dijkstra算法),我们经常需要更新队列中已有元素的优先级。为了实现高效的更新操作,我们可以使用"惰性删除"策略:不直接删除旧条目,而是插入一条新的(优先级, 元素)元组,并在弹出时通过标记数组忽略无效条目。
class MutablePriorityQueue :
"""支持优先级更新的优先队列(使用惰性删除)"""
def __init__ (self):
self._heap = []
self._entries = {} # item -> priority 映射表
def push (self, item, priority):
"""插入或更新元素的优先级"""
if item in self._entries:
# 已有条目,检查是否需要更新
if self._entries[item] == priority:
return # 优先级未变,无需操作
self._entries[item] = priority
heapq.heappush(self._heap, (priority, item))
def pop (self):
"""弹出当前优先级最高的元素(惰性删除法)"""
while self._heap:
priority, item = heapq.heappop(self._heap)
# 检查该条目是否为最新版本
if item in self._entries and self._entries[item] == priority:
del self._entries[item]
return item, priority
raise KeyError ("队列为空" )
def update (self, item, new_priority):
"""更新某个元素的优先级"""
if item not in self._entries or self._entries[item] != new_priority:
self._entries[item] = new_priority
heapq.heappush(self._heap, (new_priority, item))
惰性删除原理: 当需要更新已有元素的优先级时,我们并不直接修改堆中已有的条目(因为堆不支持高效的任意位置修改),而是将新条目直接push进堆。弹出时,通过_entries字典检查弹出的条目是否是最新版本,如果不是则丢弃(即"惰性删除")。这种策略以牺牲少量内存为代价(堆中可能存有已过期的条目),换取了O(log n)的更新操作效率。
七、时间复杂度分析
堆的各种操作时间复杂度是理解和应用堆的关键。以下汇总了二叉堆各操作的时间复杂度,并与完全无序数组和有序数组进行对比:
操作
二叉堆
无序数组
有序数组
插入(insert)
O(log n)
O(1)
O(n)
获取最值(peek)
O(1)
O(n)
O(1)
提取最值(extract)
O(log n)
O(n)
O(1)
建堆(heapify)
O(n)
-
O(n log n)
删除任意元素
O(log n)*
O(n)
O(n)
更新优先级
O(log n)*
O(1)
O(n)
空间复杂度
O(n)
O(n)
O(n)
* 需要额外维护索引映射表才能实现O(log n)的删除和更新操作。
堆的独特优势: 堆是唯一一种能在O(log n)时间内同时支持插入和提取最值操作的数据结构。有序数组的插入需要O(n)时间,无序数组的提取最值需要O(n)时间,而二叉堆完美平衡了这两种操作,特别适合需要动态维护最值的场景。
八、经典应用场景
1. TopK问题
TopK问题是堆最经典的应用之一:从海量数据中找出最大(或最小)的k个元素。使用大小为k的最小堆,可以在O(n log k)时间内解决该问题,且空间复杂度仅为O(k)。对于海量数据场景(数据量超出内存),这种方法的优势尤为明显。
import heapq
def find_top_k_largest (nums, k):
"""找出数组中最大的k个元素(最小堆解法)"""
if k <= 0 or k > len (nums):
return []
heap = nums[:k]
heapq.heapify(heap) # 大小为k的最小堆
for num in nums[k:]:
if num > heap[0 ]: # 大于堆顶则替换
heapq.heapreplace(heap, num)
return heap
def find_top_k_smallest (nums, k):
"""找出数组中最小的k个元素(最大堆解法)"""
if k <= 0 or k > len (nums):
return []
# 使用负值构建最大堆
heap = [-x for x in nums[:k]]
heapq.heapify(heap)
for num in nums[k:]:
if -num > heap[0 ]: # 小于堆顶(最大堆的堆顶是最大值)则替换
heapq.heapreplace(heap, -num)
return [-x for x in heap]
# Python内置的快捷函数
# heapq.nlargest(k, nums) -- 内部也是用堆实现的
# heapq.nsmallest(k, nums) -- 同上
nums = [3 , 7 , 1 , 9 , 4 , 8 , 2 , 6 , 5 ]
print (find_top_k_largest (nums, 3 )) # 输出: [7, 8, 9]
print (find_top_k_smallest (nums, 3 )) # 输出: [1, 2, 3]
print (heapq.nlargest(3 , nums)) # 输出: [9, 8, 7]
print (heapq.nsmallest(3 , nums)) # 输出: [1, 2, 3]
性能对比: TopK问题有三种常见解法:排序法O(n log n)、堆法O(n log k)、快速选择法O(n)平均。当n极大而k较小时(如从1亿条数据中找前100个),堆法的优势最大。当k接近n/2时,快速选择可能更优。Python的heapq.nlargest/nsmallest在k较小时使用堆实现,k较大时使用排序。
2. 数据流中位数查找
中位数查找问题是堆的一种巧妙应用。通过维护一个最大堆(存储较小的一半)和一个最小堆(存储较大的一半),我们可以在O(log n)时间内插入新数,并在O(1)时间内获取当前中位数。关键在于保持两个堆的大小平衡:最大堆的大小要么等于最小堆,要么比最小堆多一个元素。
class MedianFinder :
"""数据流中位数查找器
使用两个堆:最大堆存左半部分,最小堆存右半部分
"""
def __init__ (self):
self.left = [] # 最大堆(存负值)
self.right = [] # 最小堆
def add_num (self, num):
"""插入一个数 O(log n)"""
if not self.left or num <= -self.left[0 ]:
heapq.heappush(self.left, -num)
else :
heapq.heappush(self.right, num)
# 平衡两个堆
# left最多比right多一个元素
if len (self.left) > len (self.right) + 1 :
heapq.heappush(self.right, -heapq.heappop(self.left))
elif len (self.right) > len (self.left):
heapq.heappush(self.left, -heapq.heappop(self.right))
def find_median (self):
"""获取当前中位数 O(1)"""
if len (self.left) == len (self.right):
return (-self.left[0 ] + self.right[0 ]) / 2
return -self.left[0 ]
# 使用示例
mf = MedianFinder ()
for num in [5 , 2 , 8 , 1 , 9 ]:
mf.add_num(num)
print (f"添加 {num}, 当前中位数: {mf.find_median()}" )
# 输出:
# 添加 5, 当前中位数: 5
# 添加 2, 当前中位数: 3.5
# 添加 8, 当前中位数: 5
# 添加 1, 当前中位数: 3.5
# 添加 9, 当前中位数: 5
3. Dijkstra最短路径算法
Dijkstra算法是图论中求解单源最短路径的经典算法,其核心步骤是每次从未访问的节点中选择距离最小的节点(即提取最小堆堆顶),然后松弛其邻接边。优先队列(最小堆)的使用使算法的时间复杂度从O(V^2)降低到O((V+E) log V),是堆在算法中的经典应用。
import heapq
from collections import defaultdict
def dijkstra (graph, start):
"""Dijkstra最短路径算法
参数:
graph: 邻接表表示的图 {node: [(neighbor, weight), ...]}
start: 起始节点
返回:
dist: 从start到各节点的最短距离
prev: 前驱节点字典(用于重建路径)
"""
dist = {node: float ('inf' ) for node in graph}
prev = {node: None for node in graph}
dist[start] = 0
# 优先队列:(距离, 节点)
pq = [(0 , start)]
while pq:
cur_dist, node = heapq.heappop(pq)
# 惰性删除:如果弹出的是过时条目,跳过
if cur_dist > dist[node]:
continue
for neighbor, weight in graph[node]:
new_dist = cur_dist + weight
if new_dist < dist[neighbor]:
dist[neighbor] = new_dist
prev[neighbor] = node
heapq.heappush(pq, (new_dist, neighbor))
return dist, prev
def reconstruct_path (prev, target):
"""重建从起点到target的路径"""
path = []
while target is not None :
path.append(target)
target = prev[target]
return path[::-1 ]
# 测试Dijkstra算法
graph = {
'A' : [('B' , 4 ), ('C' , 2 )],
'B' : [('C' , 1 ), ('D' , 5 )],
'C' : [('D' , 8 ), ('E' , 10 )],
'D' : [('E' , 2 )],
'E' : []
}
dist, prev = dijkstra (graph, 'A' )
print (dist) # {'A': 0, 'B': 4, 'C': 2, 'D': 9, 'E': 11}
print (reconstruct_path (prev, 'E' )) # ['A', 'C', 'D', 'E']
4. 合并K个有序链表
合并K个有序链表是面试中非常常见的题目。使用大小为k的最小堆,每次从堆顶取出当前最小的节点,将其下一个节点加入堆中,可以在O(N log k)时间内完成合并(N为总节点数)。
import heapq
def merge_k_lists (lists):
"""使用堆合并K个有序链表"""
# 哨兵节点
dummy = ListNode (0 )
cur = dummy
# 将每个链表的头节点入堆
heap = []
for i, head in enumerate (lists):
if head:
# 使用 (值, 索引, 节点) 三元组避免节点比较问题
heapq.heappush(heap, (head.val, i, head))
while heap:
val, i, node = heapq.heappop(heap)
cur.next = node
cur = cur.next
if node.next:
heapq.heappush(heap, (node.next.val, i, node.next))
return dummy.next
5. 任务调度
堆结构被广泛应用于操作系统和框架的调度器中。例如,在抢占式调度中,总是选择优先级最高的任务执行;在定时任务调度中,使用最小堆按执行时间排序,每次取最近到期的任务。
import heapq
import time
class TaskScheduler :
"""基于优先级的任务调度器"""
def __init__ (self):
self._queue = []
self._counter = 0
def schedule (self, task, priority, delay=0 ):
"""调度一个任务
task: 可调用对象
priority: 优先级(越小越优先)
delay: 延迟执行(秒)
"""
exec_time = time.time() + delay
heapq.heappush(self._queue,
(priority, exec_time, self._counter, task))
self._counter += 1
def run_pending (self):
"""执行所有到期的任务"""
now = time.time()
while self._queue and self._queue[0 ][1 ] <= now:
priority, exec_time, _, task = heapq.heappop(self._queue)
task()
九、heapq高级用法
Python的heapq模块除了基本的heappush和heappop之外,还提供了一些高效的高级操作,这些操作在特定场景下可以显著提升性能。
1. heapreplace (等价于先pop再push,但更高效)
# heapreplace: 弹出堆顶,同时压入新值(单次O(log n))
# 相当于先heappop再heappush,但只需一次O(log n)操作而非两次
heap = [1 , 3 , 5 , 7 , 9 ]
heapq.heapify(heap)
result = heapq.heapreplace(heap, 4 )
print (result) # 输出: 1(原来的堆顶)
print (heap) # 输出: [3, 4, 5, 7, 9]
# 典型应用:维护固定大小的滑动窗口最大值
def top_k_stream (stream, k):
"""维护数据流中最大的k个元素"""
heap = []
for i, val in enumerate (stream):
if i < k:
heapq.heappush(heap, val)
elif val > heap[0 ]:
heapq.heapreplace(heap, val)
return heap
2. heappushpop (先push再pop,更高效)
# heappushpop: 先压入新值,再弹出堆顶(单次O(log n))
# 当新值不可能成为结果时效率极高:无需调整堆
heap = [2 , 5 , 7 ]
heapq.heapify(heap)
result = heapq.heappushpop(heap, 1 )
print (result) # 输出: 1(1入堆后成为最小值,被弹出)
print (heap) # 输出: [2, 5, 7]
# heapreplace与heappushpop的区别:
# heapreplace(a, b) = pop(a), push(a, b) -- 先pop,再push
# heappushpop(a, b) = push(a, b), pop(a) -- 先push,再pop
# 对于最小堆:heapreplace可能返回大于b的值,heappushpop永远返回min(原堆顶, b)
3. nlargest / nsmallest
# heapq.nlargest 和 heapq.nsmallest 是TopK问题的快捷函数
# 内部实现:当k较小时使用堆,k较大时使用排序(自适应)
data = [10 , 3 , 8 , 1 , 6 , 9 , 2 , 5 , 7 , 4 ]
print (heapq.nlargest(3 , data)) # [10, 9, 8]
print (heapq.nsmallest(3 , data)) # [1, 2, 3]
# 支持key参数,类似于sorted的key
students = [
{'name' : 'Alice' , 'score' : 85 },
{'name' : 'Bob' , 'score' : 92 },
{'name' : 'Charlie' , 'score' : 78 },
]
top_students = heapq.nlargest(2 , students, key=lambda s: s['score' ])
print (top_students)
# [{'name': 'Bob', 'score': 92}, {'name': 'Alice', 'score': 85}]
高级技巧: heapq中的nlargest和nsmallest不仅比手动排序更简洁,而且在k较小时有更好的性能表现。内部实现中,当k=1时退化使用min/max函数,当k较小时使用固定大小的堆,当k接近n时直接排序。这种自适应策略保证了在各种k值下都有合理的性能。
十、完整堆实现代码
以下是一个完整的最大堆Python实现,将所有核心操作整合到一个类中,可直接用于生产环境或面试准备中。这个实现涵盖了从建堆、插入、提取到堆排序的全部功能。
class MaxHeap :
"""完全自实现的二叉最大堆"""
def __init__ (self, data=None ):
"""初始化:可传入初始数据建堆"""
self._heap = []
if data:
self._heap = list (data)
self._heapify()
def _parent (self, i):
return (i - 1 ) // 2
def _left (self, i):
return 2 * i + 1
def _right (self, i):
return 2 * i + 2
def _sift_up (self, i):
while i > 0 and self._heap[i] > self._heap[self._parent(i)]:
p = self._parent(i)
self._heap[i], self._heap[p] = self._heap[p], self._heap[i]
i = p
def _sift_down (self, i, n=None ):
if n is None :
n = len (self._heap)
while True :
largest = i
left = self._left(i)
right = self._right(i)
if left < n and self._heap[left] > self._heap[largest]:
largest = left
if right < n and self._heap[right] > self._heap[largest]:
largest = right
if largest == i:
break
self._heap[i], self._heap[largest] = \
self._heap[largest], self._heap[i]
i = largest
def _heapify (self):
n = len (self._heap)
for i in range (n // 2 - 1 , -1 , -1 ):
self._sift_down(i, n)
def push (self, val):
self._heap.append(val)
self._sift_up(len (self._heap) - 1 )
def pop (self):
if not self._heap:
raise IndexError ("堆为空" )
n = len (self._heap)
self._heap[0 ], self._heap[n - 1 ] = \
self._heap[n - 1 ], self._heap[0 ]
max_val = self._heap.pop()
if self._heap:
self._sift_down(0 )
return max_val
def peek (self):
return self._heap[0 ] if self._heap else None
def __len__ (self):
return len (self._heap)
def __bool__ (self):
return bool (self._heap)
def heap_sort (self):
"""原地堆排序,返回排序后的新列表"""
result = self._heap[:]
n = len (result)
for i in range (n - 1 , 0 , -1 ):
result[0 ], result[i] = result[i], result[0 ]
# 这里的_sift_down需要用n-1的版本
# 我们直接在调用处下沉
j, size = 0 , i
while True :
largest = j
left = 2 * j + 1
right = 2 * j + 2
if left < size and result[left] > result[largest]:
largest = left
if right < size and result[right] > result[largest]:
largest = right
if largest == j:
break
result[j], result[largest] = result[largest], result[j]
j = largest
return result
# 测试完整功能
if __name__ == "__main__" :
mh = MaxHeap ([10 , 3 , 5 , 1 , 7 , 9 ])
print (mh.peek()) # 10
mh.push(15 )
print (mh.peek()) # 15
print (mh.pop()) # 15
print (mh.pop()) # 10
print (mh.heap_sort()) # [1, 3, 5, 7, 9]
十一、核心要点总结
堆的核心要点:
堆的定义: 堆是一棵完全二叉树,最大堆中父节点值 >= 子节点值,最小堆中父节点值 <= 子节点值。
数组表示: 使用数组存储,父子节点通过下标关系访问:parent(i) = (i-1)//2, left(i) = 2*i+1, right(i) = 2*i+2。
核心操作: sift_up(上浮,O(log n))、sift_down(下沉,O(log n))、heapify(建堆,O(n))、insert(插入,O(log n))、extract(提取最值,O(log n))。
heapq模块: Python内置最小堆实现,提供heapify/heappush/heappop/heapreplace/heappushpush/nlargest/nsmallest。
最大堆技巧: 通过存储负值或自定义元组实现最大堆。
堆排序: 建堆O(n) + n次提取O(n log n) = O(n log n),原地排序,空间O(1)。
优先队列: 堆是实现优先队列的最佳选择,支持push/pop/peek/update操作。
经典应用: TopK问题、数据流中位数、Dijkstra算法、合并K个有序链表、任务调度。
优势: 唯一能在O(log n)时间内同时支持插入和提取最值操作的数据结构。
局限: 缓存不友好(数组访问跳跃),不支持高效的任意元素查找/删除(需额外索引映射)。
十二、进一步思考与实践
堆数据结构在计算机科学中占有重要地位。理解堆的原理和实现是掌握更复杂数据结构和算法的基础。以下是一些值得进一步探索的方向:
二项堆(Binomial Heap): 支持高效的合并操作(merge),合并两个堆的时间复杂度为O(log n),是左式堆和斐波那契堆的基础。
斐波那契堆(Fibonacci Heap): 理论上支持O(1)的插入和减键操作,O(log n)的删除操作,在Dijkstra算法等场景有理论优势。但常数因子大,实际应用中不如二叉堆。
配对堆(Pairing Heap): 一种实现简单、实际性能优秀的堆结构,常用于Prim算法和Dijkstra算法。
索引堆(Index Heap): 在堆中引入索引映射表,支持对堆中任意元素的快速查找和修改,在需要动态更新优先级的场景非常有用。
堆与平衡二叉搜索树的对比: 堆擅长期望最值,BST擅长搜索任意值;堆适合动态最值维护,BST适合范围查询。
堆的分布式应用: 在分布式系统中,堆结构被用于合并排序结果、全局TopK查询等场景。
学习建议: 熟练掌握堆的精髓在于三点:深刻理解sift_up和sift_down的"恢复堆属性"过程,记住heapq模块每个函数的用法和复杂度,能够将现实问题转化为"动态维护最值"的抽象模型。面试中遇到"第k大"、"前k个"、"中位数"、"最短路径"等关键词时,优先考虑堆解法。日常刷题时,建议先在纸上画出堆的树形结构来模拟操作过程,再落实到代码实现,这样可以大大加深对堆原理的理解。