专题:Python进阶编程系统学习
关键词:Python, 缓存, lru_cache, cache, cached_property, 记忆化, cachetools, 缓存策略
一、概述:什么是缓存与记忆化
缓存(Caching)是计算机科学中一项核心优化技术,其基本原理是将昂贵的计算结果或频繁访问的数据存储在高速存储介质中,以便后续请求能够以更低的成本直接获取结果。记忆化(Memoization)是缓存的一种特殊形式,特指在函数级别缓存其返回值,确保对于相同的输入参数只计算一次。
在Python中,缓存机制的应用非常广泛,从内置的 functools.lru_cache 装饰器到第三方库 cachetools,再到外部缓存系统如Redis,为不同场景提供了多层次解决方案。合理使用缓存可以将程序性能提升数个数量级,尤其是对于计算密集型或I/O密集型的重复调用场景。
核心思想:用空间换时间。缓存通过占用额外的内存空间来存储计算结果,从而避免重复执行昂贵的计算或I/O操作。
二、functools.lru_cache:最常用的记忆化工具
functools.lru_cache 是Python标准库中最强大的记忆化工具之一。它基于LRU(Least Recently Used,最近最少使用)算法实现缓存淘汰,在缓存容量达到上限时自动移除最久未使用的条目。
2.1 基本用法
通过一个简单的装饰器即可为函数启用记忆化功能。下面以计算斐波那契数列为例,对比使用缓存前后的性能差异:
import functools
import time
# 未使用缓存的版本
def fib_no_cache(n):
if n < 2:
return n
return fib_no_cache(n - 1) + fib_no_cache(n - 2)
# 使用 lru_cache 的版本
@functools.lru_cache(maxsize=128)
def fib_with_cache(n):
if n < 2:
return n
return fib_with_cache(n - 1) + fib_with_cache(n - 2)
# 性能对比
start = time.perf_counter()
fib_no_cache(35)
print(f"无缓存: {time.perf_counter() - start:.4f}s")
start = time.perf_counter()
fib_with_cache(35)
print(f"有缓存: {time.perf_counter() - start:.4f}s")
# 输出示例:
# 无缓存: 2.3456s
# 有缓存: 0.0001s
从输出可以看出,对于斐波那契数列这种具有大量重复子问题的计算,记忆化带来的性能提升可以达到数千倍甚至更多。这是因为 lru_cache 记录了每个参数组合对应的返回值,后续相同参数的调用直接从缓存中读取。
2.2 maxsize 参数与缓存淘汰
maxsize 参数控制缓存的最大条目数。当缓存条目达到上限时,LRU算法会淘汰最久未被访问的条目。设置为 None 表示无限制缓存(此时退化为普通字典缓存),但需要注意内存泄漏风险。
# 限制缓存大小
@functools.lru_cache(maxsize=32)
def expensive_func(n):
# 模拟耗时计算
time.sleep(0.1)
return n ** 2
# maxsize 设置为 None 表示无限制
@functools.lru_cache(maxsize=None)
def unlimited_cache(n):
return n ** 2
最佳实践:maxsize 应设置为 2 的幂次方(如 128、256、512),因为Python内部实现对此做了优化。对于大部分场景,默认值 128 已经足够。如果函数的参数空间本来就很小,可以设置为 None。
2.3 typed 参数:区分类型
typed 参数控制是否区分不同类型的参数。当设为 True 时,3 和 3.0 被视为不同的缓存键:
@functools.lru_cache(typed=True)
def process_value(x):
return f"处理: {x} (类型: {type(x).__name__})"
# typed=True 时,3 和 3.0 使用不同缓存条目
print(process_value(3)) # 计算并缓存 int 3
print(process_value(3.0)) # 计算并缓存 float 3.0(新条目)
print(process_value(3)) # 命中 int 3 的缓存
# 输出:
# 处理: 3 (类型: int)
# 处理: 3.0 (类型: float)
# 处理: 3 (类型: int)
2.4 cache_info():缓存统计
cache_info() 方法返回一个命名元组,包含命中次数(hits)、未命中次数(misses)、当前大小(currsize)和最大容量(maxsize)。这是监控缓存效率的重要工具:
@functools.lru_cache(maxsize=32)
def compute(x):
return x * x
# 执行一系列调用
for i in range(10):
compute(i)
for i in range(5):
compute(i) # 重复调用前5个值
info = compute.cache_info()
print(f"命中: {info.hits}")
print(f"未命中: {info.misses}")
print(f"当前大小: {info.currsize}")
print(f"最大容量: {info.maxsize}")
print(f"命中率: {info.hits / (info.hits + info.misses):.2%}")
# 输出:
# 命中: 5
# 未命中: 10
# 当前大小: 10
# 最大容量: 32
# 命中率: 33.33%
2.5 cache_clear():手动清空缓存
在某些场景下,我们需要手动清空缓存。例如缓存的数据可能因为外部因素而失效,或者测试环境中需要重置状态:
@functools.lru_cache(maxsize=None)
def get_user_data(user_id):
print(f"从数据库获取用户 {user_id} 的数据...")
return {"id": user_id, "name": f"User_{user_id}"}
# 首次调用 - 实际计算
print(get_user_data(1)) # 输出: 从数据库获取用户 1 的数据...
# 第二次调用 - 命中缓存
print(get_user_data(1)) # 不会打印"从数据库获取..."
# 清空所有缓存
get_user_data.cache_clear()
print(f"缓存已清空,当前大小: {get_user_data.cache_info().currsize}")
# 再次调用 - 重新计算
print(get_user_data(1)) # 再次输出: 从数据库获取用户 1 的数据...
2.6 缓存失效机制
lru_cache 本身不提供基于时间的自动失效机制。缓存条目只有在以下情况下才会被移除:
- 容量淘汰:缓存条目达到
maxsize 上限时,最久未使用的条目被淘汰
- 手动清空:调用
cache_clear() 方法
如果需要基于时间的自动失效,需要自行封装或使用 cachetools 库中的 TTLCache。
import functools
import time
def timed_lru_cache(seconds: int, maxsize: int = 128):
"""带 TTL 的 lru_cache 封装"""
def wrapper(func):
func = functools.lru_cache(maxsize=maxsize)(func)
func.expire_time = seconds
func.last_clear = time.time()
@functools.wraps(func)
def wrapped(*args, **kwargs):
if time.time() - func.last_clear > func.expire_time:
func.cache_clear()
func.last_clear = time.time()
return func(*args, **kwargs)
return wrapped
return wrapper
@timed_lru_cache(seconds=5)
def get_time_based_data(key):
print(f"计算并缓存: {key}")
return f"{key}_at_{time.time()}"
print(get_time_based_data("a")) # 计算
print(get_time_based_data("a")) # 命中缓存
time.sleep(6) # 等待缓存过期
print(get_time_based_data("a")) # 缓存已过期,重新计算
三、functools.cache:Python 3.9 无限制缓存
Python 3.9 引入的 functools.cache 是 lru_cache(maxsize=None) 的简写形式。它的底层实现与 lru_cache 相同,但语义更清晰——明确表示"无限制缓存":
import functools
# Python 3.9+ 新语法,等价于 maxsize=None 的 lru_cache
@functools.cache
def load_config(path):
print(f"加载配置文件: {path}")
with open(path, 'r') as f:
return f.read()
# 首次加载
config = load_config("/etc/app/config.yaml")
# 再次加载 - 直接从缓存返回
config = load_config("/etc/app/config.yaml") # 不会重新读取文件
何时使用 cache vs lru_cache:如果函数的参数空间有限且可枚举,使用 @functools.cache。如果参数空间很大或不确定,使用带合理 maxsize 的 @lru_cache 以防止内存无限增长。
四、functools.cached_property:属性缓存
cached_property 是Python 3.8引入的装饰器,用于将类的方法转换为缓存属性。与 @property 不同,@cached_property 在首次计算后会将结果存储在实例的 __dict__ 中,后续访问直接从字典读取,不会重新计算:
import functools
import time
class DataAnalyzer:
def __init__(self, data):
self.data = data
@property
def regular_property(self):
"""每次访问都重新计算"""
print("计算 regular_property...")
time.sleep(1)
return sum(self.data) / len(self.data)
@functools.cached_property
def cached_average(self):
"""只在首次访问时计算,后续直接返回缓存值"""
print("计算 cached_average(仅一次)...")
time.sleep(1)
return sum(self.data) / len(self.data)
analyzer = DataAnalyzer([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# property - 每次重新计算
start = time.time()
print(f"第1次: {analyzer.regular_property} ({time.time()-start:.2f}s)")
start = time.time()
print(f"第2次: {analyzer.regular_property} ({time.time()-start:.2f}s)")
# cached_property - 仅首次计算
start = time.time()
print(f"缓存第1次: {analyzer.cached_average} ({time.time()-start:.2f}s)")
start = time.time()
print(f"缓存第2次: {analyzer.cached_average} ({time.time()-start:.2f}s)")
# 手动清空缓存属性
del analyzer.__dict__['cached_average']
print("缓存已清空")
# 再次访问会重新计算
print(f"重新计算: {analyzer.cached_average}")
重要注意事项:
cached_property 是线程安全的,但在多线程环境下首次计算时可能会被多次调用,最终只有一个结果被保留
- 缓存存储在实例的
__dict__ 中,可以通过 del obj.attr 手动清空
cached_property 是类描述符,只适用于类方法,不适用于普通函数
- 与
@property 不同,cached_property 不支持 setter
五、手动实现记忆化装饰器
理解记忆化的底层原理有助于我们在标准库不满足需求时自行实现。下面逐步构建从简单到复杂的记忆化装饰器。
5.1 基于字典的简单缓存
import functools
import time
def memoize(func):
"""简单的字典缓存记忆化装饰器"""
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 使用参数元组作为缓存键
key = (args, tuple(sorted(kwargs.items())))
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
# 暴露缓存操作方法
wrapper.cache = cache
wrapper.cache_info = lambda: {
'size': len(cache),
'hits': getattr(wrapper, '_hits', 0)
}
wrapper.cache_clear = cache.clear
return wrapper
@memoize
def slow_add(a, b):
time.sleep(0.5)
return a + b
print(slow_add(1, 2)) # 耗时 0.5s
print(slow_add(1, 2)) # 立即返回(缓存命中)
print(f"缓存大小: {len(slow_add.cache)}")
5.2 手动实现LRU缓存装饰器
import functools
from collections import OrderedDict
def lru_cache_manual(maxsize=128):
"""手工实现的 LRU 缓存装饰器"""
def decorator(func):
cache = OrderedDict()
hits = 0
misses = 0
@functools.wraps(func)
def wrapper(*args, **kwargs):
nonlocal hits, misses
key = (args, tuple(sorted(kwargs.items())))
if key in cache:
hits += 1
# LRU: 将访问的条目移到末尾
cache.move_to_end(key)
return cache[key]
misses += 1
result = func(*args, **kwargs)
if len(cache) >= maxsize:
# 移除最久未使用(最前面)的条目
cache.popitem(last=False)
cache[key] = result
return result
wrapper.cache_info = lambda: {
'hits': hits,
'misses': misses,
'currsize': len(cache),
'maxsize': maxsize,
'hit_rate': f"{hits / (hits + misses) * 100:.1f}%" if (hits + misses) > 0 else "0%"
}
wrapper.cache_clear = lambda: cache.clear()
return wrapper
return decorator
@lru_cache_manual(maxsize=3)
def compute(x):
print(f"计算: {x}")
return x ** 2
compute(1) # 计算
compute(2) # 计算
compute(3) # 计算
compute(1) # 命中缓存,同时将 1 移到末尾
compute(4) # 计算,淘汰最久未使用的 2
compute(2) # 重新计算,因为 2 已被淘汰
print(compute.cache_info())
5.3 手动实现TTL缓存装饰器
import functools
import time
def ttl_cache_manual(seconds=60, maxsize=128):
"""带 TTL(生存时间)的缓存装饰器"""
def decorator(func):
cache = {} # key -> (value, expiry_time)
hits = 0
misses = 0
evictions = 0
@functools.wraps(func)
def wrapper(*args, **kwargs):
nonlocal hits, misses, evictions
key = (args, tuple(sorted(kwargs.items())))
now = time.time()
if key in cache:
value, expiry = cache[key]
if now < expiry:
hits += 1
return value
else:
# 条目已过期
del cache[key]
evictions += 1
misses += 1
result = func(*args, **kwargs)
if len(cache) >= maxsize:
# 淘汰最早过期的条目
oldest_key = min(cache, key=lambda k: cache[k][1])
del cache[oldest_key]
evictions += 1
cache[key] = (result, now + seconds)
return result
wrapper.cache_info = lambda: {
'hits': hits,
'misses': misses,
'currsize': len(cache),
'maxsize': maxsize,
'evictions': evictions,
'ttl': seconds
}
wrapper.cache_clear = lambda: cache.clear()
return wrapper
return decorator
@ttl_cache_manual(seconds=3)
def fetch_data(url):
print(f"实际请求: {url}")
return f""
print(fetch_data("/api/users")) # 实际请求
print(fetch_data("/api/users")) # 缓存命中
time.sleep(4) # 等待 TTL 过期
print(fetch_data("/api/users")) # TTL过期,重新请求
六、cachetools 库:专业的缓存工具集
cachetools 是Python生态中最流行的第三方缓存库,提供了多种缓存策略和便捷的装饰器。它支持 LRU、LFU、TTL、FIFO 等多种缓存算法,并且与 functools 风格一致,学习成本低。
6.1 安装与基本使用
# 安装
# pip install cachetools
from cachetools import cached, LRUCache, TTLCache
import time
# LRU 缓存装饰器(等价于 functools.lru_cache)
@cached(cache=LRUCache(maxsize=32))
def expensive_op(n):
print(f"计算: {n}")
time.sleep(0.5)
return n * 2
# TTL 缓存装饰器(自动过期)
@cached(cache=TTLCache(maxsize=100, ttl=30))
def get_weather(city):
print(f"获取 {city} 天气...")
return {"city": city, "temp": 25, "humidity": 60}
print(get_weather("北京")) # 实际调用
print(get_weather("北京")) # 缓存命中
time.sleep(31) # 等待 TTL 过期
print(get_weather("北京")) # TTL过期,重新调用
cachetools 最大的优势在于提供了 TTLCache(基于TTL自动过期),这是标准库 lru_cache 所不具备的。TTLCache 在内部维护了每个条目的过期时间戳,并在访问时检查是否过期,同时后台也会周期性清理过期条目。
6.2 多参数配置示例
from cachetools import cached, LRUCache, TTLCache
from cachetools.keys import hashkey
import time
# 自定义缓存键(忽略某些参数)
@cached(
cache=TTLCache(maxsize=50, ttl=60),
key=lambda url, timeout=30: hashkey(url) # 忽略 timeout 参数
)
def fetch_url(url, timeout=30):
print(f"请求: {url}")
time.sleep(0.2)
return f"response from {url}"
# 即使 timeout 不同,只要 url 相同就命中缓存
print(fetch_url("/api/data", timeout=30))
print(fetch_url("/api/data", timeout=60)) # 命中缓存!
6.3 缓存策略速查
| 缓存类 | 策略 | 特点 | 适用场景 |
| LRUCache | 最近最少使用 | 淘汰最久未访问条目 | 通用场景,访问模式有局部性 |
| LFUCache | 最不经常使用 | 淘汰访问频率最低条目 | 热点数据集中,频率分布不均 |
| TTLCache | 生存时间 | 基于时间自动过期 | 数据有时效性,需定期刷新 |
| FIFOCache | 先进先出 | 淘汰最早加入条目 | 数据流处理,公平淘汰 |
| RRCache | 随机淘汰 | 随机选择淘汰条目 | 最简单实现,但命中率不确定 |
6.4 cachetools LFU 缓存示例
from cachetools import cached, LFUCache
# LFU(Least Frequently Used)缓存
# 淘汰访问频率最低的条目,适合热点数据场景
@cached(cache=LFUCache(maxsize=3))
def query_database(sql):
print(f"执行SQL: {sql}")
return f"result of {sql}"
# 模拟访问模式:频繁访问某些SQL
for _ in range(10):
query_database("SELECT * FROM users") # 高频
for _ in range(5):
query_database("SELECT * FROM orders") # 中频
query_database("SELECT * FROM products") # 低频
query_database("SELECT * FROM logs") # 新查询,淘汰低频的 products
# 此时 products 已被淘汰,logs 被缓存
七、Redis外部缓存
当应用需要跨进程、跨服务共享缓存数据时,内存缓存无法满足需求。Redis作为高性能的键值存储系统,是最常用的外部缓存解决方案。Python通过 redis-py 库与Redis交互。
7.1 基本连接与操作
# 安装: pip install redis
import redis
import json
import time
# 连接 Redis
r = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True # 自动解码为字符串
)
# 基本缓存操作
cache_key = "user:1001"
user_data = {"name": "张三", "age": 28, "city": "北京"}
# 设置缓存,过期时间 300 秒
r.setex(cache_key, 300, json.dumps(user_data))
# 获取缓存
cached = r.get(cache_key)
if cached:
print(f"缓存命中: {json.loads(cached)}")
else:
print("缓存未命中")
# 检查键是否存在
print(f"键存在: {r.exists(cache_key)}")
# 获取剩余 TTL
print(f"剩余时间: {r.ttl(cache_key)}秒")
7.2 装饰器封装
import redis
import json
import functools
import hashlib
class RedisCache:
"""Redis 缓存装饰器"""
def __init__(self, host='localhost', port=6379, db=0, default_ttl=300):
self.client = redis.Redis(
host=host, port=port, db=db,
decode_responses=True
)
self.default_ttl = default_ttl
def __call__(self, ttl=None):
ttl = ttl or self.default_ttl
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
key_data = f"{func.__module__}:{func.__name__}:{args}:{kwargs}"
cache_key = f"cache:{hashlib.md5(key_data.encode()).hexdigest()}"
# 尝试从缓存获取
cached = self.client.get(cache_key)
if cached is not None:
return json.loads(cached)
# 执行函数并缓存结果
result = func(*args, **kwargs)
self.client.setex(cache_key, ttl, json.dumps(result))
return result
wrapper.cache_clear = lambda: None # Redis缓存清理需单独实现
return wrapper
return decorator
# 使用示例
cache = RedisCache(default_ttl=600)
@cache(ttl=300) # 单独设置5分钟过期
def get_user_info(user_id):
print(f"从数据库查询用户 {user_id}")
return {"id": user_id, "name": f"User_{user_id}", "timestamp": time.time()}
# 首次调用 - 查询数据库并写入Redis
info1 = get_user_info(1001)
# 第二次调用 - 直接从Redis返回
info2 = get_user_info(1001)
print(f"两次结果相同: {info1 == info2}")
7.3 Redis缓存高级模式
# 缓存穿透防护:布隆过滤器
# 使用 RedisBloom 模块
try:
r.bf().create('user_bloom', 0.01, 1000000)
r.bf().add('user_bloom', 'user:1001')
def safe_get_user(user_id):
cache_key = f"user:{user_id}"
# 布隆过滤器快速判断
if not r.bf().exists('user_bloom', cache_key):
return None # 肯定不存在,直接返回
# 从缓存获取
result = r.get(cache_key)
if result:
return json.loads(result)
# 从数据库查询(概率极小)
return query_database(user_id)
except redis.exceptions.ModuleError:
print("RedisBloom 模块未安装")
# 缓存击穿防护:互斥锁
def get_data_with_lock(key, compute_func, ttl=300):
"""使用互斥锁防止缓存击穿"""
data = r.get(key)
if data is not None:
return json.loads(data)
# 尝试获取锁
lock_key = f"lock:{key}"
if r.setnx(lock_key, "1"):
# 获取锁成功,计算数据
r.expire(lock_key, 10) # 防止死锁
data = compute_func()
r.setex(key, ttl, json.dumps(data))
r.delete(lock_key)
return data
else:
# 获取锁失败,等待后重试
time.sleep(0.1)
return get_data_with_lock(key, compute_func, ttl)
八、缓存策略对比分析
不同的缓存淘汰策略适用于不同的访问模式。选择正确的缓存策略对系统性能至关重要。
8.1 策略详解
from cachetools import LRUCache, LFUCache, FIFOCache, RRCache
import time
class CacheBenchmark:
"""缓存策略基准测试"""
def __init__(self, cache, name):
self.cache = cache
self.name = name
self.hits = 0
self.misses = 0
def access(self, key):
if key in self.cache:
self.hits += 1
return self.cache[key]
else:
self.misses += 1
# 模拟获取数据
result = f"data_{key}"
self.cache[key] = result
return result
def report(self):
total = self.hits + self.misses
rate = self.hits / total * 100 if total > 0 else 0
return f"{self.name:10s} | 命中: {self.hits:4d} | 未命中: {self.misses:4d} | 命中率: {rate:5.1f}%"
# 模拟80/20访问模式(80%的访问集中在20%的数据上)
def simulate_8020(cache_obj, n_accesses=1000):
for i in range(n_accesses):
# 80%概率访问前20%的热点数据
if i % 5 != 0:
key = f"key_{i % 20}" # 热点数据
else:
key = f"key_{i % 100}" # 冷数据
cache_obj.access(key)
# 运行测试
strategies = [
(LRUCache(maxsize=50), "LRU"),
(LFUCache(maxsize=50), "LFU"),
(FIFOCache(maxsize=50), "FIFO"),
(RRCache(maxsize=50), "RR"),
]
for cache_impl, name in strategies:
bench = CacheBenchmark(cache_impl, name)
simulate_8020(bench)
print(bench.report())
# 典型输出:
# LRU | 命中: 780 | 未命中: 220 | 命中率: 78.0%
# LFU | 命中: 802 | 未命中: 198 | 命中率: 80.2%
# FIFO | 命中: 720 | 未命中: 280 | 命中率: 72.0%
# RR | 命中: 650 | 未命中: 350 | 命中率: 65.0%
8.2 策略选择指南
| 策略 | 空间复杂度 | 时间复杂度 | 命中率 | 适用场景 |
| LRU | O(n) | O(1) | 高 | 通用场景,时间局部性好 |
| LFU | O(n) | O(log n) | 最高 | 频率差异明显的热点数据 |
| FIFO | O(n) | O(1) | 中 | 数据流处理,实现简单 |
| TTL | O(n) | O(1) | 依赖配置 | 有时效性的数据 |
| Random | O(n) | O(1) | 低 | 极少使用 |
选择建议:
- 通用场景优先选择 LRU(标准库 lru_cache 就是LRU)
- 热点数据明显且集中的场景选择 LFU
- 数据需要定期更新的场景选择 TTL
- 数据流式的顺序访问选择 FIFO
- 多级缓存策略往往效果更好(如 L1: LRU + L2: Redis)
九、缓存一致性与失效策略
缓存一致性是分布式系统中最棘手的问题之一。当数据源发生变更时,缓存中的数据必须同步更新或失效,否则就会出现"脏读"现象。以下是常见的缓存一致性策略:
9.1 常见失效模式
import functools
import time
import random
class CacheManager:
"""缓存管理器,支持多种失效策略"""
def __init__(self):
self._cache = {}
self._version = 0
# -------- 策略1: Write-Through(穿透写入)--------
def write_through_set(self, key, value, db_func):
"""同时写入缓存和数据库"""
self._cache[key] = value
db_func(key, value) # 同步写数据库
print(f"[Write-Through] 缓存和数据库同步写入: {key}")
# -------- 策略2: Write-Behind(异步写入)--------
def write_behind_set(self, key, value, db_func, delay=1):
"""先写入缓存,异步写数据库"""
self._cache[key] = value
print(f"[Write-Behind] 缓存已更新: {key},数据库将在{delay}秒后更新")
# 模拟异步任务
time.sleep(delay)
db_func(key, value)
# -------- 策略3: Cache-Aside(旁路缓存)--------
def cache_aside_get(self, key, db_func, ttl=60):
"""先查缓存,未命中则查数据库并更新缓存"""
if key in self._cache:
value, expiry = self._cache[key]
if time.time() < expiry:
print(f"[Cache-Aside] 缓存命中: {key}")
return value
else:
# 懒淘汰
del self._cache[key]
print(f"[Cache-Aside] 缓存未命中,查询数据库: {key}")
value = db_func(key)
self._cache[key] = (value, time.time() + ttl)
return value
# -------- 策略4: 主动失效(Invalidation)--------
def invalidate(self, key):
"""主动使缓存失效"""
if key in self._cache:
del self._cache[key]
print(f"[Invalidation] 缓存已失效: {key}")
def invalidate_pattern(self, pattern):
"""根据模式批量失效"""
keys = [k for k in self._cache if pattern in k]
for key in keys:
del self._cache[key]
print(f"[Invalidation] 批量失效 {len(keys)} 个缓存: pattern={pattern}")
# -------- 策略5: 版本号控制--------
def versioned_get(self, key, version_func):
"""使用版本号确保缓存一致性"""
current_version = version_func()
if key in self._cache:
cached_value, cached_version = self._cache[key]
if cached_version == current_version:
return cached_value
else:
print(f"[版本控制] 缓存版本过期: {cached_version} -> {current_version}")
return None
def versioned_set(self, key, value, version):
self._cache[key] = (value, version)
# 使用示例
cm = CacheManager()
def save_to_db(key, value):
print(f" 数据库已保存: {key}={value}")
def load_from_db(key):
print(f" 从数据库加载: {key}")
return f"db_value_{key}"
# Write-Through 测试
cm.write_through_set("user:1", "Alice", save_to_db)
# Cache-Aside 测试
cm.invalidate("user:1") # 先失效
print(cm.cache_aside_get("user:1", load_from_db))
print(cm.cache_aside_get("user:1", load_from_db)) # 命中缓存
9.2 失效策略对比
| 策略 | 延迟 | 一致性 | 复杂度 | 说明 |
| Cache-Aside | 低 | 最终一致 | 低 | 最常用模式,先读缓存,未命中再读DB |
| Write-Through | 中 | 强一致 | 中 | 每次写入同时更新缓存和DB |
| Write-Behind | 极低 | 最终一致 | 高 | 先更新缓存,异步写DB(有数据丢失风险) |
| Read-Through | 低 | 最终一致 | 中 | 缓存层自动加载缺失数据(类似Cache-Aside封装) |
| Refresh-Ahead | 极低 | 最终一致 | 高 | 缓存即将过期时自动刷新(预测性加载) |
十、缓存穿透、缓存雪崩与缓存击穿
这三个问题是大规模缓存系统中的经典难题,理解其原理并掌握对应的解决方案是构建高可用系统的必备技能。
10.1 缓存穿透(Cache Penetration)
问题:查询一个根本不存在的数据,缓存和数据库中都没有,导致每次请求都穿透到数据库。恶意攻击者可以利用此漏洞对数据库发起DDoS攻击。
# 缓存穿透问题示意图
# 请求 -> 缓存未命中 -> 数据库查询 -> 未找到 -> 返回空
# 下次请求再次走完整流程...
# 解决方案1:缓存空值
import functools
def cache_null_solution(get_data_func, cache, ttl=60):
"""缓存空值方案:对不存在的结果也缓存"""
def get_data(key):
# 先查缓存
result = cache.get(key)
if result is not None:
return result if result != "__NULL__" else None
# 查数据库
data = get_data_func(key)
if data is None:
# 对空结果也缓存,但TTL较短
cache.setex(key, ttl, "__NULL__")
else:
cache.set(key, data)
return data
return get_data
# 解决方案2:布隆过滤器(Bloom Filter)
# 在缓存层之前加一道屏障
class BloomFilter:
"""简单的布隆过滤器实现"""
def __init__(self, size=100000, hash_count=3):
self.size = size
self.hash_count = hash_count
self.bit_array = 0
def _hashes(self, item):
"""生成多个哈希值"""
result = []
for i in range(self.hash_count):
h = hash(f"{i}_{item}")
result.append(h % self.size)
return result
def add(self, item):
for pos in self._hashes(item):
self.bit_array |= (1 << pos)
def might_contain(self, item):
for pos in self._hashes(item):
if not (self.bit_array & (1 << pos)):
return False
return True
# 使用布隆过滤器
bf = BloomFilter()
# 预热:将所有存在的ID加入布隆过滤器
for i in range(10000):
bf.add(f"user_{i}")
def safe_query_user(user_id):
key = f"user_{user_id}"
if not bf.might_contain(key):
return None # 确定不存在,直接返回
# 继续查缓存和数据库...
return query_database(user_id)
10.2 缓存雪崩(Cache Avalanche)
问题:大量缓存同时过期,或者缓存节点宕机,导致海量请求直接打到数据库,可能引发数据库崩溃。
# 缓存雪崩问题
# 解决方案1:缓存过期时间增加随机偏移
import random
def set_cache_with_jitter(key, value, base_ttl=300):
"""设置缓存,过期时间加入随机偏移防止同时过期"""
jitter = random.uniform(0, 60) # 0~60秒随机偏移
ttl = base_ttl + jitter
cache.setex(key, int(ttl), value)
# 批量初始化时使用
def init_cache_batch(data_dict, base_ttl=300):
for key, value in data_dict.items():
set_cache_with_jitter(key, value, base_ttl)
# 解决方案2:多级缓存
def multi_level_get(key, l1_cache, l2_cache, db_func, l1_ttl=60, l2_ttl=600):
"""
多级缓存:
L1(本地内存)-> L2(Redis)-> 数据库
L1的TTL短,L2的TTL长
"""
# L1 缓存
result = l1_cache.get(key)
if result:
return result
# L2 缓存(Redis)
result = l2_cache.get(key)
if result:
# 异步回填L1(使用较短TTL)
l1_cache.setex(key, l1_ttl, result)
return result
# 数据库
result = db_func(key)
if result:
l2_cache.setex(key, l2_ttl, result)
l1_cache.setex(key, l1_ttl, result)
return result
# 解决方案3:限流与熔断(以Redis实现)
import time
class RateLimiter:
"""滑动窗口限流器"""
def __init__(self, redis_client, max_requests=100, window=1):
self.redis = redis_client
self.max_requests = max_requests
self.window = window
def allow_request(self, key="global"):
now = time.time()
window_key = f"ratelimit:{key}"
# 移除窗口外的记录
self.redis.zremrangebyscore(window_key, 0, now - self.window)
# 检查请求数
count = self.redis.zcard(window_key)
if count >= self.max_requests:
return False
# 添加当前请求
self.redis.zadd(window_key, {f"{now}_{random.random()}": now})
self.redis.expire(window_key, self.window + 1)
return True
# 使用限流保护数据库
limiter = RateLimiter(redis_client, max_requests=10, window=1)
def protected_query(db_func, key):
if not limiter.allow_request("db_query"):
raise Exception("请求过于频繁,请稍后重试")
return db_func(key)
10.3 缓存击穿(Cache Breakdown)
问题:某个热点缓存在过期瞬间,大量并发请求同时穿透到数据库。与雪崩的区别在于,击穿针对的是单个热点key。
# 缓存击穿问题:热点key过期瞬间的高并发
# 解决方案1:互斥锁(最常用)
import threading
class HotKeyProtector:
"""热点数据保护器(互斥锁方案)"""
def __init__(self):
self._locks = {}
self._lock_lock = threading.Lock()
def get_or_compute(self, key, compute_func, cache_get, cache_set, ttl=300):
"""
使用互斥锁保护热点key
只允许一个线程/进程查询数据库,其他等待
"""
# 先尝试直接从缓存获取
result = cache_get(key)
if result is not None:
return result
# 获取或创建该key的锁
with self._lock_lock:
if key not in self._locks:
self._locks[key] = threading.Lock()
key_lock = self._locks[key]
# 加锁(只允许一个线程通过)
with key_lock:
# 双重检查:可能等待锁期间其他线程已经填充了缓存
result = cache_get(key)
if result is not None:
return result
# 查询数据库
result = compute_func()
cache_set(key, result, ttl)
return result
# 解决方案2:逻辑过期(不设置实际过期时间)
import dataclasses
import time
@dataclasses.dataclass
class CacheItem:
"""带逻辑过期时间的缓存条目"""
value: any
expire_time: float
class LogicalExpiryCache:
"""逻辑过期缓存:数据永不过期,但标记逻辑过期"""
def __init__(self, compute_func, ttl=300):
self._cache = {}
self._ttl = ttl
self._compute = compute_func
self._lock = threading.Lock()
def get(self, key):
now = time.time()
if key in self._cache:
item = self._cache[key]
if now < item.expire_time:
return item.value # 未过期,直接返回
else:
# 逻辑过期:异步刷新
self._async_refresh(key)
# 返回旧数据(牺牲一致性换可用性)
return item.value
# 缓存不存在:同步加载
return self._sync_load(key)
def _async_refresh(self, key):
"""异步刷新缓存"""
def refresh():
new_value = self._compute(key)
self._cache[key] = CacheItem(
value=new_value,
expire_time=time.time() + self._ttl
)
thread = threading.Thread(target=refresh, daemon=True)
thread.start()
def _sync_load(self, key):
"""同步加载缓存"""
with self._lock:
# 双重检查
if key in self._cache:
item = self._cache[key]
if time.time() < item.expire_time:
return item.value
value = self._compute(key)
self._cache[key] = CacheItem(
value=value,
expire_time=time.time() + self._ttl
)
return value
10.4 三大问题对比总结
| 问题 | 原因 | 影响 | 主要解决方案 |
| 缓存穿透 | 查询不存在的数据 | 恶意攻击导致数据库压力暴增 | ①缓存空值 ②布隆过滤器 ③参数校验 |
| 缓存雪崩 | 大量缓存同时过期/宕机 | 数据库瞬间崩溃,服务不可用 | ①TTL加随机偏移 ②多级缓存 ③限流熔断 ④缓存高可用 |
| 缓存击穿 | 热点key过期瞬间高并发 | 数据库单点压力过大 | ①互斥锁 ②逻辑过期(异步刷新)③永不过期 |
十一、实战:综合缓存系统设计
下面实现一个综合性的缓存系统,整合了多级缓存、多种失效策略、监控统计和故障保护机制:
import functools
import time
import threading
import json
import logging
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class CacheStats:
"""缓存统计信息"""
hits: int = 0
misses: int = 0
evictions: int = 0
total_latency: float = 0.0
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
@property
def avg_latency(self) -> float:
total = self.hits + self.misses
return self.total_latency / total if total > 0 else 0.0
class TieredCache:
"""
多级缓存系统
L1: 本地内存(LRU,最快)
L2: 本地TTL缓存
L3: 外部Redis(可选)
"""
def __init__(self,
l1_maxsize: int = 128,
l2_ttl: int = 60,
l2_maxsize: int = 1024,
enable_redis: bool = False,
redis_client=None):
self.l1 = OrderedDict() # LRU缓存
self.l1_maxsize = l1_maxsize
self.l2 = {} # TTL缓存
self.l2_ttl = l2_ttl
self.l2_maxsize = l2_maxsize
self.redis = redis_client if enable_redis else None
self.stats = CacheStats()
self._lock = threading.Lock()
def get(self, key: str, compute_func: Callable, ttl: Optional[int] = None) -> Any:
"""
多级缓存查询
顺序: L1 -> L2 -> Redis -> compute_func
"""
start = time.perf_counter()
ttl = ttl or self.l2_ttl
# L1 缓存(最快的内存LRU)
result = self._l1_get(key)
if result is not None:
self.stats.hits += 1
self.stats.total_latency += time.perf_counter() - start
return result
# L2 缓存(TTL内存缓存)
result = self._l2_get(key)
if result is not None:
self._l1_set(key, result)
self.stats.hits += 1
self.stats.total_latency += time.perf_counter() - start
return result
# Redis 缓存
if self.redis:
result = self.redis.get(key)
if result is not None:
result = json.loads(result)
self._l1_set(key, result)
self._l2_set(key, result, ttl)
self.stats.hits += 1
self.stats.total_latency += time.perf_counter() - start
return result
# 全部未命中,执行计算函数
self.stats.misses += 1
result = compute_func()
# 回填各级缓存
self._l1_set(key, result)
self._l2_set(key, result, ttl)
if self.redis:
self.redis.setex(key, ttl, json.dumps(result))
self.stats.total_latency += time.perf_counter() - start
return result
def invalidate(self, key: str):
"""使所有级别的缓存失效"""
self.l1.pop(key, None)
self.l2.pop(key, None)
if self.redis:
self.redis.delete(key)
logger.info(f"缓存已失效: {key}")
def _l1_get(self, key: str) -> Optional[Any]:
if key in self.l1:
self.l1.move_to_end(key) # LRU: 移到末尾
return self.l1[key]
return None
def _l1_set(self, key: str, value: Any):
if len(self.l1) >= self.l1_maxsize:
self.l1.popitem(last=False) # 淘汰最久未使用
self.l1[key] = value
def _l2_get(self, key: str) -> Optional[Any]:
if key in self.l2:
value, expiry = self.l2[key]
if time.time() < expiry:
return value
else:
del self.l2[key]
self.stats.evictions += 1
return None
def _l2_set(self, key: str, value: Any, ttl: int):
if len(self.l2) >= self.l2_maxsize:
# 淘汰最早过期的
oldest = min(self.l2, key=lambda k: self.l2[k][1])
del self.l2[oldest]
self.l2[key] = (value, time.time() + ttl)
def stats_report(self) -> dict:
return {
"l1_size": len(self.l1),
"l2_size": len(self.l2),
"hits": self.stats.hits,
"misses": self.stats.misses,
"hit_rate": f"{self.stats.hit_rate:.1%}",
"avg_latency_ms": f"{self.stats.avg_latency * 1000:.2f}ms",
"evictions": self.stats.evictions,
}
# 使用示例
cache = TieredCache(l1_maxsize=100, l2_ttl=60)
def expensive_query(query_id):
time.sleep(0.5) # 模拟耗时
return {"id": query_id, "data": f"result_{query_id}", "time": time.time()}
# 首次查询(全部未命中)
result1 = cache.get("query:1", lambda: expensive_query(1))
print(f"首次查询: {result1}")
# 二次查询(命中L1)
result2 = cache.get("query:1", lambda: expensive_query(1))
print(f"再次查询: {result2}")
# 查看统计
print(f"缓存统计: {cache.stats_report()}")
十二、总结与最佳实践
核心要点回顾:
- 记忆化基础:
functools.lru_cache 是Python记忆化的首选工具,使用简单、性能优秀
- 无限制缓存:Python 3.9+ 使用
functools.cache,适合参数空间有限的纯函数
- 属性缓存:类中使用
functools.cached_property 缓存耗时属性
- 第三方库:
cachetools 提供 LRU/LFU/TTL/FIFO 等多种策略
- 外部缓存:Redis是分布式场景的标准选择,结合RedisBloom可防御缓存穿透
- 多级缓存:L1(内存)+ L2(Redis)+ DB 的三级架构是大规模系统的标准方案
12.1 选择指南
| 场景 | 推荐方案 | 说明 |
| 纯函数计算(递归、数学运算) | @lru_cache 或 @cache | 最简单的记忆化方案 |
| 类中耗时属性 | @cached_property | 比手动缓存更简洁 |
| 数据有时效性 | TTLCache(cachetools) | 自动过期刷新 |
| 跨进程共享缓存 | Redis + 自定义装饰器 | 分布式系统标配 |
| 高并发热点数据 | 多级缓存 + 互斥锁 | 防止缓存击穿 |
| 防御缓存穿透 | 布隆过滤器 + 空值缓存 | 保护数据库 |
黄金法则:
- 能用标准库就不要自己实现(
lru_cache 解决80%的场景)
- 始终监控缓存命中率(
cache_info())
- 缓存不是银弹——不当的缓存会增加系统复杂度
- 永远给缓存设置合理的 TTL 或 maxsize 上限
- 在分布式系统中,缓存一致性是最难解决的问题之一