堆(Heap)与优先队列

数据结构与算法专题 · 二叉堆实现与堆操作详解

专题:数据结构与算法系统学习

关键词:数据结构, 算法, 堆, Heap, 二叉堆, 优先队列, heapq, 堆排序, TopK, sift_up, sift_down

一、堆概述

堆(Heap)是一种特殊的完全二叉树数据结构,它满足"堆属性":每个节点的值都大于等于(最大堆)或小于等于(最小堆)其子节点的值。堆是优先队列最常见的实现方式,也是堆排序算法的核心数据结构。堆的重要特性在于,它能够在O(log n)时间内完成插入和删除操作,同时又能在O(1)时间内获取最值,这种高效性使其成为解决许多实际问题的首选数据结构。

堆分为两种基本类型:最大堆(Max-Heap)最小堆(Min-Heap)。在最大堆中,根节点是堆中最大的元素,每个父节点的值都大于或等于其子节点的值;在最小堆中,根节点是最小的元素,每个父节点的值都小于或等于其子节点的值。这两种堆结构在逻辑上是对称的,实现时只需调整比较符号即可相互转换。

核心堆属性:

最大堆:arr[parent] >= arr[child](每个父节点不小于其子节点)

最小堆:arr[parent] <= arr[child](每个父节点不大于其子节点)

完全二叉树性质:除最后一层外,其他层节点数均为最大值;最后一层节点从左向右排列。

堆的两种类型

最大堆(Max-Heap)

根节点值最大,常用于:

  • 堆排序升序排列
  • 获取前k个最小元素
  • 任务调度(优先级最高优先)

最小堆(Min-Heap)

根节点值最小,常用于:

  • 堆排序降序排列
  • 获取前k个最大元素
  • Dijkstra最短路径算法

堆的应用场景:堆广泛应用于优先队列、堆排序、TopK问题(求数据流中第k大/小的元素)、中位数查找、任务调度、Dijkstra最短路径算法、Huffman编码、合并K个有序链表等场景。凡是需要频繁获取最值且动态增删数据的场景,堆都是最合适的结构。

二、二叉堆的存储结构

二叉堆(Binary Heap)使用数组进行存储,不需要显式的节点对象和指针链接。这种实现方式利用了完全二叉树的特性:对于一个长度为n的数组,我们可以通过下标运算快速定位任意节点的父节点和子节点。数组实现堆的最大优势在于空间效率高(不需要存储指针)和缓存友好(连续内存访问)。

父子下标关系

假设堆的根节点存储在数组索引0的位置,对于任意索引i处的节点:

def parent(i): return (i - 1) // 2 def left_child(i): return 2 * i + 1 def right_child(i): return 2 * i + 2

索引方式对比:如果根节点从索引1开始存储(索引0留空),公式变为:parent(i)=i//2,left(i)=2*i,right(i)=2*i+1。两种方式各有优劣,本文采用0-based索引,与Python列表和heapq模块保持一致。

数组表示的直观示例

以下是一个最大堆的数组表示,堆中元素为 [50, 30, 40, 10, 20, 35]:

对应的完全二叉树结构为:根50下面挂30和40;30下面挂10和20;40下面挂35(左子),无右子。每个父节点均大于等于其子节点,满足最大堆属性。

# 数组表示的最大堆示例 # 树形结构: # 50 # / \ # 30 40 # / \ / # 10 20 35 # # 数组表示: [50, 30, 40, 10, 20, 35] max_heap = [50, 30, 40, 10, 20, 35] # 验证父节点 >= 子节点 assert 50 >= 30 and 50 >= 40 assert 30 >= 10 and 30 >= 20 assert 40 >= 35

三、核心操作详解

二叉堆的核心操作包括sift_up(上浮)、sift_down(下沉)、heapify(建堆)、insert(插入)和extract(提取最值)。这些操作共同构成了堆数据结构的基本行为。理解sift_up和sift_down是实现所有其他操作的基础,它们分别对应堆属性的向上恢复和向下恢复过程。

1. sift_up(上浮操作)

sift_up用于在堆的末尾插入新元素后恢复堆属性。新元素从底部向上"冒泡":如果它大于(最大堆)或小于(最小堆)其父节点,就与父节点交换位置,重复此过程直到堆属性恢复。sift_up操作的时间复杂度为O(log n),因为它最多从叶子节点上升到根节点,路径长度等于树的高度。

def sift_up(heap, i): """最大堆的上浮操作:将索引i处的元素上浮到正确位置""" while i > 0: p = (i - 1) // 2 # 父节点索引 if heap[i] <= heap[p]: break heap[i], heap[p] = heap[p], heap[i] # 与父节点交换 i = p def sift_up_min(heap, i): """最小堆的上浮操作""" while i > 0: p = (i - 1) // 2 if heap[i] >= heap[p]: break heap[i], heap[p] = heap[p], heap[i] i = p

2. sift_down(下沉操作)

sift_down用于从堆顶移除元素后或建堆过程中恢复堆属性。当前节点从顶部向"下沉":比较当前节点与左右子节点,如果小于子节点中的较大者(最大堆),则与较大子节点交换,重复直到堆属性恢复。sift_down的时间复杂度同样为O(log n)。这是堆中最重要的操作之一,因为heapify和extract操作都依赖于它。

def sift_down(heap, n, i): """最大堆的下沉操作:将索引i处的元素下沉到正确位置 参数: heap: 堆数组 n: 堆的有效大小(实际可用的元素个数) i: 当前需要下沉的节点索引 """ while True: largest = i left = 2 * i + 1 right = 2 * i + 2 if left < n and heap[left] > heap[largest]: largest = left if right < n and heap[right] > heap[largest]: largest = right if largest == i: break # 已到位 heap[i], heap[largest] = heap[largest], heap[i] i = largest

3. heapify(建堆操作)

heapify是将一个无序数组原地转换为堆的过程。Floyd提出的线性建堆算法从最后一个非叶子节点开始向前遍历,对每个节点执行sift_down操作。该算法的时间复杂度为O(n),而非直观的O(n log n),因为大多数节点位于靠近底层的层次,它们的下沉深度较浅。

def heapify(arr): """将无序数组原地转换为最大堆,O(n)时间复杂度""" n = len(arr) # 从最后一个非叶子节点开始下沉 # 最后一个非叶子节点的索引为 n//2 - 1 for i in range(n // 2 - 1, -1, -1): sift_down(arr, n, i) return arr # 建堆过程演示 arr = [10, 20, 15, 30, 40, 5, 25] heapify(arr) print(arr) # 输出: [40, 30, 25, 20, 10, 5, 15]

建堆时间复杂度O(n)的数学证明:设树高为h(根为第0层),第k层有2^k个节点,每个节点下沉的最大深度为h-k。总比较次数为sum(k=0 to h) 2^k * (h-k) = 2^(h+1) - h - 2 = O(n)。这意味着我们可以在线性时间内将任意无序数组转换为堆,这是堆数据结构的重要优势。

4. insert(插入操作)

插入操作是堆的动态使用中最常见的操作之一。新元素首先被追加到数组末尾(保证完全二叉树性质不变),然后通过sift_up操作将其"上浮"到正确位置,恢复堆属性。单次插入的时间复杂度为O(log n)。

def heappush_max(heap, val): """向最大堆中插入新元素""" heap.append(val) sift_up(heap, len(heap) - 1) # 使用示例 max_heap = [40, 30, 25, 20, 10, 5, 15] heappush_max(max_heap, 35) print(max_heap) # 输出: [40, 35, 25, 30, 10, 5, 15, 20] # 插入过程: 35追加到末尾,然后与父节点20比较 → 交换,再与父节点30比较 → 交换,到位

5. extract(提取最值操作)

提取最值操作(extract_max / extract_min)返回并移除堆中的根节点(最大值或最小值)。具体过程为:将根节点与最后一个叶子节点交换,删除最后一个叶子节点(即原来的根节点),然后从新的根节点开始执行sift_down恢复堆属性。该操作时间复杂度为O(log n)。

def heappop_max(heap): """从最大堆中取出并移除最大值""" if not heap: raise IndexError("堆为空") n = len(heap) heap[0], heap[n - 1] = heap[n - 1], heap[0] # 根与最后一个叶子交换 max_val = heap.pop() # 移除最大值 sift_down(heap, len(heap), 0) # 新根下沉 return max_val # 使用示例 max_heap = [40, 30, 25, 20, 10, 5, 15] max_val = heappop_max(max_heap) print(max_val) # 输出: 40 print(max_heap) # 输出: [30, 20, 25, 15, 10, 5]

6. peek(查看最值)

peek操作返回根节点的值但不删除它。这是堆最基本的查询操作,时间复杂度为O(1),只需返回数组的第一个元素即可。

def peek_max(heap): return heap[0] if heap else None

四、Python heapq模块

Python标准库提供了heapq模块,实现了最小堆的所有基本操作。heapq模块用列表作为底层存储,所有操作都是原地进行的(in-place)。heapq不提供最大堆的直接支持,但可以通过存储负值或自定义比较器来实现最大堆的效果。

heapq四大核心函数

import heapq # 1. heapify:将列表原地转换为最小堆 arr = [10, 3, 5, 1, 7, 9] heapq.heapify(arr) print(arr) # 输出: [1, 3, 5, 10, 7, 9] # 2. heappush:向堆中插入元素 heapq.heappush(arr, 2) print(arr) # 输出: [1, 3, 2, 10, 7, 9, 5] # 3. heappop:弹出堆顶最小值 min_val = heapq.heappop(arr) print(min_val) # 输出: 1 print(arr) # 输出: [2, 3, 5, 10, 7, 9] # 4. heappushpop:先push再pop,比单独调用更高效 result = heapq.heappushpop(arr, 4) print(result) # 输出: 2(原来的最小值) print(arr) # 输出: [3, 4, 5, 10, 7, 9]

最大堆的实现技巧

Python的heapq只支持最小堆,但实现最大堆有三种常用方式:存储负值、存储元组自定义比较、或封装自定义类。其中存储负值是最简单直接的方式。

# 方式一:存储负值实现最大堆 arr = [10, 3, 5, 1, 7] max_heap = [-x for x in arr] heapq.heapify(max_heap) print(-max_heap[0]) # 输出: 10(最大值) # 方式二:存储元组(优先级, 元素)实现带优先级的最大堆 # 优先级取负值即可将最小堆变为按优先级降序 tasks = [] heapq.heappush(tasks, (-3, "低优先级任务")) heapq.heappush(tasks, (-10, "高优先级任务")) heapq.heappush(tasks, (-5, "中优先级任务")) while tasks: priority, task = heapq.heappop(tasks) print(f"执行任务: {task}, 优先级: {-priority}")

实用建议:在算法竞赛和面试中,优先使用heapq模块而非手写堆实现,因为heapq经过高度优化,性能远超手写版本。只有在需要自定义堆操作(如修改堆中任意元素的优先级)时才考虑手写实现。

五、堆排序

堆排序(Heap Sort)是一种基于堆数据结构的比较排序算法。它的核心思想分为两个阶段:首先利用heapify将无序数组构建为最大堆(O(n)),然后反复将堆顶元素(最大值)与末尾元素交换,堆大小减1并对新堆顶执行sift_down恢复堆属性(O(log n)),重复n-1次即可得到升序排列的数组。总时间复杂度为O(n log n),空间复杂度为O(1)。

def heap_sort(arr): """堆排序(原地排序,升序)""" n = len(arr) # 阶段1:建堆 O(n) for i in range(n // 2 - 1, -1, -1): sift_down(arr, n, i) # 阶段2:排序 O(n log n) for i in range(n - 1, 0, -1): # 将堆顶(最大值)交换到数组末尾 arr[0], arr[i] = arr[i], arr[0] # 对剩余元素重建堆 sift_down(arr, i, 0) return arr # 利用Python内置heapq实现堆排序(更简洁) def heap_sort_pythonic(arr): """使用heapq实现堆排序(非原地)""" heapq.heapify(arr) # 建最小堆 return [heapq.heappop(arr) for _ in range(len(arr))] # 测试 test_arr = [64, 34, 25, 12, 22, 11, 90] heap_sort(test_arr) print(test_arr) # 输出: [11, 12, 22, 25, 34, 64, 90]

排序算法对比:堆排序在三种O(n log n)排序算法中有独特优势。与快速排序相比,堆排序的最坏时间复杂度有保证(始终为O(n log n),而快排在极端情况下退化为O(n^2))。与归并排序相比,堆排序是原地排序,空间复杂度O(1)远优于归并排序的O(n)。但堆排序的缺点是缓存不友好(数组访问模式不连续),实际运行速度通常慢于优化后的快速排序。

六、优先队列

优先队列(Priority Queue)是一种特殊的队列,其中每个元素都有一个"优先级",出队时优先级最高的元素最先被移除。堆是实现优先队列最高效的数据结构之一。在Python中,可以直接使用heapq模块来构建优先队列。优先队列的核心操作包括push(入队)、pop(出队)、peek(查看队首)和update(更新优先级)。

基础优先队列实现

class PriorityQueue: """基于最小堆的优先队列实现(优先级值越小,优先级越高)""" def __init__(self): self._heap = [] self._index = 0 # 用于相同优先级时保持FIFO顺序 def push(self, item, priority): """入队:插入元素 item,优先级为 priority""" heapq.heappush(self._heap, (priority, self._index, item)) self._index += 1 def pop(self): """出队:返回并移除优先级最高的元素""" if not self._heap: raise IndexError("队列为空") priority, _, item = heapq.heappop(self._heap) return item, priority def peek(self): if not self._heap: return None return self._heap[0][2] def is_empty(self): return len(self._heap) == 0 def __len__(self): return len(self._heap)

支持优先级更新的优先队列

在实际应用中(如Dijkstra算法),我们经常需要更新队列中已有元素的优先级。为了实现高效的更新操作,我们可以使用"惰性删除"策略:不直接删除旧条目,而是插入一条新的(优先级, 元素)元组,并在弹出时通过标记数组忽略无效条目。

class MutablePriorityQueue: """支持优先级更新的优先队列(使用惰性删除)""" def __init__(self): self._heap = [] self._entries = {} # item -> priority 映射表 def push(self, item, priority): """插入或更新元素的优先级""" if item in self._entries: # 已有条目,检查是否需要更新 if self._entries[item] == priority: return # 优先级未变,无需操作 self._entries[item] = priority heapq.heappush(self._heap, (priority, item)) def pop(self): """弹出当前优先级最高的元素(惰性删除法)""" while self._heap: priority, item = heapq.heappop(self._heap) # 检查该条目是否为最新版本 if item in self._entries and self._entries[item] == priority: del self._entries[item] return item, priority raise KeyError("队列为空") def update(self, item, new_priority): """更新某个元素的优先级""" if item not in self._entries or self._entries[item] != new_priority: self._entries[item] = new_priority heapq.heappush(self._heap, (new_priority, item))

惰性删除原理:当需要更新已有元素的优先级时,我们并不直接修改堆中已有的条目(因为堆不支持高效的任意位置修改),而是将新条目直接push进堆。弹出时,通过_entries字典检查弹出的条目是否是最新版本,如果不是则丢弃(即"惰性删除")。这种策略以牺牲少量内存为代价(堆中可能存有已过期的条目),换取了O(log n)的更新操作效率。

七、时间复杂度分析

堆的各种操作时间复杂度是理解和应用堆的关键。以下汇总了二叉堆各操作的时间复杂度,并与完全无序数组和有序数组进行对比:

操作 二叉堆 无序数组 有序数组
插入(insert) O(log n) O(1) O(n)
获取最值(peek) O(1) O(n) O(1)
提取最值(extract) O(log n) O(n) O(1)
建堆(heapify) O(n) - O(n log n)
删除任意元素 O(log n)* O(n) O(n)
更新优先级 O(log n)* O(1) O(n)
空间复杂度 O(n) O(n) O(n)

* 需要额外维护索引映射表才能实现O(log n)的删除和更新操作。

堆的独特优势:堆是唯一一种能在O(log n)时间内同时支持插入和提取最值操作的数据结构。有序数组的插入需要O(n)时间,无序数组的提取最值需要O(n)时间,而二叉堆完美平衡了这两种操作,特别适合需要动态维护最值的场景。

八、经典应用场景

1. TopK问题

TopK问题是堆最经典的应用之一:从海量数据中找出最大(或最小)的k个元素。使用大小为k的最小堆,可以在O(n log k)时间内解决该问题,且空间复杂度仅为O(k)。对于海量数据场景(数据量超出内存),这种方法的优势尤为明显。

import heapq def find_top_k_largest(nums, k): """找出数组中最大的k个元素(最小堆解法)""" if k <= 0 or k > len(nums): return [] heap = nums[:k] heapq.heapify(heap) # 大小为k的最小堆 for num in nums[k:]: if num > heap[0]: # 大于堆顶则替换 heapq.heapreplace(heap, num) return heap def find_top_k_smallest(nums, k): """找出数组中最小的k个元素(最大堆解法)""" if k <= 0 or k > len(nums): return [] # 使用负值构建最大堆 heap = [-x for x in nums[:k]] heapq.heapify(heap) for num in nums[k:]: if -num > heap[0]: # 小于堆顶(最大堆的堆顶是最大值)则替换 heapq.heapreplace(heap, -num) return [-x for x in heap] # Python内置的快捷函数 # heapq.nlargest(k, nums) -- 内部也是用堆实现的 # heapq.nsmallest(k, nums) -- 同上 nums = [3, 7, 1, 9, 4, 8, 2, 6, 5] print(find_top_k_largest(nums, 3)) # 输出: [7, 8, 9] print(find_top_k_smallest(nums, 3)) # 输出: [1, 2, 3] print(heapq.nlargest(3, nums)) # 输出: [9, 8, 7] print(heapq.nsmallest(3, nums)) # 输出: [1, 2, 3]

性能对比:TopK问题有三种常见解法:排序法O(n log n)、堆法O(n log k)、快速选择法O(n)平均。当n极大而k较小时(如从1亿条数据中找前100个),堆法的优势最大。当k接近n/2时,快速选择可能更优。Python的heapq.nlargest/nsmallest在k较小时使用堆实现,k较大时使用排序。

2. 数据流中位数查找

中位数查找问题是堆的一种巧妙应用。通过维护一个最大堆(存储较小的一半)和一个最小堆(存储较大的一半),我们可以在O(log n)时间内插入新数,并在O(1)时间内获取当前中位数。关键在于保持两个堆的大小平衡:最大堆的大小要么等于最小堆,要么比最小堆多一个元素。

class MedianFinder: """数据流中位数查找器 使用两个堆:最大堆存左半部分,最小堆存右半部分 """ def __init__(self): self.left = [] # 最大堆(存负值) self.right = [] # 最小堆 def add_num(self, num): """插入一个数 O(log n)""" if not self.left or num <= -self.left[0]: heapq.heappush(self.left, -num) else: heapq.heappush(self.right, num) # 平衡两个堆 # left最多比right多一个元素 if len(self.left) > len(self.right) + 1: heapq.heappush(self.right, -heapq.heappop(self.left)) elif len(self.right) > len(self.left): heapq.heappush(self.left, -heapq.heappop(self.right)) def find_median(self): """获取当前中位数 O(1)""" if len(self.left) == len(self.right): return (-self.left[0] + self.right[0]) / 2 return -self.left[0] # 使用示例 mf = MedianFinder() for num in [5, 2, 8, 1, 9]: mf.add_num(num) print(f"添加 {num}, 当前中位数: {mf.find_median()}") # 输出: # 添加 5, 当前中位数: 5 # 添加 2, 当前中位数: 3.5 # 添加 8, 当前中位数: 5 # 添加 1, 当前中位数: 3.5 # 添加 9, 当前中位数: 5

3. Dijkstra最短路径算法

Dijkstra算法是图论中求解单源最短路径的经典算法,其核心步骤是每次从未访问的节点中选择距离最小的节点(即提取最小堆堆顶),然后松弛其邻接边。优先队列(最小堆)的使用使算法的时间复杂度从O(V^2)降低到O((V+E) log V),是堆在算法中的经典应用。

import heapq from collections import defaultdict def dijkstra(graph, start): """Dijkstra最短路径算法 参数: graph: 邻接表表示的图 {node: [(neighbor, weight), ...]} start: 起始节点 返回: dist: 从start到各节点的最短距离 prev: 前驱节点字典(用于重建路径) """ dist = {node: float('inf') for node in graph} prev = {node: None for node in graph} dist[start] = 0 # 优先队列:(距离, 节点) pq = [(0, start)] while pq: cur_dist, node = heapq.heappop(pq) # 惰性删除:如果弹出的是过时条目,跳过 if cur_dist > dist[node]: continue for neighbor, weight in graph[node]: new_dist = cur_dist + weight if new_dist < dist[neighbor]: dist[neighbor] = new_dist prev[neighbor] = node heapq.heappush(pq, (new_dist, neighbor)) return dist, prev def reconstruct_path(prev, target): """重建从起点到target的路径""" path = [] while target is not None: path.append(target) target = prev[target] return path[::-1] # 测试Dijkstra算法 graph = { 'A': [('B', 4), ('C', 2)], 'B': [('C', 1), ('D', 5)], 'C': [('D', 8), ('E', 10)], 'D': [('E', 2)], 'E': [] } dist, prev = dijkstra(graph, 'A') print(dist) # {'A': 0, 'B': 4, 'C': 2, 'D': 9, 'E': 11} print(reconstruct_path(prev, 'E')) # ['A', 'C', 'D', 'E']

4. 合并K个有序链表

合并K个有序链表是面试中非常常见的题目。使用大小为k的最小堆,每次从堆顶取出当前最小的节点,将其下一个节点加入堆中,可以在O(N log k)时间内完成合并(N为总节点数)。

import heapq def merge_k_lists(lists): """使用堆合并K个有序链表""" # 哨兵节点 dummy = ListNode(0) cur = dummy # 将每个链表的头节点入堆 heap = [] for i, head in enumerate(lists): if head: # 使用 (值, 索引, 节点) 三元组避免节点比较问题 heapq.heappush(heap, (head.val, i, head)) while heap: val, i, node = heapq.heappop(heap) cur.next = node cur = cur.next if node.next: heapq.heappush(heap, (node.next.val, i, node.next)) return dummy.next

5. 任务调度

堆结构被广泛应用于操作系统和框架的调度器中。例如,在抢占式调度中,总是选择优先级最高的任务执行;在定时任务调度中,使用最小堆按执行时间排序,每次取最近到期的任务。

import heapq import time class TaskScheduler: """基于优先级的任务调度器""" def __init__(self): self._queue = [] self._counter = 0 def schedule(self, task, priority, delay=0): """调度一个任务 task: 可调用对象 priority: 优先级(越小越优先) delay: 延迟执行(秒) """ exec_time = time.time() + delay heapq.heappush(self._queue, (priority, exec_time, self._counter, task)) self._counter += 1 def run_pending(self): """执行所有到期的任务""" now = time.time() while self._queue and self._queue[0][1] <= now: priority, exec_time, _, task = heapq.heappop(self._queue) task()

九、heapq高级用法

Python的heapq模块除了基本的heappush和heappop之外,还提供了一些高效的高级操作,这些操作在特定场景下可以显著提升性能。

1. heapreplace (等价于先pop再push,但更高效)

# heapreplace: 弹出堆顶,同时压入新值(单次O(log n)) # 相当于先heappop再heappush,但只需一次O(log n)操作而非两次 heap = [1, 3, 5, 7, 9] heapq.heapify(heap) result = heapq.heapreplace(heap, 4) print(result) # 输出: 1(原来的堆顶) print(heap) # 输出: [3, 4, 5, 7, 9] # 典型应用:维护固定大小的滑动窗口最大值 def top_k_stream(stream, k): """维护数据流中最大的k个元素""" heap = [] for i, val in enumerate(stream): if i < k: heapq.heappush(heap, val) elif val > heap[0]: heapq.heapreplace(heap, val) return heap

2. heappushpop (先push再pop,更高效)

# heappushpop: 先压入新值,再弹出堆顶(单次O(log n)) # 当新值不可能成为结果时效率极高:无需调整堆 heap = [2, 5, 7] heapq.heapify(heap) result = heapq.heappushpop(heap, 1) print(result) # 输出: 1(1入堆后成为最小值,被弹出) print(heap) # 输出: [2, 5, 7] # heapreplace与heappushpop的区别: # heapreplace(a, b) = pop(a), push(a, b) -- 先pop,再push # heappushpop(a, b) = push(a, b), pop(a) -- 先push,再pop # 对于最小堆:heapreplace可能返回大于b的值,heappushpop永远返回min(原堆顶, b)

3. nlargest / nsmallest

# heapq.nlargest 和 heapq.nsmallest 是TopK问题的快捷函数 # 内部实现:当k较小时使用堆,k较大时使用排序(自适应) data = [10, 3, 8, 1, 6, 9, 2, 5, 7, 4] print(heapq.nlargest(3, data)) # [10, 9, 8] print(heapq.nsmallest(3, data)) # [1, 2, 3] # 支持key参数,类似于sorted的key students = [ {'name': 'Alice', 'score': 85}, {'name': 'Bob', 'score': 92}, {'name': 'Charlie', 'score': 78}, ] top_students = heapq.nlargest(2, students, key=lambda s: s['score']) print(top_students) # [{'name': 'Bob', 'score': 92}, {'name': 'Alice', 'score': 85}]

高级技巧:heapq中的nlargest和nsmallest不仅比手动排序更简洁,而且在k较小时有更好的性能表现。内部实现中,当k=1时退化使用min/max函数,当k较小时使用固定大小的堆,当k接近n时直接排序。这种自适应策略保证了在各种k值下都有合理的性能。

十、完整堆实现代码

以下是一个完整的最大堆Python实现,将所有核心操作整合到一个类中,可直接用于生产环境或面试准备中。这个实现涵盖了从建堆、插入、提取到堆排序的全部功能。

class MaxHeap: """完全自实现的二叉最大堆""" def __init__(self, data=None): """初始化:可传入初始数据建堆""" self._heap = [] if data: self._heap = list(data) self._heapify() def _parent(self, i): return (i - 1) // 2 def _left(self, i): return 2 * i + 1 def _right(self, i): return 2 * i + 2 def _sift_up(self, i): while i > 0 and self._heap[i] > self._heap[self._parent(i)]: p = self._parent(i) self._heap[i], self._heap[p] = self._heap[p], self._heap[i] i = p def _sift_down(self, i, n=None): if n is None: n = len(self._heap) while True: largest = i left = self._left(i) right = self._right(i) if left < n and self._heap[left] > self._heap[largest]: largest = left if right < n and self._heap[right] > self._heap[largest]: largest = right if largest == i: break self._heap[i], self._heap[largest] = \ self._heap[largest], self._heap[i] i = largest def _heapify(self): n = len(self._heap) for i in range(n // 2 - 1, -1, -1): self._sift_down(i, n) def push(self, val): self._heap.append(val) self._sift_up(len(self._heap) - 1) def pop(self): if not self._heap: raise IndexError("堆为空") n = len(self._heap) self._heap[0], self._heap[n - 1] = \ self._heap[n - 1], self._heap[0] max_val = self._heap.pop() if self._heap: self._sift_down(0) return max_val def peek(self): return self._heap[0] if self._heap else None def __len__(self): return len(self._heap) def __bool__(self): return bool(self._heap) def heap_sort(self): """原地堆排序,返回排序后的新列表""" result = self._heap[:] n = len(result) for i in range(n - 1, 0, -1): result[0], result[i] = result[i], result[0] # 这里的_sift_down需要用n-1的版本 # 我们直接在调用处下沉 j, size = 0, i while True: largest = j left = 2 * j + 1 right = 2 * j + 2 if left < size and result[left] > result[largest]: largest = left if right < size and result[right] > result[largest]: largest = right if largest == j: break result[j], result[largest] = result[largest], result[j] j = largest return result # 测试完整功能 if __name__ == "__main__": mh = MaxHeap([10, 3, 5, 1, 7, 9]) print(mh.peek()) # 10 mh.push(15) print(mh.peek()) # 15 print(mh.pop()) # 15 print(mh.pop()) # 10 print(mh.heap_sort()) # [1, 3, 5, 7, 9]

十一、核心要点总结

堆的核心要点:

  • 堆的定义:堆是一棵完全二叉树,最大堆中父节点值 >= 子节点值,最小堆中父节点值 <= 子节点值。
  • 数组表示:使用数组存储,父子节点通过下标关系访问:parent(i) = (i-1)//2, left(i) = 2*i+1, right(i) = 2*i+2。
  • 核心操作:sift_up(上浮,O(log n))、sift_down(下沉,O(log n))、heapify(建堆,O(n))、insert(插入,O(log n))、extract(提取最值,O(log n))。
  • heapq模块:Python内置最小堆实现,提供heapify/heappush/heappop/heapreplace/heappushpush/nlargest/nsmallest。
  • 最大堆技巧:通过存储负值或自定义元组实现最大堆。
  • 堆排序:建堆O(n) + n次提取O(n log n) = O(n log n),原地排序,空间O(1)。
  • 优先队列:堆是实现优先队列的最佳选择,支持push/pop/peek/update操作。
  • 经典应用:TopK问题、数据流中位数、Dijkstra算法、合并K个有序链表、任务调度。
  • 优势:唯一能在O(log n)时间内同时支持插入和提取最值操作的数据结构。
  • 局限:缓存不友好(数组访问跳跃),不支持高效的任意元素查找/删除(需额外索引映射)。

十二、进一步思考与实践

堆数据结构在计算机科学中占有重要地位。理解堆的原理和实现是掌握更复杂数据结构和算法的基础。以下是一些值得进一步探索的方向:

学习建议:熟练掌握堆的精髓在于三点:深刻理解sift_up和sift_down的"恢复堆属性"过程,记住heapq模块每个函数的用法和复杂度,能够将现实问题转化为"动态维护最值"的抽象模型。面试中遇到"第k大"、"前k个"、"中位数"、"最短路径"等关键词时,优先考虑堆解法。日常刷题时,建议先在纸上画出堆的树形结构来模拟操作过程,再落实到代码实现,这样可以大大加深对堆原理的理解。