← 返回Python标准库精讲目录
← 返回学习笔记首页
专题: Python标准库精讲系统学习
关键词: Python, 标准库, itertools, 迭代器, count, cycle, chain, product, permutations, combinations, groupby
一、itertools模块概述
itertools是Python标准库中用于高效处理迭代器的工具模块,它提供了一系列用于创建和操作迭代器的函数。该模块的设计哲学是"惰性求值"(lazy evaluation)——只在需要时才计算下一个值,这使得它处理大规模数据时几乎不占用额外内存。itertools中的函数构建了"迭代器代数"(iterator algebra),可以将简单的迭代器组合成复杂的管道。
Python的迭代器协议是itertools的基石。任何实现了__iter__()和__next__()方法的对象都是迭代器。当使用for x in iterable时,Python会隐式调用iter()获取迭代器,然后反复调用next()直到抛出StopIteration异常。itertools的所有函数都返回迭代器对象,这意味着它们可以无缝嵌入到for循环、列表推导式或任何接受可迭代对象的上下文。
itertools模块提供了20多个迭代器构建函数,可分为三大类:无限迭代器、终止于最短输入序列的迭代器、以及组合生成器。掌握这些工具可以显著简化数据处理代码,使其更具可读性和内存效率。
使用itertools对比传统循环有三大优势:其一,内存效率极高——中间结果不必存储在列表中,而是按需生成;其二,代码简洁优雅——用函数组合替代多层嵌套循环;其三,组合灵活——迭代器可以像UNIX管道那样串联起来构建数据处理管线。在函数式编程范式中,itertools扮演着核心角色,常与map()、filter()、functools等函数式工具配合使用。
# itertools的惰性求值演示 — 处理十亿级序列内存不变
from itertools import count, islice
# count() 创建无限递增序列,但不占用内存
# islice 只取出前5个,count内部状态仅维持当前位置
result = list(islice(count(start=10, step=2), 5))
print (result) # [10, 12, 14, 16, 18]
# 如果使用 range,需先创建包含所有元素的 range 对象
# itertools 的优势在于可以处理"无限"序列
在Python 3中,map()和filter()也返回迭代器而非列表,这与itertools的设计一致。itertools的许多函数在Python 3.10+版本中有所增强,如新增的pairwise()函数,以及zip_longest的改进。掌握itertools不仅是学习一个模块,更是理解Python函数式编程哲学的重要一步。
二、无限迭代器(Infinite Iterators)
无限迭代器可以持续生成值而不终止,适用于需要无限数据流或重复模式的场景。使用时必须通过islice()、takewhile()或循环中的条件判断来限制取值数量,否则会导致无线循环。
1. count(start=0, step=1)
count()生成从start开始、以step为步长的等差数列。与range()不同,它没有上限且支持浮点数步长。在需要自动递增ID编号、模拟枚举索引、或作为时间序列数据生成器时非常有用。step参数也可以使用浮点数,如count(0, 0.1)会生成0, 0.1, 0.2, ...。
from itertools import count, islice
# 基本用法:从5开始,步长3
list (islice(count(5, 3), 6))
# 输出: [5, 8, 11, 14, 17, 20]
# 典型应用:与 enumerate 类似但更灵活
for i, item in zip (count(start=1), ['a' , 'b' , 'c' ]):
print (f"{i}: {item}" )
# 输出: 1: a, 2: b, 3: c
2. cycle(iterable)
cycle()将可迭代对象无限循环重复。它会保存输入可迭代对象的每个元素,然后无限次地重复这个序列。注意:如果输入的可迭代对象非常大,cycle会消耗大量内存来缓存所有元素。典型应用包括轮询调度、交通灯状态循环、重复模式着色等。
from itertools import cycle, islice
# 基本用法:在三个状态间循环
colors = ['Red' , 'Green' , 'Blue' ]
print (list (islice(cycle(colors), 7)))
# 输出: ['Red', 'Green', 'Blue', 'Red', 'Green', 'Blue', 'Red']
# 实战:轮询分配任务给三个worker
workers = ['Alice' , 'Bob' , 'Charlie' ]
tasks = ['task1' , 'task2' , 'task3' , 'task4' , 'task5' ]
assignment = list (zip (cycle(workers), tasks))
print (assignment)
# 输出: [('Alice','task1'), ('Bob','task2'), ('Charlie','task3'),
# ('Alice','task4'), ('Bob','task5')]
3. repeat(object, times=None)
repeat()重复返回同一个对象,times指定重复次数,不指定则无限重复。与cycle()不同,它不保存序列而是重复同一个值。性能优化场景中,map(str, repeat(5, 10))可以替代列表推导式。配合zip()使用可以为序列的每个元素填充相同的默认值。
from itertools import repeat
# 重复固定次数
list (repeat('Hello' , 3))
# 输出: ['Hello', 'Hello', 'Hello']
# 实战:为列表每个元素提供默认值
names = ['Alice' , 'Bob' , 'Charlie' ]
result = list (zip (names, repeat('active' )))
print (result)
# 输出: [('Alice', 'active'), ('Bob', 'active'), ('Charlie', 'active')]
# 性能优化:map + repeat 替代列表推导
squares = list (map (lambda x: x**2, repeat(5, 4)))
# 但这里更自然的写法是直接传入列表
三、组合生成器(Combinatoric Generators)
组合生成器用于生成输入可迭代对象元素的各种排列组合。它们在不同领域有广泛用途:密码学中的暴力破解、抽样统计中的组合枚举、机器学习中的特征组合搜索等。所有组合生成器都遵循字典序输出,且将输入元素基于位置而非值进行处理。
1. product(*iterables, repeat=1)
product()计算多个可迭代对象的笛卡尔积,等价于嵌套的for循环。参数repeat指定重复使用同一可迭代对象的次数。当处理多个序列的交叉组合时,这是最直接的解决方案。product在测试用例生成、超参数网格搜索、以及多维坐标枚举中极为常用。
from itertools import product
# 基本用法:两个集合的笛卡尔积
list (product(['A' , 'B' ], ['1' , '2' ]))
# 输出: [('A','1'), ('A','2'), ('B','1'), ('B','2')]
# repeat 参数:相当于对同一集合多次笛卡尔积
list (product(['H' , 'T' ], repeat=2))
# 输出: [('H','H'), ('H','T'), ('T','H'), ('T','T')]
# 相当于抛硬币两次的所有可能结果
# 实战:超参数网格搜索
params = {
'lr' : [0.001 , 0.01 , 0.1 ],
'batch_size' : [16 , 32 , 64 ],
'optimizer' : ['sgd' , 'adam' ]
}
combos = list (product(*params.values()))
print (len (combos)) # 3×3×2 = 18 种组合
# 实战:按位全枚举(3位二进制)
list (product([0 , 1 ], repeat=3))
# 输出: [(0,0,0), (0,0,1), ..., (1,1,1)] 共 8 种
2. permutations(iterable, r=None)
permutations()生成输入可迭代对象中所有长度为r的排列。如果r未指定或为None,则r默认可迭代对象的长度,生成所有全排列。排列考虑顺序,即('A','B')和('B','A')被视为不同的结果。总数为n!/(n-r)!,其中n为输入序列长度。
from itertools import permutations
# 3个元素的全部排列
list (permutations(['A' , 'B' , 'C' ]))
# 输出: [('A','B','C'), ('A','C','B'), ('B','A','C'),
# ('B','C','A'), ('C','A','B'), ('C','B','A')]
# 指定长度 r=2
list (permutations([1 , 2 , 3 ], 2 ))
# 输出: [(1,2), (1,3), (2,1), (2,3), (3,1), (3,2)] 共 P(3,2)=6 个
# 实战:赛跑前三名排列枚举
runners = ['Tom' , 'Jerry' , 'Spike' , 'Tyke' ]
podiums = list (permutations(runners, 3 ))
print (f"冠亚季军可能有 {len(podiums)} 种排列" )
# P(4,3) = 4×3×2 = 24 种
3. combinations(iterable, r)
combinations()生成输入可迭代对象中长度为r的所有组合。组合与排列的区别在于组合不考虑顺序,即('A','B')和('B','A')被视为相同结果。结果按字典序输出,且假设输入序列本身是有序的。总数为C(n,r) = n!/(r!(n-r)!)。
from itertools import combinations
# 4个元素取2个的全部组合
list (combinations(['A' , 'B' , 'C' , 'D' ], 2 ))
# 输出: [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]
# 实战:选课方案枚举(从5门课中选3门)
courses = ['Math' , 'Physics' , 'Chem' , 'Bio' , 'CS' ]
schedules = list (combinations(courses, 3 ))
print (f"共有 {len(schedules)} 种选课方案" )
# C(5,3) = 10 种
4. combinations_with_replacement(iterable, r)
combinations_with_replacement()生成允许元素重复的组合。与普通组合不同的是,同一个元素可以在结果中出现多次(但位置顺序仍然不重要)。这相当于从n种物品中有放回地抽取r次,不考虑顺序。总数为C(n+r-1, r)。
from itertools import combinations_with_replacement
# 从3种口味中选2个,允许重复
flavors = ['Vanilla' , 'Chocolate' , 'Strawberry' ]
list (combinations_with_replacement(flavors, 2 ))
# 输出: [('Vanilla','Vanilla'), ('Vanilla','Chocolate'),
# ('Vanilla','Strawberry'), ('Chocolate','Chocolate'),
# ('Chocolate','Strawberry'), ('Strawberry','Strawberry')]
# C(3+2-1,2) = C(4,2) = 6 种
# 四种函数对比集锦
# product('ABCD', repeat=2) → 16 种(有序,可重复)
# permutations('ABCD', 2) → 12 种(有序,不重复)
# combinations('ABCD', 2) → 6 种(无序,不重复)
# combinations_with_replacement → 10 种(无序,可重复)
四、短路迭代器(Short-circuiting Iterators)
短路迭代器根据条件或位置从输入可迭代对象中选择或过滤元素。它们都会在最短输入序列时终止,适合构建数据处理流水线中的过滤环节。
1. chain(*iterables) 与 chain.from_iterable(iterable)
chain()将多个可迭代对象首尾连接成一个连续的迭代器。这是一个非常实用的工具,可以避免手动编写嵌套循环或使用+连接列表(后者会创建新的列表副本)。chain.from_iterable()接收一个可迭代对象,其每个元素本身也是可迭代对象,常用于将嵌套列表展平。
from itertools import chain
# chain 连接多个序列
result = list (chain([1 , 2 ], [3 , 4 ], [5 ]))
print (result) # [1, 2, 3, 4, 5]
# 性能对比:chain 不创建新列表,O(1) 额外内存
# 而 list1 + list2 + list3 会创建 O(n) 的新列表
# from_iterable 展平嵌套列表
nested = [[1 , 2 ], [3 , 4 , 5 ], [6 ]]
list (chain.from_iterable(nested))
# 输出: [1, 2, 3, 4, 5, 6]
# 实战:合并多个数据源的行
csv_files = [
['header1,header2' , '1,2' ],
['3,4' , '5,6' ],
]
all_rows = list (chain.from_iterable(csv_files))
# 输出: ['header1,header2', '1,2', '3,4', '5,6']
2. compress(data, selectors)
compress()根据selectors可迭代对象中的对应布尔值,过滤data中的元素。类似于filter(),但选择器预先计算好并通过可迭代对象传入。selectors中的真值可以是任何布尔可转换的值(1、True、非零数字等)。当data和selectors长度不一致时,在较短的序列处终止。
from itertools import compress
# 使用选择器过滤数据
data = ['A' , 'B' , 'C' , 'D' , 'E' ]
selectors = [1 , 0 , 1 , 0 , 1 ]
list (compress(data, selectors))
# 输出: ['A', 'C', 'E']
# 实战:根据周末标记筛选工作日数据
weekdays = ['Mon' , 'Tue' , 'Wed' , 'Thu' , 'Fri' , 'Sat' , 'Sun' ]
is_weekend = [0 , 0 , 0 , 0 , 0 , 1 , 1 ]
workdays = list (compress(weekdays, is_weekend))
print (workdays) # ['Sat', 'Sun']
3. dropwhile(predicate, iterable) 与 takewhile(predicate, iterable)
dropwhile()从可迭代对象中跳过满足predicate的元素,直到遇到第一个不满足条件的元素,然后返回该元素及之后的所有元素。与之相反,takewhile()仅保留满足predicate的元素,直到遇到第一个不满足条件的元素为止。这两个函数常用于处理有序数据中需要"越过头部"或"截取头部"的场景。
from itertools import dropwhile, takewhile
# dropwhile:跳过开头的负数
data = [-3 , -2 , -1 , 0 , 1 , 2 , 3 ]
list (dropwhile(lambda x: x < 0 , data))
# 输出: [0, 1, 2, 3]
# takewhile:从开头取到第一个不满足条件的地方
list (takewhile(lambda x: x < 0 , data))
# 输出: [-3, -2, -1]
# 实战:处理日志文件,跳过开头注释行
log_lines = [
'# Date: 2025-01-01' ,
'# Author: admin' ,
'INFO: Server started' ,
'INFO: Connection established' ,
]
data_lines = list (dropwhile(lambda line: line.startswith('#' ), log_lines))
print (data_lines)
# 输出: ['INFO: Server started', 'INFO: Connection established']
4. filterfalse(predicate, iterable)
filterfalse()返回所有使predicate为False的元素,相当于内置filter()的逆操作(filter返回True的元素)。如果predicate为None,则返回所有假值元素。这在需要排除满足特定条件的元素时非常有用。
from itertools import filterfalse
# 筛选出偶数(filterfalse返回使谓词为False的元素)
list (filterfalse(lambda x: x % 2 == 1 , range(10 )))
# 输出: [0, 2, 4, 6, 8]
# 等效于: [x for x in range(10) if x % 2 == 0]
# predicate为None时,过滤假值(空字符串、0、None、False)
data = ['hello' , '' , 'world' , 0 , 42 , None , 'python' ]
list (filterfalse(None , data))
# 输出: ['hello', 'world', 42, 'python']
5. islice(iterable, stop) / islice(iterable, start, stop, step)
islice()对可迭代对象执行切片操作,但不创建中间列表。itertools.islice不同于普通的list[start:stop:step]切片,它逐一遍历元素并丢弃start之前的元素,性能上与直接切片相当,但内存效率更高。start和step都是可选的,默认为0和1。
from itertools import islice
# 取前5个元素
list (islice(range(100 ), 5 ))
# 输出: [0, 1, 2, 3, 4]
# 从索引5到10
list (islice(range(100 ), 5 , 10 ))
# 输出: [5, 6, 7, 8, 9]
# 带步长的切片:从0到20,每隔3个取一个
list (islice(range(100 ), 0 , 20 , 3 ))
# 输出: [0, 3, 6, 9, 12, 15, 18]
# 实战:大型文件的批量读取(伪代码)
# def read_in_chunks(file_path, chunk_size=1000):
# with open(file_path) as f:
# while True:
# chunk = list(islice(f, chunk_size))
# if not chunk:
# break
# yield chunk
五、分组与累积(Grouping and Accumulating)
本节介绍两个重要的数据处理函数:groupby用于对连续元素分组,accumulate用于累积计算。它们常用于数据聚合和运行统计场景。
1. groupby(iterable, key=None)
groupby()将可迭代对象中连续的、具有相同key函数返回值的元素分组。注意:groupby只对连续的相同键值进行分组!如果输入序列不是按key排序的,相同键值的元素可能会出现在不同分组中。因此使用前通常需要对数据排序。groupby返回的每个分组是一个(key, iterator)对,其中iterator是共享同一个key的所有元素。
from itertools import groupby
# 基本用法:对连续字符分组
data = 'AAABBBCCAA'
result = [(key, list (group)) for key, group in groupby(data)]
print (result)
# 输出: [('A', ['A','A','A']), ('B', ['B','B','B']),
# ('C', ['C','C']), ('A', ['A','A'])]
# 注意两个'A'组是分开的,因为它们不连续
# 正确用法:先排序再分组
records = [
{'dept' : 'IT' , 'name' : 'Alice' },
{'dept' : 'HR' , 'name' : 'Bob' },
{'dept' : 'IT' , 'name' : 'Charlie' },
]
sorted_records = sorted (records, key=lambda x: x['dept' ])
for dept, group in groupby(sorted_records, key=lambda x: x['dept' ]):
print (dept, list (group))
# 输出: HR [{'dept':'HR', 'name':'Bob'}]
# IT [{'dept':'IT', 'name':'Alice'}, {'dept':'IT', 'name':'Charlie'}]
2. accumulate(iterable, func=operator.add)
accumulate()对输入序列依次累积应用func函数,返回每一步的累积结果。默认func是加法,产生累加和。可以指定其他二元操作函数实现不同语义,如累乘、累取最大值等。在Python 3.8+版本中,func还可以接收operator.mul实现累乘。accumulate返回的迭代器长度与输入序列相同。
from itertools import accumulate
import operator
# 默认累加
list (accumulate([1 , 2 , 3 , 4 , 5 ]))
# 输出: [1, 3, 6, 10, 15] (1, 1+2, 1+2+3, ...)
# 累乘
list (accumulate([1 , 2 , 3 , 4 , 5 ], operator.mul))
# 输出: [1, 2, 6, 24, 120] (1, 1×2, 1×2×3, ...)
# 最大值累积
list (accumulate([3 , 1 , 5 , 2 , 4 ], max ))
# 输出: [3, 3, 5, 5, 5] — 每一步都是当前为止的最大值
# 实战:计算运行总和(可用于财务流水统计)
transactions = [100 , -50 , 200 , -30 , 80 ]
running_balance = list (accumulate(transactions))
print (running_balance)
# 输出: [100, 50, 250, 220, 300]
# 实战:斐波那契数列生成
fib = [1 , 1 ]
[fib.append(sum (fib[-2 :])) for _ in range(8 )]
print (fib[:10 ]) # [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
六、合并迭代器(Merging Iterators)
合并迭代器函数涉及多个迭代器的交互操作,包括并行遍历、复制、映射等。它们使得处理多个数据流变得更加简单和高效。
1. zip_longest(*iterables, fillvalue=None)
zip_longest()类似于内置zip(),但会持续到最长的可迭代对象结束,缺失的值用fillvalue填充。内置zip()在最短序列处终止,而zip_longest确保完全遍历所有输入。这在处理不等长数据记录、对齐时间序列数据时非常有用。
from itertools import zip_longest
# zip_longest 与 zip 的对比
a = [1 , 2 , 3 ]
b = ['a' , 'b' ]
list (zip (a, b))
# 输出: [(1, 'a'), (2, 'b')] — 在最短序列处终止
list (zip_longest(a, b, fillvalue='N/A' ))
# 输出: [(1, 'a'), (2, 'b'), (3, 'N/A')] — 补齐到最长
# 实战:合并不等长的CSV列
headers = ['Name' , 'Age' , 'City' , 'Country' ]
row1 = ['Alice' , '30' , 'NYC' ]
padded = list (zip_longest(headers, row1, fillvalue='' ))
print (padded)
# 输出: [('Name', 'Alice'), ('Age', '30'), ('City', 'NYC'), ('Country', '')]
2. tee(iterable, n=2)
tee()将一个迭代器复制为n个独立的迭代器。返回的迭代器各自独立,可以分别消耗。但注意:tee需要缓存原始迭代器已被消耗但某个副本尚未消耗的元素,因此如果迭代器之间存在较大的消耗差距,会占用额外内存。通常用于需要对同一数据流进行多次不同处理的场景。
from itertools import tee
# 复制迭代器
original = [1 , 2 , 3 , 4 , 5 ]
it1, it2 = tee(original, 2 )
print (list (it1)) # [1, 2, 3, 4, 5]
print (list (it2)) # [1, 2, 3, 4, 5]
# 实战:对同一数据流同时计算平均值和最大值
data = [10 , 20 , 30 , 40 , 50 ]
avg_it, max_it = tee(iter(data), 2 )
avg = sum (avg_it) / len (data)
max_val = max (max_it)
print (f"Average: {avg}, Max: {max_val}" )
3. starmap(function, iterable)
starmap()类似于map(),但假设iterable的每个元素本身是一个元组,并将元组解包为function的参数。这在处理已预打包的参数列表时非常有用,避免了显式的lambda包装。当参数在预处理阶段已被收集为元组时,starmap是比map(lambda x: f(*x), ...)更简洁的替代方案。
from itertools import starmap
# starmap 解包参数元组
data = [(2 , 3 ), (4 , 5 ), (6 , 7 )]
list (starmap(lambda a, b: a * b, data))
# 输出: [6, 20, 42]
# 等效于: [pow(2,3), pow(4,5), pow(6,7)]
# 实战:使用 pow 函数计算幂
list (starmap(pow , [(2 , 3 ), (3 , 4 ), (4 , 5 )]))
# 输出: [8, 81, 1024]
# 实战:计算多个点的欧氏距离
points = [((0 , 0 ), (3 , 4 )), ((1 , 1 ), (4 , 5 ))]
def distance (p1, p2):
return ((p1[0 ]-p2[0 ])**2 + (p1[1 ]-p2[1 ])**2 )**0.5
list (starmap(distance, points))
# 输出: [5.0, 5.0]
4. pairwise(iterable) — Python 3.10+
pairwise()返回输入可迭代对象中连续重叠的对,形式为(s0,s1), (s1,s2), (s2,s3), ...。该函数在Python 3.10中新增。典型应用包括:计算相邻元素的差值、创建滑动窗口、检测数据序列中的变化点等。
from itertools import pairwise
# 基本用法
list (pairwise(['a' , 'b' , 'c' , 'd' ]))
# 输出: [('a', 'b'), ('b', 'c'), ('c', 'd')]
# 实战:计算相邻元素的差值
prices = [100 , 102 , 98 , 105 , 110 ]
changes = [y - x for x, y in pairwise(prices)]
print (changes) # [2, -4, 7, 5]
# 实战:检测序列中的转折点
def find_turning_points (seq):
if len (seq) < 3 :
return []
result = []
for (a, b), (b2, c) in pairwise(pairwise(seq)):
if (a > b < c) or (a < b > c):
result.append(b)
return result
print (find_turning_points(prices)) # [98, 105] — 局部最小值
七、实战应用(Practical Applications)
本节将展示itertools在真实编程场景中的典型应用,展示如何将多个迭代器工具组合成强大的数据处理管线。
应用1:数据分页(Pagination)
使用islice和count实现高效的数据分页,不需要将全部数据加载到内存中。
from itertools import islice, count
def paginate (iterable, page_size):
"""将可迭代对象分页,每次返回一页数据"""
it = iter(iterable)
return iter(lambda : list (islice(it, page_size)), [])
# 示例:将1-20分页,每页5条
data = range(1 , 21 )
page_gen = paginate(data, page_size=5 )
for page_num, page in enumerate (page_gen, 1 ):
print (f"Page {page_num}: {page}" )
# 输出: Page 1: [1,2,3,4,5]
# Page 2: [6,7,8,9,10]
# Page 3: [11,12,13,14,15]
# Page 4: [16,17,18,19,20]
应用2:排列组合枚举(Schedule Generation)
使用product、permutations和combinations解决实际的枚举问题。
from itertools import product, combinations, permutations
# 场景1:考试座位安排,从20个座位中选5个
seats = range(1 , 21 )
print (f"选座方案数: {len(list(combinations(seats, 5)))}" )
# C(20,5) = 15504 种
# 场景2:四位数密码暴力破解空间
digits = range(10 )
password_space = product(digits, repeat=4 )
print (f"4位密码空间: {10**4} 种" )
# 场景3:球队对阵表生成(循环赛)
teams = ['Lakers' , 'Celtics' , 'Warriors' , 'Bulls' ]
matches = list (combinations(teams, 2 ))
print (f"循环赛对阵: {matches}" )
# C(4,2) = 6 场比赛
# 场景4:菜单组合优化(从8道菜中选3道,每道都可重复选)
menu = ['A' , 'B' , 'C' , 'D' , 'E' , 'F' , 'G' , 'H' ]
set_meals = list (combinations_with_replacement(menu, 3 ))
print (f"套餐组合数: {len(set_meals)}" )
# C(8+3-1,3) = C(10,3) = 120 种
应用3:数据流处理管线(Pipeline Pattern)
使用itertools函数组合构建数据处理管线,实现类似UNIX管道的数据处理风格。每个函数只做一件事,通过组合实现复杂的数据转换。
from itertools import chain, islice, dropwhile, groupby, accumulate, filterfalse
import operator
# 场景:处理传感器数据流
# 1. 原始数据(模拟传感器读数,含无效值-1)
raw_data = [-1 , -1 , 23 , 25 , 27 , -1 , 30 , 32 , 35 , -1 , -1 , 40 , 42 ]
# 2. 构建处理管线
pipeline = (
raw_data
# 跳过开头的无效值
| dropwhile(lambda x: x == -1 )
# 过滤中间的无效值
| filterfalse(lambda x: x == -1 )
# 计算运行累积温度
| accumulate(operator.add)
)
# 注意:上述 | 语法仅为示意,实际应使用嵌套函数调用
# 实际写法:
clean_data = filterfalse(lambda x: x == -1 ,
dropwhile(lambda x: x == -1 , raw_data))
accumulated = list (accumulate(clean_data))
print (accumulated)
# 输出: [23, 48, 75, 105, 137, 177, 219]
# 实战:日志分析管线 — 统计每小时错误数
logs = [
'INFO: 2025-01-01 08:00:00 Server started' ,
'ERROR: 2025-01-01 08:05:00 Connection timeout' ,
'ERROR: 2025-01-01 08:10:00 Disk full' ,
'INFO: 2025-01-01 09:00:00 Backup completed' ,
'ERROR: 2025-01-01 09:15:00 Memory warning' ,
]
# 提取错误日志并分组统计
errors = [log for log in logs if log.startswith('ERROR' )]
print (f"错误总数: {len(errors)}" )
# 可以结合正则表达式进一步按小时分组
八、核心总结(Summary and Cheatsheet)
1. itertools函数速查表
分类 函数 功能描述 语法要点
无限迭代器 count 无限等差数列 count(start=0, step=1)
cycle 无限循环序列 cycle(iterable)
repeat 重复单一值 repeat(obj, times=None)
组合生成器 product 笛卡尔积 product(*iterables, repeat=1)
permutations 排列 permutations(iterable, r=None)
combinations 组合 combinations(iterable, r)
combinations_with_replacement 允许重复的组合 combinations_with_replacement(iterable, r)
短路迭代器 chain 连接多个迭代器 chain(*iterables)
compress 按选择器过滤 compress(data, selectors)
dropwhile 跳过头部满足条件的元素 dropwhile(pred, iterable)
takewhile 保留头部满足条件的元素 takewhile(pred, iterable)
filterfalse 保留不满足条件的元素 filterfalse(pred, iterable)
islice 迭代器切片 islice(iterable, start, stop, step)
分组与累积 groupby 按键值分组(需先排序) groupby(iterable, key=None)
accumulate 累积计算 accumulate(iterable, func=add)
合并迭代器 zip_longest 最长zip,缺失填充 zip_longest(*iterables, fillvalue=None)
tee 复制迭代器 tee(iterable, n=2)
starmap 解包参数映射 starmap(func, iterable)
pairwise 连续重叠对 pairwise(iterable) # 3.10+
2. 与for循环的性能对比
itertools的主要优势不在于执行速度(虽然在某些场景下确实更快),而在于内存效率和代码表达力。以下是几个关键对比:
内存效率 :处理1000万个整数时,range(10_000_000)几乎不占用内存,而list(range(10_000_000))占用约320MB。itertools的所有函数都采用惰性求值,始终保持低内存占用。
代码简洁度 :使用product替代三层嵌套for循环,chain替代列表拼接,groupby替代手动分组逻辑,代码量减少约50%。
组合灵活性 :迭代器可以随意组合和嵌套,构建复杂的数据处理链,而传统的for循环改写需要大量重构。
延迟计算 :只在需要时才实际计算值,这对无限序列或超大文件处理具有决定性意义。
3. 使用注意事项
迭代器的一次性 :迭代器一旦被消耗就不可重用。需要使用tee()或用list()缓存才能多次遍历。
groupby的连续限制 :groupby只分组连续相同的键值,使用前务必对数据排序。
tee的内存陷阱 :如果tee的副本之间消耗进度差距过大,未消耗的元素会被缓存,可能导致OOM。
无限迭代器必须限流 :使用无限迭代器时,务必使用islice、takewhile等限制元素数量。
Python版本差异 :pairwise()仅Python 3.10+可用;accumulate在3.8+支持更多操作。
核心要诀: itertools是Python函数式编程的瑞士军刀。掌握它需要记住三个关键词——惰性 (按需计算,节省内存)、组合 (小函数构建大管道)、替代 (用声明式迭代替代命令式循环)。建议在实践中多思考"这个for循环能否用itertools替代",逐步培养函数式编程思维。