限流算法:令牌桶/漏桶/滑动窗口Python实现

Python并发编程专题 · 保护系统不被突发流量冲垮的关键技术

专题:Python并发编程系统学习

关键词:Python, 并发编程, 限流, 令牌桶, 漏桶, 滑动窗口, 限流算法, rate limiting

一、为什么需要限流

限流(Rate Limiting)是保护系统不被突发流量冲垮的关键技术手段。在分布式系统和高并发场景中,限流器的作用等同于电路中的保险丝——当流量超过系统承载能力时,主动丢弃或延迟部分请求,确保核心服务的可用性。

限流的核心目标包括以下五个方面:

核心思想:限流的本质是在"可用性"与"吞吐量"之间寻找平衡——既要充分利用系统资源,又不能让系统过载崩溃。不同的限流算法在这个权衡上各有侧重。

二、固定窗口计数器

固定窗口计数器(Fixed Window Counter)是最简单的限流算法。它将时间划分为固定的窗口(如1秒),在每个窗口内维护一个计数器,请求到达时计数器加1;当计数器超过阈值时拒绝后续请求,窗口结束后重置计数器。

这种算法的核心逻辑极其简单,仅需一个计数器加上时间判断,但存在一个严重的临界问题(Boundary Problem):如果用户在窗口末尾和下一个窗口开头集中发送请求,会导致短时间内实际流量达到阈值的两倍。

Python实现(线程安全版本):

import time import threading class FixedWindowCounter: """固定窗口计数器限流器""" def __init__(self, limit, window_size): self.limit = limit # 窗口内允许的最大请求数 self.window_size = window_size # 窗口大小(秒) self.counter = 0 self.window_start = time.time() self.lock = threading.Lock() def allow_request(self) -> bool: with self.lock: now = time.time() # 如果已超出当前窗口,重置计数器 if now - self.window_start >= self.window_size: self.window_start = now self.counter = 0 if self.counter < self.limit: self.counter += 1 return True return False

临界问题分析:假设限制为 100 次/秒。用户在 00:00:00.999 发送了 100 个请求,然后在 00:00:01.001 又发送 100 个请求。虽然在以秒为单位的两个独立窗口中都没有超过限额,但用户在两毫秒内实际发出了 200 个请求,这可能导致系统瞬间被冲垮。

三、滑动窗口日志

滑动窗口日志(Sliding Window Log)算法通过记录每个请求的时间戳来解决固定窗口的临界问题。它维护一个按时间排序的请求时间戳列表,每次新请求到达时,先移除窗口范围之外的时间戳,再检查窗口内的请求数量是否超过阈值。

精度高是滑动窗口日志算法的最大优势——它不再依赖固定的时间边界,而是以当前时间点为窗口终点向前滑动一个窗口大小,从而完全消除了临界问题。但代价是内存开销大,需要存储窗口内的所有时间戳。

Python实现:

import time import collections class SlidingWindowLog: """滑动窗口日志限流器""" def __init__(self, limit, window_size): self.limit = limit self.window_size = window_size self.timestamps = collections.deque() def allow_request(self) -> bool: now = time.time() # 移除窗口之外的时间戳 while self.timestamps and \ self.timestamps[0] <= now - self.window_size: self.timestamps.popleft() if len(self.timestamps) < self.limit: self.timestamps.append(now) return True return False

该实现的优点是逻辑直观、精度高。缺点是需要为每个请求维护时间戳记录,当窗口大、阈值高时内存开销显著。在实际高并发场景中(如每秒处理数万请求),滑动窗口日志可能因频繁的列表操作和内存分配而影响性能。

优化思路:不记录所有时间戳,而是用多个细粒度计数器代替(即滑动窗口计数器),将窗口分成N个小格子,每个格子记录该时间段内的请求数,以近似滑动窗口的效果。

四、漏桶算法(Leaky Bucket)

漏桶算法的思想源自一个形象的比喻:将请求想象成倒入桶中的水,桶底部有一个固定速率的小孔不断漏水。无论水倒入的速度有多快,水流出的速率始终是恒定的。当桶被装满时,新来的水就会溢出(即请求被拒绝)。

漏桶算法的两大核心参数:

漏桶的核心特性是强制平滑流量——无论输入流量多么不均匀,输出始终保持恒定速率。这使得漏桶特别适合需要流量整形(Traffic Shaping)的场景,如保护数据库连接池或下游脆弱服务。

Python实现(线程安全版本):

import time import threading class LeakyBucket: """漏桶算法限流器""" def __init__(self, capacity, leak_rate): self.capacity = capacity # 桶容量 self.leak_rate = leak_rate # 漏水速率(请求/秒) self.water = 0 # 当前水量 self.last_time = time.time() self.lock = threading.Lock() def allow_request(self) -> bool: with self.lock: now = time.time() # 先漏水:根据时间差计算漏掉的水量 elapsed = now - self.last_time self.water = max(0, self.water - elapsed * self.leak_rate) self.last_time = now # 检查桶是否还有剩余空间 if self.water < self.capacity: self.water += 1 return True return False

使用示例:

# 每秒最多处理5个请求,最多允许10个请求排队 bucket = LeakyBucket(capacity=10, leak_rate=5) for i in range(20): if bucket.allow_request(): print(f"请求 {i}: 通过") else: print(f"请求 {i}: 被限流") time.sleep(0.05)

漏桶的局限性在于它过于"死板"——即使系统有能力处理更多请求,漏桶也会强制将输出速率限制在固定值。这意味着在突发流量时,即便后端资源充裕,漏桶也无法利用这些资源快速处理积压的请求。

五、令牌桶算法(Token Bucket)

令牌桶算法(Token Bucket)是目前最广泛使用的限流算法,也是 Guava RateLimiter 等主流限流库的底层实现。与漏桶不同,令牌桶允许一定程度的突发流量,同时在长期来看维持平均速率。

算法原理:系统以恒定速率向桶中添加令牌(token),每个请求需要消耗一个令牌才能被处理。如果桶中有足够令牌,请求即被放行;否则被拒绝或等待。桶有最大容量限制,令牌数量不会超过容量。

令牌桶相比漏桶的关键优势:

Python实现(线程安全版本):

import time import threading class TokenBucket: """令牌桶算法限流器""" def __init__(self, capacity, fill_rate): self.capacity = capacity # 桶容量(最大令牌数) self.fill_rate = fill_rate # 令牌生成速率(个/秒) self.tokens = capacity # 当前令牌数,初始满桶 self.last_time = time.time() self.lock = threading.Lock() def consume(self, tokens=1) -> bool: """尝试消耗 tokens 个令牌,成功返回 True""" with self.lock: now = time.time() # 按时间差补充令牌 elapsed = now - self.last_time self.tokens = min( self.capacity, self.tokens + elapsed * self.fill_rate ) self.last_time = now if self.tokens >= tokens: self.tokens -= tokens return True return False

支持等待消费的版本:

def wait_and_consume(self, tokens=1): """等待直到获得足够令牌""" while True: if self.consume(tokens): return # 计算需要等待的时间 wait_time = (tokens - self.tokens) / self.fill_rate time.sleep(wait_time)

令牌桶的典型应用场景包括:API网关限流(如 Nginx limit_req 模块底层使用类似算法)、服务间RPC调用的客户端限流、以及需要允许突发但控制平均速率的通用场景。

六、asyncio下的协程安全限流器

在基于 asyncio 的异步编程中,传统的 threading.Lock 不再适用,因为协程在执行过程中会通过 await 主动让出控制权。我们需要使用 asyncio 提供的协程安全同步原语来实现限流。

基于 asyncio.Semaphore 的简单限流

Semaphore(信号量)是最简单的协程并发控制手段。它限制同时执行的协程数量,可以像漏桶一样限制并发度:

import asyncio class ConcurrencyLimiter: """基于信号量的并发度限流器""" def __init__(self, max_concurrency): self.sem = asyncio.Semaphore(max_concurrency) async def run(self, coro): async with self.sem: return await coro

异步令牌桶实现

将令牌桶改造为 asyncio 兼容版本,关键在于将 threading.Lock 替换为 asyncio.Lock,并在令牌不足时使用 asyncio.sleep 而非 time.sleep:

import asyncio import time class AsyncTokenBucket: """协程安全的异步令牌桶限流器""" def __init__(self, capacity, fill_rate): self.capacity = capacity self.fill_rate = fill_rate self.tokens = capacity self.last_time = time.time() self.lock = asyncio.Lock() async def consume(self, tokens=1) -> bool: async with self.lock: now = time.time() elapsed = now - self.last_time self.tokens = min( self.capacity, self.tokens + elapsed * self.fill_rate ) self.last_time = now if self.tokens >= tokens: self.tokens -= tokens return True return False async def wait_and_consume(self, tokens=1): while True: if await self.consume(tokens): return wait_time = (tokens - self.tokens) / self.fill_rate await asyncio.sleep(wait_time)

异步限流器在协程爬虫、异步API客户端等场景中非常实用。例如控制对某个外部API的请求速率不超过每秒10次:

async def fetch_with_throttle(urls, rate_limiter): async with aiohttp.ClientSession() as session: for url in urls: await rate_limiter.wait_and_consume() async with session.get(url) as resp: print(await resp.text())

七、分布式限流(Redis实现)

在单体应用中,可以使用进程内锁(threading.Lock)来保证限流器的线程安全。但在微服务架构中,多个服务实例各自持有独立的限流器,无法协同工作。要实现在多实例间共享的全局限流,通常借助 Redis 等外部存储。

固定窗口的Redis实现

利用 Redis 的 INCR 命令和过期时间(EXPIRE),可以高效实现分布式固定窗口计数器:

import redis.asyncio as aioredis async def fixed_window_redis(r, key, limit, window): """Redis固定窗口限流,返回是否允许请求""" count = await r.incr(key) if count == 1: await r.expire(key, window) return count <= limit # 使用示例 # r = aioredis.Redis.from_url("redis://localhost") # allowed = await fixed_window_redis(r, "rate:api:v1", 100, 1)

Redis Lua脚本实现原子限流

为确保多步操作的原子性,推荐使用 Lua 脚本在 Redis 服务端执行整个限流逻辑。这样可以避免竞态条件,并且减少网络往返:

# Lua脚本:令牌桶限流 """ local key = KEYS[1] local now = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local fill_rate = tonumber(ARGV[3]) local request_tokens = tonumber(ARGV[4]) -- 读取当前令牌数和时间戳 local tokens = redis.call("GET", key .. ":tokens") local last_time = redis.call("GET", key .. ":time") if tokens == false then tokens = capacity else tokens = tonumber(tokens) end if last_time == false then last_time = now else last_time = tonumber(last_time) end -- 补充令牌 local elapsed = now - last_time tokens = math.min(capacity, tokens + elapsed * fill_rate) -- 尝试消耗 if tokens >= request_tokens then tokens = tokens - request_tokens redis.call("SET", key .. ":tokens", tokens, "EX", 60) redis.call("SET", key .. ":time", now, "EX", 60) return 1 -- 允许 else redis.call("SET", key .. ":tokens", tokens, "EX", 60) redis.call("SET", key .. ":time", last_time, "EX", 60) return 0 -- 拒绝 end """

在 Python 中调用该 Lua 脚本:

async def token_bucket_redis(r, key, capacity, fill_rate, tokens=1): """Redis Lua脚本实现分布式令牌桶限流""" script = """ -- (脚本内容如上) """ now = time.time() result = await r.eval(script, 1, key, now, capacity, fill_rate, tokens) return result == 1

Redis滑动窗口近似实现

结合 ZSET(有序集合)可以实现滑动窗口日志的分布式版本。每个请求的时间戳作为 score 存入 ZSET,查询时通过 ZREMRANGEBYSCORE 移除过期记录,再用 ZCARD 统计活跃请求数:

async def sliding_window_redis(r, key, limit, window): """Redis ZSET实现滑动窗口限流""" now = time.time() window_start = now - window async with r.pipeline(transaction=True) as pipe: await pipe.zremrangebyscore(key, 0, window_start) await pipe.zcard(key) count = (await pipe.execute())[1] if count < limit: await r.zadd(key, {str(now): now}) await r.expire(key, int(window) + 1) return True return False

分布式限流的要点是保证操作原子性。Redis单线程模型保证了单个命令的原子性,但涉及多条命令(如 ZREMRANGEBYSCORE + ZCARD + ZADD)时,应使用 Lua 脚本或事务来避免竞态。此外,Redis 限流需要关注网络延迟对精度的影响(通常可接受毫秒级误差),并考虑 Redis 宕机时降级为本地限流的兜底策略。

八、四种算法对比与选型

下表从多个维度对比了四种主流限流算法:

特性 固定窗口 滑动窗口日志 漏桶 令牌桶
精度 低(有临界问题) 高(精确到单请求) 中(平滑但不精确) 中高(可精确到令牌)
内存开销 极低(仅一个计数器) 高(存储所有时间戳) 低(仅桶状态) 低(仅桶状态)
允许突发 否(但存在边界突发漏洞) 否(强制恒定输出) 是(积累令牌后突发)
流量整形 强(输出完全平滑) 中(长期平滑短期突发)
实现复杂度 极低
分布式实现 简单(INCR+EXPIRE) 中等(ZSET实现) 较复杂(需Lua脚本) 较复杂(需Lua脚本)
适用场景 精度要求不高的简单场景 精度要求极高的场景 下游脆弱、必须平滑流量 通用场景,兼顾平滑与突发

选型建议

综合使用策略

在实际生产环境中,单一限流算法往往无法满足所有需求。推荐采用多级限流策略:

多级限流的好处是:外层拦截大部分超限流量,内层做精细控制,每一层的失败都可以回退到上一层的决策。同时,每层的限流参数应随负载动态调整,配合服务发现和自动扩缩容机制,实现弹性的流量控制。

最佳实践总结:

1. 所有限流操作都应记录日志,用于事后分析和阈值调优。

2. 被限流的请求应返回明确的状态码(如 HTTP 429 Too Many Requests)和 Retry-After 头部。

3. 限流参数不应硬编码,应通过配置中心动态下发。

4. 做好降级预案——Redis 不可用时回退到本地限流,本地限流不可用时回退到熔断。

5. 限流和熔断、降级、负载均衡配合使用,形成完整的系统保护体系。

注意:本文档中的代码示例仅供学习参考。在实际生产环境中使用限流算法时,建议优先考虑成熟的限流库(如 Google Guava RateLimiter、令牌桶实现参考 limits 库等),并充分测试极端流量下的表现。