def fibonacci(n):
    """Lazily yield the first ``n`` Fibonacci numbers, starting from 0.

    Args:
        n: How many values to produce. Zero or a negative count yields
            nothing.

    Yields:
        int: 0, 1, 1, 2, 3, 5, ... in order.
    """
    a, b = 0, 1
    produced = 0
    while produced < n:
        yield a
        # Tuple assignment advances the pair without a temporary variable.
        a, b = b, a + b
        produced += 1
# Print the first ten Fibonacci numbers on one line, separated by spaces.
for num in fibonacci(10):
    print(num, end=' ')  # 0 1 1 2 3 5 8 13 21 34
# List comprehension: every element is built up front, so memory use grows
# with the number of elements.
import sys  # needed for getsizeof below

squares_list = [x * x for x in range(1000000)]
# NOTE: sys.getsizeof is shallow; it measures the list object itself, not the
# integers it refers to.
print(f"列表大小:{sys.getsizeof(squares_list) / 1024 / 1024:.2f} MB")
# Generator expression: evaluated lazily, so the object stays small no matter
# how many values it will eventually produce.
squares_gen = (x * x for x in range(1000000))
# The exact byte count is small but varies between CPython versions.
print(f"生成器大小:{sys.getsizeof(squares_gen)} 字节")
3.2 生成器表达式的链式组合
# Chained composition: several generator expressions are linked together to
# form a small data-processing pipeline.
nums = range(1000)
squared = (x * x for x in nums)  # square each value
evens = (x for x in squared if x % 2 == 0)  # keep only the even squares
result = (x / 2 for x in evens)  # halve them (true division, so floats)
# Nothing has been computed yet; work happens only while iterating.
for val in result:
    if val > 100:
        break
    print(val, end=' ')
def countdown(n):
    """Count down from ``n`` to 1, then finish with a status message.

    Args:
        n: Starting value. Nothing is yielded when it is not positive.

    Yields:
        The current counter value, decreasing by one on each step.

    Returns:
        str: The completion message. A generator's return value reaches the
        caller as ``StopIteration.value`` (or as the result of ``yield from``).
    """
    while n > 0:
        yield n
        n -= 1
    # Plain literal: the original f-string contained no placeholders.
    return "完成倒计时"
# Drive the generator by hand so the value it returns can be captured from
# the StopIteration raised when it finishes.
gen = countdown(3)
try:
    while True:
        print(next(gen))
except StopIteration as e:
    print(f"返回值: {e.value}")  # prints the generator's return value
def naturals(start=0):
    """Yield an endless ascending sequence: start, start + 1, start + 2, ...

    The generator never finishes on its own; the consumer decides when to
    stop pulling values.

    Args:
        start: First value to yield (default 0).
    """
    current = start  # keep the argument itself untouched
    while True:
        yield current
        current += 1
# Take the first five natural numbers from the endless generator.
ns = naturals()
for _ in range(5):
    print(next(ns), end=' ')  # 0 1 2 3 4
7.2 素数生成器(试除法)
def primes():
    """Yield the prime numbers in ascending order, without end.

    This is trial division, not a sieve: after yielding 2, each odd
    candidate is tested against the odd divisors whose square does not
    exceed it.

    Yields:
        int: 2, 3, 5, 7, 11, ...
    """
    yield 2
    candidate = 3
    while True:
        divisor = 3
        # Compare divisor * divisor with the candidate in exact integer
        # arithmetic instead of relying on a float square root.
        while divisor * divisor <= candidate:
            if candidate % divisor == 0:
                break  # found a factor, so the candidate is composite
            divisor += 2
        else:
            # The loop ended without finding a factor.
            yield candidate
        candidate += 2
# Collect the first ten primes from the endless generator and show them.
p = primes()
first_ten = [next(p) for _ in range(10)]
print(first_ten)
# Expected output: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
7.3 更高效的素数筛(优化版)
def primes_optimized():
    """Yield primes endlessly using an incremental, dictionary-based sieve.

    The dictionary maps each upcoming odd composite to one odd prime that
    divides it. That prime is a known factor of the composite, but not
    necessarily its smallest one. Only odd candidates are examined; 2 is
    yielded separately.

    Yields:
        int: 2, 3, 5, 7, 11, ...
    """
    yield 2
    d = {}  # upcoming odd composite -> a prime that divides it
    q = 3  # current odd candidate
    while True:
        p = d.pop(q, None)
        if p is None:
            # No prime has claimed q, so q is prime. Its square is the first
            # multiple that needs to be registered for it.
            d[q * q] = q
            yield q
        else:
            # q is composite and p divides it. Move p forward to its next odd
            # multiple that no other prime has already claimed.
            x = q + 2 * p
            while x in d:
                x += 2 * p
            d[x] = p
        q += 2
def read_large_file(file_path):
    """Yield the lines of a UTF-8 text file one at a time.

    The file object is iterated lazily, so the file is not loaded into memory
    as a whole. It is closed when the generator is exhausted or closed.

    Args:
        file_path: Path of the text file to read.

    Yields:
        str: Each line with leading and trailing whitespace (including the
        line ending) removed.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()
# Usage example: count the lines of a log file that contain "ERROR".
count = 0
for line in read_large_file("server.log"):
    if "ERROR" in line:
        count += 1
        if count <= 10:  # echo only the first 10 matching lines
            print(line)
print(f"共发现 {count} 个错误")
8.2 分块读取二进制文件
def read_in_chunks(file_path, chunk_size=8192):
    """Yield the contents of a binary file in consecutive chunks.

    Reading piece by piece avoids loading the whole file at once.

    Args:
        file_path: Path of the file to read in binary mode.
        chunk_size: Number of bytes requested per read (default 8192).

    Yields:
        bytes: Non-empty chunks in file order. The last one may be shorter
        than ``chunk_size``.
    """
    with open(file_path, 'rb') as f:
        # iter(callable, sentinel) keeps calling f.read until it returns the
        # empty bytes object that signals end of file.
        yield from iter(lambda: f.read(chunk_size), b'')
# Compute the MD5 digest of a large file by feeding it to the hash object one
# chunk at a time.
import hashlib

hash_md5 = hashlib.md5()
for chunk in read_in_chunks("large_video.mp4"):
    hash_md5.update(chunk)
print(f"MD5: {hash_md5.hexdigest()}")
8.3 流式CSV解析
import csv
from typing import Iterator, Dict
def stream_csv(file_path: str) -> Iterator[Dict[str, str]]:
    """Stream a UTF-8 CSV file, yielding one row at a time as a dictionary.

    The first row supplies the keys (``csv.DictReader`` behaviour).

    Args:
        file_path: Path of the CSV file to read.

    Yields:
        A mapping from column name to cell text for each data row.
    """
    # newline='' is what the csv module requires: it lets the reader handle
    # line endings itself, so quoted fields containing line breaks survive
    # unchanged.
    with open(file_path, 'r', encoding='utf-8', newline='') as f:
        yield from csv.DictReader(f)
# Rows are consumed one at a time, so the file is never held in memory as a
# whole. (The original note cites a 5 GB file staying within a few hundred KB;
# that figure is not measured here.)
for row in stream_csv("massive_data.csv"):
    # Handle each row as it arrives.
    # NOTE(review): process_row is not defined in the visible part of this
    # file; presumably supplied elsewhere. Confirm.
    process_row(row)
# Step 1: read the raw lines.
def read_lines(file_path):
    """Yield each line of a text file with its trailing newline removed.

    Other whitespace is preserved. The file is decoded as UTF-8, matching the
    other readers in this file, rather than with the platform default.

    Args:
        file_path: Path of the text file to read.

    Yields:
        str: One line at a time, without the trailing newline characters.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.rstrip('\n')
# Step 2: filter (keep ERROR and CRITICAL entries).
def filter_errors(lines):
    """Yield only the lines that contain 'ERROR' or 'CRITICAL'.

    Matching is a case-sensitive substring test anywhere in the line.

    Args:
        lines: Iterable of text lines.

    Yields:
        str: The matching lines, unchanged and in their original order.
    """
    wanted_levels = ('ERROR', 'CRITICAL')
    for line in lines:
        if any(level in line for level in wanted_levels):
            yield line
# Step 3: parse (extract timestamp, level and message).
def parse_log(lines):
    """Parse lines of the form ``[timestamp] [level] message`` into dicts.

    Lines that do not match the pattern from their first character are
    skipped silently.

    Args:
        lines: Iterable of log lines.

    Yields:
        dict: Keys 'timestamp', 'level' and 'message', taken from the three
        captured groups.
    """
    import re

    # Compiled once per generator, then reused for every line.
    pattern = re.compile(r'\[(.*?)\]\s+\[(.*?)\]\s+(.*)')
    for line in lines:
        match = pattern.match(line)
        if match:
            timestamp, level, message = match.groups()
            yield {
                'timestamp': timestamp,
                'level': level,
                'message': message,
            }
# Step 4: format the output.
def format_output(parsed_entries):
    """Render parsed log entries as ``[timestamp] level: message`` strings.

    Args:
        parsed_entries: Iterable of dicts with 'timestamp', 'level' and
            'message' keys.

    Yields:
        str: One formatted line per entry, in input order.
    """
    for entry in parsed_entries:
        yield f"[{entry['timestamp']}] {entry['level']}: {entry['message']}"
# Assemble the pipeline: read -> filter -> parse -> format.
# Each stage is a generator, so nothing is read until the pipeline is consumed.
pipeline = format_output(
    parse_log(
        filter_errors(
            read_lines('app.log')
        )
    )
)
# Consume: pull the processed results out one at a time.
for formatted_line in pipeline:
    print(formatted_line)
    # More filtering could be added here, or the loop could stop early,
    # for example after the first 100 error entries.
9.2 用yield from简化管道
def number_pipeline(n):
    """Yield the squares of the even numbers below ``n`` that are under 500.

    The stages are chained lazily and the final generator is delegated to
    with ``yield from``.

    Args:
        n: Exclusive upper bound of the source range.

    Yields:
        int: 0, 4, 16, ... in ascending order, stopping before the first
        square that reaches 500.
    """
    from itertools import takewhile

    # Stage 1: the raw numbers.
    numbers = range(n)
    # Stage 2: keep the even ones.
    evens = (x for x in numbers if x % 2 == 0)
    # Stage 3: square them.
    squared = (x * x for x in evens)
    # Stage 4: cut off at the limit. The squares only increase, so stopping at
    # the first value >= 500 gives the same output as filtering every value,
    # without scanning the rest of the range.
    result = takewhile(lambda x: x < 500, squared)
    yield from result  # delegate to the final stage
# Print every value produced by the pipeline on one line.
for val in number_pipeline(100):
    print(val, end=' ')
# 0 4 16 36 64 100 144 196 256 324 400 484
9.3 管道模式的高级应用:多阶段数据清洗
# A more involved example: a multi-stage data-cleaning pipeline.
def clean_data(input_path):
    """Clean a comma-separated text file and yield its active records.

    Stages: read -> strip -> split -> convert/validate -> filter.

    Args:
        input_path: Path of the text file, passed on to ``read_lines``.

    Yields:
        dict: Records with keys 'id' (int), 'name' (str), 'value' (float)
        and 'active' (bool). Only records whose 'active' is True are yielded.
        Rows that cannot be converted are reported with ``print`` and skipped.
    """
    # Stage 1: raw lines.
    raw = read_lines(input_path)
    # Stage 2: trim whitespace, then drop blank lines and comment lines. The
    # '#' test runs on the trimmed text so indented comments are dropped too.
    trimmed = (line.strip() for line in raw)
    stripped = (line for line in trimmed if line and not line.startswith('#'))
    # Stage 3: split on the separator.
    parsed = (line.split(',') for line in stripped)

    # Stage 4: type conversion and validation.
    def validate(rows):
        # Iterate the argument, not the enclosing ``parsed`` variable.
        for row in rows:
            try:
                record = {
                    'id': int(row[0]),
                    'name': row[1].strip(),
                    'value': float(row[2]),
                    'active': row[3].strip().lower() == 'true',
                }
            except (ValueError, IndexError) as e:
                # Report the bad row and move on to the next one.
                print(f"跳过无效行: {row}, 错误: {e}")
                continue
            yield record

    # Stage 5: keep only the active records.
    active_only = (item for item in validate(parsed) if item['active'])
    yield from active_only