异步迭代器与异步上下文管理器

深入理解 async for 与 async with 协议

专题: Python 进阶编程 —— 异步编程核心协议

核心主题: 异步迭代器协议与异步上下文管理器协议的原理与实战

主要内容: __aiter__/__anext__ 异步迭代协议、AsyncIterable/AsyncIterator 抽象基类、async for 循环的底层机制、异步生成器(async yield)、__aenter__/__aexit__ 异步上下文协议、async with 语句执行流程、数据库连接池管理、HTTP 会话管理、流式数据处理、异步迭代器与上下文管理器组合模式

关键词: Python, 异步迭代器, 异步上下文管理器, async for, async with, __aiter__, __anext__, __aenter__, __aexit__, 异步生成器, asyncio

一、概述

Python 的异步编程生态中,异步迭代器(Async Iterator)异步上下文管理器(Async Context Manager)是两根核心支柱。它们分别通过 async forasync with 语句暴露给开发者,让异步代码能够以同步般的简洁风格处理流式数据和资源生命周期管理。

理解这两个协议,不仅有助于正确使用 asyncio 标准库和第三方框架(如 aiohttp、asyncpg、motor),更重要的是能够自定义异步迭代器和异步上下文管理器,构建属于自己的异步基础设施。

核心概念速览:

  • 异步迭代器协议: 实现 __aiter__()__anext__() 方法,支持 async for 遍历
  • 异步可迭代对象: 实现 __aiter__() 方法,返回一个异步迭代器
  • 异步生成器: 使用 async yield 的协程函数,自动实现异步迭代器协议
  • 异步上下文管理器协议: 实现 __aenter__()__aexit__() 方法,支持 async with 语句
  • 关系密度: 一次异步数据库查询或者一个 HTTP 请求,可能同时使用异步迭代器(游标遍历)和异步上下文管理器(连接管理)

二、异步迭代器协议

异步迭代器是对同步迭代器的异步化扩展。它与同步迭代器的核心区别在于:__anext__() 方法是一个协程方法,返回一个 awaitable 对象,而不是直接返回下一个值。

2.1 协议的正式定义

根据 PEP 492,异步迭代器需要实现两个方法:

import asyncio from typing import AsyncIterator, AsyncIterable, Awaitable, Any class RangeAsyncIterator: """一个简单的异步迭代器:异步地产生 0 到 n-1 的数字""" def __init__(self, n: int): self._n = n self._current = 0 def __aiter__(self) -> AsyncIterator[int]: return self async def __anext__(self) -> int: if self._current >= self._n: raise StopAsyncIteration value = self._current self._current += 1 await asyncio.sleep(0.1) # 模拟异步 I/O return value

2.2 AsyncIterable 与 AsyncIterator 抽象基类

Python 在 collections.abc 模块中提供了两个抽象基类:

from collections.abc import AsyncIterator class MyAsyncRange(AsyncIterator): """使用抽象基类确保协议的正确实现""" def __init__(self, limit: int): self._limit = limit self._i = 0 async def __anext__(self) -> int: if self._i >= self._limit: raise StopAsyncIteration await asyncio.sleep(0.05) result = self._i self._i += 1 return result # __aiter__ 由 AsyncIterator 基类自动提供:return self # 使用示例 async def main(): async for num in MyAsyncRange(5): print(f"Got: {num}")

"__aiter__ 可以是一个普通方法或协程方法,但 __anext__ 必须是协程方法(即用 async def 定义),因为它需要被 await 来获取下一个值。"

三、async for 循环的底层机制

当我们编写 async for x in async_iterable: 时,Python 编译器将其展开为类似以下的人工代码:

# async for x in async_iterable: # body(x) # # 等价于: async def _async_for_machinery(async_iterable): __aiter = async_iterable.__aiter__() # 获取异步迭代器 while True: try: x = await __aiter.__anext__() # 等待下一个值 except StopAsyncIteration: # 没有更多值 break # 退出循环 else: body(x) # 执行循环体

关键点在于:每一轮迭代都会产生一次 await 暂停点。这意味着在等待 __anext__() 返回的过程中,事件循环可以切换到其他协程执行,从而实现真正的异步并发。

await 暂停点的意义:

  • async for 的每一次循环体执行之间,当前协程都会暂停(在 await __anext__() 处)
  • 暂停期间事件循环可以处理其他任务,实现并发 I/O
  • 这与同步 for 有本质区别——同步 for 中整个遍历是阻塞的
  • 如果 __anext__() 内部没有真正的 await 点,异步迭代不会带来并发收益

3.1 异步迭代器的常见错误

错误 1:在同步函数中返回异步迭代器

# 错误!普通函数不能返回 awaitable def wrong_aiter(): return SomeAsyncIterator() # 直接在同步迭代中使用 async for 会报错 # 正确做法:在 async def 中使用 async for async def use_async_iter(): async for item in SomeAsyncIterator(): process(item)

错误 2:忘记抛出 StopAsyncIteration

# 错误!如果 __anext__ 不抛出 StopAsyncIteration # async for 会无限循环下去 async def __anext__(self): if self._exhausted: raise StopAsyncIteration # 必须! ...

四、异步生成器

手动实现 __aiter____anext__ 比较繁琐。Python 3.6+ 提供了异步生成器(Async Generator),只需在 async def 函数中使用 yield 语句即可自动实现整个异步迭代器协议。

import asyncio async def async_range(n: int): """异步生成器:自动成为 AsyncIterator""" for i in range(n): await asyncio.sleep(0.1) # 模拟异步操作 yield i # 自动暂停并返回值 async def main(): async for value in async_range(5): print(f"Received: {value}")

4.1 异步生成器的工作原理

异步生成器函数返回一个异步生成器对象(AsyncGenerator),该对象同时实现了 AsyncIteratorAsyncIterable 协议。与普通生成器类似,异步生成器在遇到 yield 时暂停执行,但区别在于:

异步生成器的四个异步方法

  • __anext__() — 继续执行到下一个 yield 并返回值
  • asend(value) — 向生成器发送一个值并继续执行(对应 yield 表达式的值)
  • athrow(type, value, traceback) — 在生成器暂停点抛出异常
  • aclose() — 关闭生成器,使其释放资源

4.2 异步生成器表达式

Python 3.6+ 还支持异步生成器表达式,语法与普通生成器表达式类似,只是在 for 前加上 async 关键字:

async def fetch_urls(urls): import aiohttp async with aiohttp.ClientSession() as session: # 异步生成器表达式 results = (await session.get(url)).text() async for url in urls # 注意:这不是列表推导式,而是一个异步生成器 async for text in results: print(text[:50])

"异步生成器优雅地解决了手动实现异步迭代器协议的样板代码问题。一个 async def 函数中的 yield 语句,自动为你处理了 __aiter__、__anext__、StopAsyncIteration 的所有细节。"

五、异步上下文管理器协议

同步上下文管理器使用 __enter____exit__ 方法,用于资源获取和释放。异步上下文管理器则将其替换为协程方法 __aenter____aexit__,使得资源管理操作(如建立/关闭数据库连接)可以在不阻塞事件循环的情况下执行。

5.1 协议的正式定义

from typing import AsyncContextManager, Optional from types import TracebackType class AsyncDatabaseConnection: """模拟异步数据库连接管理器""" def __init__(self, dsn: str): self.dsn = dsn self.connection = None async def __aenter__(self) -> "AsyncDatabaseConnection": """异步建立连接""" print(f"Connecting to {self.dsn}...") await asyncio.sleep(1) # 模拟异步 I/O 建立连接 self.connection = f"Connection({self.dsn})" print("Connected!") return self async def __aexit__( self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> Optional[bool]: """异步关闭连接""" print(f"Closing connection {self.connection}...") await asyncio.sleep(0.5) # 模拟异步关闭 self.connection = None print("Connection closed.") return False # False 表示不抑制异常(如果发生的话) async def main(): async with AsyncDatabaseConnection("postgresql://localhost/mydb") as conn: print(f"Using: {conn.connection}") # 在这里执行数据库操作... # 退出 async with 块后,连接自动关闭

5.2 AsyncContextManager 抽象基类

collections.abc.AsyncContextManager 定义了正式的协议接口,但它的使用是非强制的——只要对象实现了 __aenter____aexit__ 方法,就可以用于 async with 语句。

from collections.abc import AsyncContextManager import asyncio class AsyncTimer(AsyncContextManager): """用 async with 测量异步操作耗时""" async def __aenter__(self): self._start = asyncio.get_event_loop().time() return self async def __aexit__(self, exc_type, exc_val, exc_tb): elapsed = asyncio.get_event_loop().time() - self._start print(f"Elapsed: {elapsed:.3f}s") return False async def main(): async with AsyncTimer(): await asyncio.sleep(1.5) # 输出: Elapsed: 1.500s

六、async with 语句的执行流程

Python 编译器将 async with 语句展开为类似以下的伪代码:

# async with AsyncObject() as x: # body(x) # # 等价于: async def _async_with_machinery(): __manager = AsyncObject() # 构造上下文管理器 __exit = type(__manager).__aexit__ # 提前获取 __aexit__ 引用 __value = await type(__manager).__aenter__(__manager) # 1. 异步进入 exc = True # 标记是否发生异常 try: try: x = __value # 2. as 子句绑定 body(x) # 3. 执行代码块 except: exc = False if not await __exit(__manager, *sys.exc_info()): # 4. 异常时调用 __aexit__ raise finally: if exc: await __exit(__manager, None, None, None) # 5. 正常退出也调用 __aexit__

执行流程的五步:

  1. 进入: await __aenter__() — 建立资源连接
  2. 绑定:__aenter__ 的返回值赋给 as 子句的变量
  3. 执行: 执行 async with 代码块
  4. 异常处理: 代码块中若抛出异常,await __aexit__() 并传入异常信息;若 __aexit__ 返回 True 则抑制异常
  5. 清理: 无论正常还是异常退出,__aexit__() 都会被调用以释放资源

七、实战应用:数据库连接管理

异步上下文管理器最常见的应用场景之一就是数据库连接池管理。下面以 asyncpg(PostgreSQL 异步驱动)和 redis.asyncio 为例展示真实世界的用法。

import asyncio import asyncpg class ConnectionPool: """简化的异步数据库连接池""" def __init__(self, dsn: str, min_size: int = 2, max_size: int = 10): self.dsn = dsn self.min_size = min_size self.max_size = max_size self._pool = None self._conns: list = [] async def __aenter__(self) -> "ConnectionPool": """异步创建连接池""" self._pool = await asyncpg.create_pool( dsn=self.dsn, min_size=self.min_size, max_size=self.max_size ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步关闭连接池中的所有连接""" if self._pool: await self._pool.close() self._pool = None return False async def fetch(self, query: str, *args): """在连接池中执行查询""" async with self._pool.acquire() as conn: return await conn.fetch(query, *args) # 使用示例 async def main(): async with ConnectionPool("postgresql://user:pass@localhost/db") as pool: rows = await pool.fetch("SELECT * FROM users WHERE active = $1", True) print(rows) # 退出后连接池自动关闭

7.1 HTTP 会话管理

aiohttp 的 ClientSession 是异步上下文管理器的经典实现,它管理着底层的 TCP 连接池:

import aiohttp import asyncio async def fetch_multiple(urls: list[str]): """异步 HTTP 会话管理 + 流式响应读取""" async with aiohttp.ClientSession() as session: # 组合使用 async with 和 async for for url in urls: async with session.get(url) as response: # 流式读取响应内容 chunk_count = 0 async for chunk in response.content.iter_chunked(1024): process_chunk(chunk) chunk_count += 1 print(f"{url}: {chunk_count} chunks received")

"aiohttp 的 ClientSession 和 Response 对象各自实现了异步上下文管理器协议:Session 管理连接池生命周期,Response 管理单个 HTTP 响应的资源释放。两者嵌套组合使用,展示了 async with 在真实项目中的最佳实践。"

八、实战应用:流式数据处理

异步迭代器在流式数据处理场景中展现出显著优势。以下示例展示了如何从大文件中异步读取行、从 WebSocket 接收消息、以及从 Kafka 消费消息。

8.1 异步文件行读取器

import aiofiles import asyncio async def read_large_file(filepath: str): """异步生成器:逐行读取大文件,不阻塞事件循环""" async with aiofiles.open(filepath, mode='r') as f: async for line in f: yield line.strip() async def process_logs(filepath: str): """处理日志文件,过滤出 ERROR 级别的日志""" async for line in read_large_file(filepath): if "ERROR" in line: print(f"[ERROR] {line}")

8.2 WebSocket 消息流

import asyncio from collections.abc import AsyncIterator class WebSocketStream(AsyncIterator): """将 WebSocket 消息流包装为异步迭代器""" def __init__(self, websocket): self._ws = websocket self._closed = False async def __anext__(self) -> str: if self._closed: raise StopAsyncIteration try: msg = await self._ws.recv() if msg is None: self._closed = True raise StopAsyncIteration return msg except ConnectionClosed: self._closed = True raise StopAsyncIteration async def handle_websocket(websocket): async for message in WebSocketStream(websocket): print(f"Received: {message}") # 处理消息...

8.3 异步迭代器转换管道

利用异步生成器可以轻松构建流式数据处理管道,每个阶段都是一个独立的异步生成器:

async def source(n: int): """数据源:产生原始数字""" for i in range(n): await asyncio.sleep(0.1) yield i async def filter_even(async_iter): """过滤管道:只保留偶数""" async for value in async_iter: if value % 2 == 0: yield value async def double(async_iter): """转换管道:将值加倍""" async for value in async_iter: yield value * 2 async def main(): pipeline = double(filter_even(source(10))) async for result in pipeline: print(result, end=" ") # 输出: 0 4 8 12 16

九、组合使用:异步迭代器 + 异步上下文管理器

真实世界的高质量异步代码几乎总是同时使用这两种协议。下面是一个综合示例:一个异步数据库游标,它同时作为上下文管理器管理连接,又作为迭代器逐行返回查询结果。

import asyncio from collections.abc import AsyncIterator class AsyncCursor(AsyncIterator): """ 同时实现异步上下文管理器和异步迭代器。 使用 async with 管理连接生命周期, 使用 async for 逐行获取查询结果。 """ def __init__(self, dsn: str, query: str): self.dsn = dsn self.query = query self._conn = None self._cursor = None self._rows: list = [] self._index = 0 async def __aenter__(self) -> "AsyncCursor": # 异步建立连接并执行查询 self._conn = await create_async_connection(self.dsn) self._cursor = await self._conn.execute(self.query) self._rows = await self._cursor.fetchmany(100) # 预取一批 return self async def __aexit__(self, exc_type, exc_val, exc_tb): # 异步关闭游标和连接 if self._cursor: await self._cursor.close() if self._conn: await self._conn.close() return False async def __anext__(self) -> dict: # 逐行返回结果,当前批次取完时自动获取下一批 if self._index >= len(self._rows): self._rows = await self._cursor.fetchmany(100) self._index = 0 if not self._rows: raise StopAsyncIteration row = self._rows[self._index] self._index += 1 return row # 使用:一个对象同时管理连接和迭代 async def query_database(): async with AsyncCursor( "postgresql://localhost/mydb", "SELECT * FROM large_table" ) as cursor: async for row in cursor: print(row["id"], row["name"]) # async with 退出时连接自动关闭

组合模式的三大好处:

  • 资源安全: async with 确保无论迭代是否完成、是否发生异常,连接都会被正确关闭
  • 内存友好: async for 逐行或分批获取数据,避免将所有结果加载到内存中
  • 代码简洁: 一个对象封装所有资源管理逻辑,调用方只需一个 async with + async for 嵌套

十、更多实用模式

10.1 使用 @asynccontextmanager 装饰器

Python 3.7+ 的 contextlib 提供了 @asynccontextmanager 装饰器,可以用生成器的方式快速创建异步上下文管理器:

from contextlib import asynccontextmanager @asynccontextmanager async def async_open_file(filepath: str, mode: str = "r"): """使用装饰器快速创建异步上下文管理器""" print(f"Opening {filepath}...") f = await aiofiles.open(filepath, mode) try: yield f finally: print(f"Closing {filepath}...") await f.close() # 使用 async def main(): async with async_open_file("data.txt") as f: content = await f.read() print(content[:100])

10.2 异步可迭代对象的上下文管理

有些场景需要将异步可迭代对象同时作为上下文管理器使用。Python 3.7 之后,@asynccontextmanager 创建的异步上下文管理器本身就可以在 async for 中使用(因为它实现了 __aiter__ 协议——通过 yield 的值):

@asynccontextmanager async def streaming_api_client(base_url: str): """同时作为上下文管理器和异步可迭代对象""" session = aiohttp.ClientSession(base_url) try: async def stream(endpoint: str): async with session.get(endpoint) as resp: async for chunk in resp.content.iter_any(): yield chunk yield stream finally: await session.close() # 使用 async def main(): async with streaming_api_client("https://api.example.com") as client: async for chunk in client("/large-data"): process(chunk)

10.3 超时控制与自动重试

AsyncRetry 上下文管理器

@asynccontextmanager async def async_retry(max_retries: int = 3, delay: float = 1.0): """带自动重试的异步上下文管理器""" last_exc = None for attempt in range(max_retries): try: yield attempt + 1 return except (ConnectionError, TimeoutError) as e: last_exc = e if attempt < max_retries - 1: print(f"Attempt {attempt + 1} failed, retrying...") await asyncio.sleep(delay * (2 ** attempt)) raise last_exc # 使用 async def fetch_with_retry(url: str): async with async_retry() as attempt: print(f"Attempt #{attempt}") async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.json()

十一、核心要点总结

十二、进一步思考

理解异步迭代器和异步上下文管理器的协议层次,是掌握 Python 异步编程的关键一跃。在此基础之上,可以继续探索以下方向:

扩展学习路径:

  • 异步可等待对象与协程: 深入理解 __await__ 方法和协程的底层实现
  • Trio / anyio 的替代模型: 结构化并发(Structured Concurrency)如何改变资源管理方式
  • 异步队列(asyncio.Queue): 结合异步迭代器实现生产者-消费者模式
  • 异步流式框架: 如 aiostream 库提供的组合式流操作(map、filter、merge 等)
  • 上下文变量(contextvars): 在异步上下文中安全传递请求级别的状态
  • 异步上下文管理器在测试中的应用: 如 pytest-asyncio 的夹具管理
  • AsyncExitStack: contextlib.AsyncExitStack 用于动态管理多个异步上下文管理器

设计原则:协议为王

Python 的异步协议设计遵循与同步协议完全一致的思路——协议(Protocol)是语言特性的核心抽象。只要对象实现了正确的协议方法,语言特性(async for、async with)就会自动工作。这种"鸭式 typing"的设计哲学在异步世界中同样成立。掌握协议,就等于掌握了语言特性的扩展能力。

"Python 的异步编程模型并不是凭空创造一套全新的语言,而是在现有语言基础上,通过两个协议(迭代器和上下文管理器)的异步化,让协程能够自然地融入 for 循环和 with 语句的语法框架中。这正是 Python 设计的优雅之处。"