← 返回并发编程目录
← 返回学习笔记首页
专题: Python并发编程系统学习
关键词: Python, 并发编程, 限流, 令牌桶, 漏桶, 滑动窗口, 限流算法, rate limiting
一、为什么需要限流
限流(Rate Limiting)是保护系统不被突发流量冲垮的关键技术手段。在分布式系统和高并发场景中,限流器的作用等同于电路中的保险丝——当流量超过系统承载能力时,主动丢弃或延迟部分请求,确保核心服务的可用性。
限流的核心目标包括以下五个方面:
防止系统过载 :每个系统都有处理能力的上限(如数据库连接数、线程池大小、网络带宽等)。限流确保实际流量始终低于系统承载阈值,避免因过载导致雪崩效应。
公平使用资源 :在多租户系统中,限流防止单一客户端抢占过多资源,保障所有用户的公平访问。
成本控制 :云服务按使用量计费。通过限流控制资源消耗,有助于将运营成本维持在预算范围内。
DoS防护 :限流可以有效防御低成本的拒绝服务攻击,防止恶意客户端通过大量请求耗尽服务端资源。
服务SLA保障 :通过合理的限流策略,确保服务响应时间和服务等级协议(SLA)得到满足。
核心思想: 限流的本质是在"可用性"与"吞吐量"之间寻找平衡——既要充分利用系统资源,又不能让系统过载崩溃。不同的限流算法在这个权衡上各有侧重。
二、固定窗口计数器
固定窗口计数器(Fixed Window Counter)是最简单的限流算法。它将时间划分为固定的窗口(如1秒),在每个窗口内维护一个计数器,请求到达时计数器加1;当计数器超过阈值时拒绝后续请求,窗口结束后重置计数器。
这种算法的核心逻辑极其简单,仅需一个计数器加上时间判断,但存在一个严重的临界问题(Boundary Problem) :如果用户在窗口末尾和下一个窗口开头集中发送请求,会导致短时间内实际流量达到阈值的两倍。
Python实现(线程安全版本):
import time
import threading
class FixedWindowCounter :
"""固定窗口计数器限流器"""
def __init__ (self , limit, window_size):
self .limit = limit # 窗口内允许的最大请求数
self .window_size = window_size # 窗口大小(秒)
self .counter = 0
self .window_start = time.time()
self .lock = threading.Lock()
def allow_request (self ) -> bool:
with self .lock:
now = time.time()
# 如果已超出当前窗口,重置计数器
if now - self .window_start >= self .window_size:
self .window_start = now
self .counter = 0
if self .counter < self .limit:
self .counter += 1
return True
return False
临界问题分析:假设限制为 100 次/秒。用户在 00:00:00.999 发送了 100 个请求,然后在 00:00:01.001 又发送 100 个请求。虽然在以秒为单位的两个独立窗口中都没有超过限额,但用户在两毫秒内实际发出了 200 个请求,这可能导致系统瞬间被冲垮。
三、滑动窗口日志
滑动窗口日志(Sliding Window Log)算法通过记录每个请求的时间戳来解决固定窗口的临界问题。它维护一个按时间排序的请求时间戳列表,每次新请求到达时,先移除窗口范围之外的时间戳,再检查窗口内的请求数量是否超过阈值。
精度高是滑动窗口日志算法的最大优势——它不再依赖固定的时间边界,而是以当前时间点为窗口终点向前滑动一个窗口大小,从而完全消除了临界问题。但代价是内存开销大,需要存储窗口内的所有时间戳。
Python实现:
import time
import collections
class SlidingWindowLog :
"""滑动窗口日志限流器"""
def __init__ (self , limit, window_size):
self .limit = limit
self .window_size = window_size
self .timestamps = collections.deque()
def allow_request (self ) -> bool:
now = time.time()
# 移除窗口之外的时间戳
while self .timestamps and \
self .timestamps[0 ] <= now - self .window_size:
self .timestamps.popleft()
if len(self .timestamps) < self .limit:
self .timestamps.append(now)
return True
return False
该实现的优点是逻辑直观、精度高。缺点是需要为每个请求维护时间戳记录,当窗口大、阈值高时内存开销显著。在实际高并发场景中(如每秒处理数万请求),滑动窗口日志可能因频繁的列表操作和内存分配而影响性能。
优化思路:不记录所有时间戳,而是用多个细粒度计数器代替(即滑动窗口计数器),将窗口分成N个小格子,每个格子记录该时间段内的请求数,以近似滑动窗口的效果。
四、漏桶算法(Leaky Bucket)
漏桶算法的思想源自一个形象的比喻:将请求想象成倒入桶中的水,桶底部有一个固定速率的小孔不断漏水。无论水倒入的速度有多快,水流出的速率始终是恒定的。当桶被装满时,新来的水就会溢出(即请求被拒绝)。
漏桶算法的两大核心参数:
桶容量(capacity) :允许的最大排队请求数量,决定了能容忍的瞬时突发量。
漏水速率(leak_rate) :每秒钟能处理的请求数量,决定了系统的最大吞吐量。
漏桶的核心特性是强制平滑流量 ——无论输入流量多么不均匀,输出始终保持恒定速率。这使得漏桶特别适合需要流量整形(Traffic Shaping)的场景,如保护数据库连接池或下游脆弱服务。
Python实现(线程安全版本):
import time
import threading
class LeakyBucket :
"""漏桶算法限流器"""
def __init__ (self , capacity, leak_rate):
self .capacity = capacity # 桶容量
self .leak_rate = leak_rate # 漏水速率(请求/秒)
self .water = 0 # 当前水量
self .last_time = time.time()
self .lock = threading.Lock()
def allow_request (self ) -> bool:
with self .lock:
now = time.time()
# 先漏水:根据时间差计算漏掉的水量
elapsed = now - self .last_time
self .water = max(0 , self .water - elapsed * self .leak_rate)
self .last_time = now
# 检查桶是否还有剩余空间
if self .water < self .capacity:
self .water += 1
return True
return False
使用示例:
# 每秒最多处理5个请求,最多允许10个请求排队
bucket = LeakyBucket(capacity=10 , leak_rate=5 )
for i in range(20 ):
if bucket.allow_request():
print(f"请求 {i}: 通过" )
else :
print(f"请求 {i}: 被限流" )
time.sleep(0.05 )
漏桶的局限性在于它过于"死板"——即使系统有能力处理更多请求,漏桶也会强制将输出速率限制在固定值。这意味着在突发流量时,即便后端资源充裕,漏桶也无法利用这些资源快速处理积压的请求。
五、令牌桶算法(Token Bucket)
令牌桶算法(Token Bucket)是目前最广泛使用的限流算法,也是 Guava RateLimiter 等主流限流库的底层实现。与漏桶不同,令牌桶允许一定程度的突发流量,同时在长期来看维持平均速率。
算法原理:系统以恒定速率向桶中添加令牌(token),每个请求需要消耗一个令牌才能被处理。如果桶中有足够令牌,请求即被放行;否则被拒绝或等待。桶有最大容量限制,令牌数量不会超过容量。
令牌桶相比漏桶的关键优势:
允许突发 :当桶内积累了足够令牌时,可以一次性发送大量请求,充分利用空闲资源。
平滑与弹性兼顾 :长期速率受控于令牌生成速率,短期则允许弹性波动。
灵活配置 :调整桶容量和生成速率即可控制突发程度和平均速率。
Python实现(线程安全版本):
import time
import threading
class TokenBucket :
"""令牌桶算法限流器"""
def __init__ (self , capacity, fill_rate):
self .capacity = capacity # 桶容量(最大令牌数)
self .fill_rate = fill_rate # 令牌生成速率(个/秒)
self .tokens = capacity # 当前令牌数,初始满桶
self .last_time = time.time()
self .lock = threading.Lock()
def consume (self , tokens=1 ) -> bool:
"""尝试消耗 tokens 个令牌,成功返回 True"""
with self .lock:
now = time.time()
# 按时间差补充令牌
elapsed = now - self .last_time
self .tokens = min(
self .capacity,
self .tokens + elapsed * self .fill_rate
)
self .last_time = now
if self .tokens >= tokens:
self .tokens -= tokens
return True
return False
支持等待消费的版本:
def wait_and_consume (self , tokens=1 ):
"""等待直到获得足够令牌"""
while True :
if self .consume(tokens):
return
# 计算需要等待的时间
wait_time = (tokens - self .tokens) / self .fill_rate
time.sleep(wait_time)
令牌桶的典型应用场景包括:API网关限流(如 Nginx limit_req 模块底层使用类似算法)、服务间RPC调用的客户端限流、以及需要允许突发但控制平均速率的通用场景。
六、asyncio下的协程安全限流器
在基于 asyncio 的异步编程中,传统的 threading.Lock 不再适用,因为协程在执行过程中会通过 await 主动让出控制权。我们需要使用 asyncio 提供的协程安全同步原语来实现限流。
基于 asyncio.Semaphore 的简单限流
Semaphore(信号量)是最简单的协程并发控制手段。它限制同时执行的协程数量,可以像漏桶一样限制并发度:
import asyncio
class ConcurrencyLimiter :
"""基于信号量的并发度限流器"""
def __init__ (self , max_concurrency):
self .sem = asyncio.Semaphore(max_concurrency)
async def run (self , coro):
async with self .sem:
return await coro
异步令牌桶实现
将令牌桶改造为 asyncio 兼容版本,关键在于将 threading.Lock 替换为 asyncio.Lock,并在令牌不足时使用 asyncio.sleep 而非 time.sleep:
import asyncio
import time
class AsyncTokenBucket :
"""协程安全的异步令牌桶限流器"""
def __init__ (self , capacity, fill_rate):
self .capacity = capacity
self .fill_rate = fill_rate
self .tokens = capacity
self .last_time = time.time()
self .lock = asyncio.Lock()
async def consume (self , tokens=1 ) -> bool:
async with self .lock:
now = time.time()
elapsed = now - self .last_time
self .tokens = min(
self .capacity,
self .tokens + elapsed * self .fill_rate
)
self .last_time = now
if self .tokens >= tokens:
self .tokens -= tokens
return True
return False
async def wait_and_consume (self , tokens=1 ):
while True :
if await self .consume(tokens):
return
wait_time = (tokens - self .tokens) / self .fill_rate
await asyncio.sleep(wait_time)
异步限流器在协程爬虫、异步API客户端等场景中非常实用。例如控制对某个外部API的请求速率不超过每秒10次:
async def fetch_with_throttle (urls, rate_limiter):
async with aiohttp.ClientSession() as session:
for url in urls:
await rate_limiter.wait_and_consume()
async with session.get(url) as resp:
print(await resp.text())
七、分布式限流(Redis实现)
在单体应用中,可以使用进程内锁(threading.Lock)来保证限流器的线程安全。但在微服务架构中,多个服务实例各自持有独立的限流器,无法协同工作。要实现在多实例间共享的全局限流,通常借助 Redis 等外部存储。
固定窗口的Redis实现
利用 Redis 的 INCR 命令和过期时间(EXPIRE),可以高效实现分布式固定窗口计数器:
import redis.asyncio as aioredis
async def fixed_window_redis (r, key, limit, window):
"""Redis固定窗口限流,返回是否允许请求"""
count = await r.incr(key)
if count == 1 :
await r.expire(key, window)
return count <= limit
# 使用示例
# r = aioredis.Redis.from_url("redis://localhost")
# allowed = await fixed_window_redis(r, "rate:api:v1", 100, 1)
Redis Lua脚本实现原子限流
为确保多步操作的原子性,推荐使用 Lua 脚本在 Redis 服务端执行整个限流逻辑。这样可以避免竞态条件,并且减少网络往返:
# Lua脚本:令牌桶限流
"""
local key = KEYS[1]
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local fill_rate = tonumber(ARGV[3])
local request_tokens = tonumber(ARGV[4])
-- 读取当前令牌数和时间戳
local tokens = redis.call("GET", key .. ":tokens")
local last_time = redis.call("GET", key .. ":time")
if tokens == false then
tokens = capacity
else
tokens = tonumber(tokens)
end
if last_time == false then
last_time = now
else
last_time = tonumber(last_time)
end
-- 补充令牌
local elapsed = now - last_time
tokens = math.min(capacity, tokens + elapsed * fill_rate)
-- 尝试消耗
if tokens >= request_tokens then
tokens = tokens - request_tokens
redis.call("SET", key .. ":tokens", tokens, "EX", 60)
redis.call("SET", key .. ":time", now, "EX", 60)
return 1 -- 允许
else
redis.call("SET", key .. ":tokens", tokens, "EX", 60)
redis.call("SET", key .. ":time", last_time, "EX", 60)
return 0 -- 拒绝
end
"""
在 Python 中调用该 Lua 脚本:
async def token_bucket_redis (r, key, capacity, fill_rate, tokens=1 ):
"""Redis Lua脚本实现分布式令牌桶限流"""
script = """
-- (脚本内容如上)
"""
now = time.time()
result = await r.eval(script, 1 , key, now, capacity, fill_rate, tokens)
return result == 1
Redis滑动窗口近似实现
结合 ZSET(有序集合)可以实现滑动窗口日志的分布式版本。每个请求的时间戳作为 score 存入 ZSET,查询时通过 ZREMRANGEBYSCORE 移除过期记录,再用 ZCARD 统计活跃请求数:
async def sliding_window_redis (r, key, limit, window):
"""Redis ZSET实现滑动窗口限流"""
now = time.time()
window_start = now - window
async with r.pipeline(transaction=True ) as pipe:
await pipe.zremrangebyscore(key, 0 , window_start)
await pipe.zcard(key)
count = (await pipe.execute())[1 ]
if count < limit:
await r.zadd(key, {str(now): now})
await r.expire(key, int(window) + 1 )
return True
return False
分布式限流的要点是保证操作原子性 。Redis单线程模型保证了单个命令的原子性,但涉及多条命令(如 ZREMRANGEBYSCORE + ZCARD + ZADD)时,应使用 Lua 脚本或事务来避免竞态。此外,Redis 限流需要关注网络延迟对精度的影响(通常可接受毫秒级误差),并考虑 Redis 宕机时降级为本地限流的兜底策略。
八、四种算法对比与选型
下表从多个维度对比了四种主流限流算法:
特性
固定窗口
滑动窗口日志
漏桶
令牌桶
精度
低(有临界问题)
高(精确到单请求)
中(平滑但不精确)
中高(可精确到令牌)
内存开销
极低(仅一个计数器)
高(存储所有时间戳)
低(仅桶状态)
低(仅桶状态)
允许突发
否(但存在边界突发漏洞)
否
否(强制恒定输出)
是(积累令牌后突发)
流量整形
弱
中
强(输出完全平滑)
中(长期平滑短期突发)
实现复杂度
极低
低
中
中
分布式实现
简单(INCR+EXPIRE)
中等(ZSET实现)
较复杂(需Lua脚本)
较复杂(需Lua脚本)
适用场景
精度要求不高的简单场景
精度要求极高的场景
下游脆弱、必须平滑流量
通用场景,兼顾平滑与突发
选型建议
API网关限流 :推荐令牌桶或滑动窗口计数器(细粒度版本)。令牌桶允许用户在资源空闲时加速处理,更适合Web API场景。
数据库连接池保护 :推荐漏桶。数据库连接数必须严格控制在最大值以下,流量平滑至关重要。
支付/交易系统 :推荐滑动窗口日志+令牌桶组合。滑动窗口确保不会因临界问题导致重复扣款风险,令牌桶控制平均速率。
文件上传/下载限速 :推荐漏桶。带宽资源需要严格限制,不能让突发流量打满带宽。
消息队列生产者 :推荐令牌桶。消息生产速率可以有弹性,但不能长时间超过下游消费能力。
登录接口防护 :推荐固定窗口或滑动窗口计数器。简单有效,防止暴力破解。
综合使用策略
在实际生产环境中,单一限流算法往往无法满足所有需求。推荐采用多级限流 策略:
第一层(最外层) :Nginx/Gateway层的分布式令牌桶限流,基于客户端IP或API Key分流。
第二层(应用层) :进程内本地令牌桶(如 Guava RateLimiter),提供毫秒级响应的限流决策,避免每次都请求Redis。
第三层(资源层) :数据库连接池和线程池自身的容量限制作为最后防线。
多级限流的好处是:外层拦截大部分超限流量,内层做精细控制,每一层的失败都可以回退到上一层的决策。同时,每层的限流参数应随负载动态调整,配合服务发现和自动扩缩容机制,实现弹性的流量控制。
最佳实践总结:
1. 所有限流操作都应记录日志,用于事后分析和阈值调优。
2. 被限流的请求应返回明确的状态码(如 HTTP 429 Too Many Requests)和 Retry-After 头部。
3. 限流参数不应硬编码,应通过配置中心动态下发。
4. 做好降级预案——Redis 不可用时回退到本地限流,本地限流不可用时回退到熔断。
5. 限流和熔断、降级、负载均衡配合使用,形成完整的系统保护体系。
注意: 本文档中的代码示例仅供学习参考。在实际生产环境中使用限流算法时,建议优先考虑成熟的限流库(如 Google Guava RateLimiter、令牌桶实现参考 limits 库等),并充分测试极端流量下的表现。