协程深入与事件循环

Python进阶编程专题 · 理解Python异步的核心机制

专题:Python进阶编程系统学习

关键词:Python, 协程, 事件循环, async/await, asyncio, 调度, Event Loop, Task

一、概述:从同步到异步的演进

Python 的异步编程模型经历了从回调、生成器协程到原生 async/await 的三代演进。理解协程与事件循环是掌握 Python 异步编程的基石。传统的同步 I/O 模型中,当一个线程执行阻塞操作(如网络请求、文件读写)时,整个线程被挂起等待,CPU 资源被白白浪费。异步模型通过在等待 I/O 时切换到其他任务执行,大幅提升了并发能力。

asyncio 在 Python 3.4 中作为临时模块引入,Python 3.5 正式确立 async/await 语法,Python 3.7 开始 asyncio.run() 成为推荐入口,Python 3.10 移除了 loop.get_event_loop() 等旧 API。这一演进过程反映了 Python 语言对异步编程支持从实验性到生产级的成熟过程。

核心概念速览:协程(Coroutine)是暂停自身执行、让出控制权的可恢复计算单元;事件循环(Event Loop)是协调协程调度、监听 I/O 事件、管理定时器和回调的无限循环。两者共同构成了 Python 异步编程的运行时基础。

二、async/await 语法本质与 PEP 380

Python 的 async/await 语法并非凭空创造,其底层根基是 PEP 380 定义的 yield from 机制。在 Python 3.3 之前,生成器只能通过 yield 向外产出值,调用方通过 .send() 向内传入值。这种双向通信为协程提供了基础,但手动处理子生成器的委派非常繁琐。

PEP 380 引入的 yield from 本质上是一个语法糖,它将生成器委托给子生成器,自动在调用方和子生成器之间建立全双工通道。这个机制恰恰就是 async/await 在底层的工作方式——await 关键字会被编译器编译为 yield from 语义。

2.1 yield from 与 await 的等价关系

以下代码展示了两者在语义上的等价性。理解这个等价关系是深入理解 async/await 的关键。

# 基于 @asyncio.coroutine 和 yield from 的旧式协程(Python 3.4) @asyncio.coroutine def old_style_fetch(url): response = yield from aiohttp.request('GET', url) data = yield from response.read() return data # 基于 async/await 的原生协程(Python 3.5+) async def native_fetch(url): response = await aiohttp.request('GET', url) data = await response.read() return data # 两者在字节码层面是等价的——await 被编译为 yield from

2.2 awaitable 对象的本质

await 关键字要求其后面的对象必须是 awaitable(可等待对象)。Python 中 awaitable 对象包含三类:原生协程对象(由 async def 返回)、实现了 __await__ 方法的对象、以及基于生成器的协程对象(带有 @asyncio.coroutine 装饰器)。

from collections.abc import Coroutine import inspect async def my_coro(): return 42 # 验证协程的类型体系 c = my_coro() print(isinstance(c, Coroutine)) # True print(inspect.iscoroutine(c)) # True print(inspect.iscoroutinefunction(my_coro)) # True # 协程对象的状态机 from inspect import CORO_CREATED, CORO_SUSPENDED, CORO_RUNNING, CORO_CLOSED print(c.cr_code) # 协程的代码对象 print(c.cr_frame) # 协程的帧对象 print(c.cr_running) # 协程是否正在运行 print(c.cr_await) # 当前正在 await 的对象,None 表示未暂停 c.close() print(c.cr_frame) # None,协程已关闭

关键理解:async def 定义的函数是"协程函数"(coroutine function),调用它返回的是"协程对象"(coroutine object)。协程对象是一个惰性的可暂停计算单元,它不会自动执行——必须由事件循环驱动,或者通过 await 链式调用。深入理解这个区别,有助于分析异步代码的执行流程。

三、协程对象与协程函数的深层区别

这是初学者最容易混淆的概念。让我们从 Python 运行时层面彻底厘清两者的区别。

3.1 CPython 内部表示

在 CPython 层面,协程函数是一个类型为 function 的普通函数对象,但其 CO_COROUTINE 标志位被设置。当解释器看到 async def 时,它生成的字节码在函数定义阶段携带了这个标志。协程对象则是运行时通过调用协程函数创建的一个独立栈帧对象,内部持有 cr_frame(运行帧)和 cr_code(字节码)。

import asyncio # 1. 协程函数 —— 不包含任何运行状态 async def demo(): print("协程开始") await asyncio.sleep(1) print("协程结束") print(type(demo)) # <class 'function'> print(demo.__code__.co_flags & 0x0080) # 128, CO_COROUTINE 标志位 # 2. 协程对象 —— 包含运行状态 coro = demo() print(type(coro)) # <class 'coroutine'> print(coro.cr_running) # False,尚未运行 print(coro.cr_frame) # 非 None,已分配栈帧但未执行 print(coro.cr_code is demo.__code__) # True,共享同一代码对象 print(coro.cr_await) # None,未暂停在任何 await 上 # 3. 协程对象的生命周期 coro2 = demo() # coro2.send(None) # 启动协程,会执行到第一个 await 然后暂停 # coro2.send(None) # 恢复协程,执行 await 后续代码 coro2.close() # 显式关闭协程,触发 GeneratorExit print(coro2.cr_frame) # None,协程已终结

3.2 关键差异总结

维度协程函数(Coroutine Function)协程对象(Coroutine Object)
Python 类型functioncoroutine
创建时机定义时(def 阶段)调用时(函数调用阶段)
是否可执行否,仅定义行为是,包含运行状态
内部状态无状态有栈帧(cr_frame)、状态机
生命周期与模块共存创建→启动→暂停→恢复→结束
判断方法inspect.iscoroutinefunction()inspect.iscoroutine()

常见陷阱:调用协程函数却不 await 它——这时协程对象被创建但从未执行,创建时可能分配资源且不会被回收,导致资源泄漏。Python 3.8+ 会发出 RuntimeWarning: "coroutine 'xxx' was never awaited"。

四、事件循环核心实现原理

事件循环是 asyncio 的心脏。它在底层使用操作系统的 I/O 多路复用机制来同时监视多个文件描述符,当某个描述符就绪(可读/可写)时,唤醒对应的回调或恢复挂起的协程。

4.1 SelectorEventLoop 与 ProactorEventLoop

asyncio 在 Unix 平台默认使用 SelectorEventLoop,基于 selectors 模块,底层使用 epoll(Linux)或 kqueue(macOS/FreeBSD)。Windows 平台默认使用 ProactorEventLoop,底层基于 IOCP(I/O Completion Ports),更适合 Windows 的异步 I/O 模型。

import asyncio import platform import selectors # 查看默认事件循环类型 loop = asyncio.new_event_loop() print(type(loop)) # Linux 输出: <class 'uvloop.Loop'> 或 <class 'selector_events.BaseSelectorEventLoop'> # Windows 输出: <class 'ProactorEventLoop'> print(type(loop._selector) if hasattr(loop, '_selector') else 'N/A') # 在 Windows 上显式使用 SelectorEventLoop(某些场景需要) if platform.system() == 'Windows': from asyncio import SelectorEventLoop, ProactorEventLoop # SelectorEventLoop 在 Windows 上使用 select(),性能较差 # ProactorEventLoop 使用 IOCP,效率更高 loop = ProactorEventLoop() asyncio.set_event_loop(loop) loop.close()

4.2 事件循环的内部循环结构

事件循环本质上是一个 while True 循环,每次迭代执行以下操作:计算下一次超时时间、调用 select() 等待 I/O 事件、处理就绪的回调、执行到期的定时器回调。我们通过简化实现来理解其核心逻辑。

import selectors import time from queue import Queue from collections import deque class SimpleEventLoop: """简化版事件循环——理解核心机制""" def __init__(self): self._selector = selectors.DefaultSelector() self._ready = deque() # 就绪回调队列 self._timers = [] # 定时器列表 self._stopping = False def call_soon(self, callback, *args): """将回调加入就绪队列,下次迭代执行""" self._ready.append((callback, args)) def call_later(self, delay, callback, *args): """延迟 delay 秒后执行回调""" deadline = time.monotonic() + delay import heapq heapq.heappush(self._timers, (deadline, callback, args)) def _run_once(self): """事件循环单次迭代""" # 1. 计算 select 超时时间 timeout = None if self._timers: deadline = self._timers[0][0] timeout = max(0, deadline - time.monotonic()) # 2. 等待 I/O 事件 events = self._selector.select(timeout) for key, mask in events: callback = key.data self._ready.append((callback, (key.fileobj, mask))) # 3. 处理到期的定时器 now = time.monotonic() while self._timers and self._timers[0][0] <= now: _, callback, args = heapq.heappop(self._timers) self._ready.append((callback, args)) # 4. 执行所有就绪回调 while self._ready: callback, args = self._ready.popleft() callback(*args) def run_forever(self): """启动事件循环""" self._stopping = False while not self._stopping: self._run_once() def stop(self): """停止事件循环""" self._stopping = True

核心原理总结:事件循环本质是一个"事件驱动"的调度器。它的工作模式是:等待事件 → 处理回调 → 等待下一个事件。与传统多线程模型不同,所有任务在同一个线程中协作执行,通过主动暂停(await)让出控制权,而非被操作系统抢占。

五、事件循环的重要调度方法

事件循环提供了多种调度协程和回调的方法,理解它们的区别是编写高效异步代码的基础。

5.1 create_task —— 创建并发任务

loop.create_task() 将一个协程包装为 Task 对象并安排其执行。Task 会立即被调度到事件循环中,但不会立即执行全部代码——它会在下一次事件循环迭代中被执行到第一个 yield 点。

async def worker(name, n): for i in range(n): print(f"[{name}] step {i}") await asyncio.sleep(0.1) # 让出控制权 return f"{name} done" async def main(): loop = asyncio.get_running_loop() # create_task:创建任务,立即调度,不阻塞当前协程 task1 = loop.create_task(worker("A", 3)) task2 = loop.create_task(worker("B", 3)) # 此时两个 task 已被调度但尚未执行 print("任务已创建,状态:", task1._state) # PENDING # 主动让出控制权,让事件循环调度 task1 和 task2 await asyncio.sleep(0) print("任务已启动,状态:", task1._state) # 视时间而定 # 等待任务完成 result1 = await task1 result2 = await task2 return [result1, result2] asyncio.run(main())

5.2 call_soon / call_later / call_at —— 调度回调

这三个方法用于调度普通的同步回调函数(非协程)。它们绕过 Task 封装,直接将回调插入事件循环的内部队列。理解它们的区别对于调试性能问题和分析执行顺序至关重要。

import asyncio import time def callback(msg): print(f"[{time.strftime('%H:%M:%S.%f')}] 回调: {msg}") async def demo_scheduling(): loop = asyncio.get_running_loop() # call_soon: 下一次迭代立即执行(FIFO 顺序) loop.call_soon(callback, "call_soon #1") loop.call_soon(callback, "call_soon #2") # call_later: 延迟指定秒数后执行 loop.call_later(0.5, callback, "call_later 0.5s") loop.call_later(1.0, callback, "call_later 1.0s") # call_at: 在指定的绝对时间(loop.time())执行 now = loop.time() loop.call_at(now + 1.5, callback, "call_at +1.5s") # 所有回调在同一线程中执行,不会互相抢占 # 执行顺序: call_soon #1 → call_soon #2 → (0.5s后) → call_later 0.5s → ... await asyncio.sleep(2) asyncio.run(demo_scheduling())

5.3 三种调度方式的对比

方法参数类型执行时机是否可取消常用场景
create_task协程尽快(下次迭代)是(task.cancel())并发执行协程
call_soon回调函数当前迭代结束前信号处理、回调链
call_later回调函数延迟后是(通过返回的 Handle)超时、心跳、重试
call_at回调函数指定绝对时间是(通过返回的 Handle)定时任务、周期性调度

性能提示:call_soon 的时间复杂度是 O(1)——它只是向双端队列 append。而 call_later/call_at 内部使用堆数据结构,入队复杂度为 O(log n)。当你有大量定时器时,要注意这个差异。

六、asyncio.Future 底层实现

asyncio.Future 是异步编程中最核心的底层组件之一。它代表一个"未来"的结果——一个尚未完成但最终会完成的操作。Task 继承自 Future,理解 Future 的底层实现是理解 Task 调度和 await 机制的前提。

6.1 Future 的内部状态机

Future 内部维护一个三态状态机:PENDING(挂起,初始状态) → FINISHED(已完成,结果已设置) 或 CANCELLED(已取消)。每次状态转换时,Future 会触发注册的所有回调。

import asyncio from asyncio import Future # 手动创建一个 Future 并观察其状态变化 f: Future = Future() print(f._state) # PENDING # 添加回调——当 Future 完成时触发 def on_done(fut): print(f"Future 完成! 结果: {fut.result()}") f.add_done_callback(on_done) # 设置结果——这会触发回调并将状态变为 FINISHED f.set_result(42) print(f._state) # FINISHED print(f.result()) # 42(立即返回,不会阻塞) # 查看底层实现 print(f._callbacks) # 已执行完毕的回调列表 print(f._asyncio_future_blocking) # 内部标志,用于检测非法 await # await 一个 Future 的本质 # 当我们在协程中 await future_obj 时, # 实际上调用的是 future.__iter__() 方法(兼容旧版生成器协程) # 它通过 yield self 将 Future 自身"抛出"到事件循环中 # 事件循环注册回调后,在 Future 完成时恢复协程

6.2 Future 的 await 机制源码分析

当一个协程 await 一个 Future 时,事件循环会在 Future 上注册一个回调,当 Future 完成时,这个回调会恢复挂起的协程。这个机制通过 Future.__iter__ 实现——它 yield 自身,让事件循环获取控制权。

# 模拟 Future 的可等待本质 import asyncio from asyncio import Future async def demo_future_await(): loop = asyncio.get_running_loop() # 创建一个 Future fut = loop.create_future() # 模拟另一个协程在 1 秒后设置 Future 的结果 loop.call_later(1.0, fut.set_result, "来自未来的消息") # await Future —— 协程在此暂停,直到 Future 完成 # 底层逻辑等价于: # fut._asyncio_future_blocking = True # yield fut (将控制权交给事件循环) # # 事件循环注册回调: when future done, resume this coroutine # # 1 秒后 fut.set_result("来自未来的消息") 被调用 # # 事件循环恢复协程执行 result = await fut print(f"收到结果: {result}") return result # 等效的旧式写法(Python 3.4) @asyncio.coroutine def old_style_future(): loop = asyncio.get_event_loop() fut = loop.create_future() loop.call_later(1.0, fut.set_result, "来自未来的消息") result = yield from fut # yield from 和 await 在此等价 return result asyncio.run(demo_future_await())

Future vs Task 的核心区别:Future 是一个"容器"——它存放一个未来才会就绪的结果,不包含任何执行逻辑。Task 是 Future 的子类——它不仅存放结果,还包装了一个协程并驱动它执行。可以这样理解:Task = Future + 协程执行器。

七、Task 调度与 yield 点

Task 是异步编程中最常用的可等待对象。它包装一个协程,将其注册到事件循环中,并负责驱动协程的执行。理解 Task 的调度机制和 yield 点的概念,对于编写正确且高效的异步代码至关重要。

7.1 Task 的底层工作流程

当一个 Task 被创建时,它会在事件循环中注册一个"启动回调"。在下一个事件循环迭代中,这个回调被触发,调用 coro.send(None) 启动协程。协程执行到第一个 await 时暂停,将控制权还给事件循环。事件循环继续处理其他任务。当被 await 的对象(可能是另一个 Future、Task 或协程)完成后,事件循环再次调用 coro.send(result) 恢复协程执行。

import asyncio async def phase1(): print(" Phase 1: 开始") await asyncio.sleep(0.2) # yield 点 #1 print(" Phase 1: 完成") return "P1结果" async def phase2(): print(" Phase 2: 开始") await asyncio.sleep(0.1) # yield 点 #2 print(" Phase 2: 完成") return "P2结果" async def coordinator(): print("协调器: 创建任务") t1 = asyncio.create_task(phase1()) t2 = asyncio.create_task(phase2()) print("协调器: 任务已创建,让出控制权") await asyncio.sleep(0) # yield 点 #3 —— 让事件循环有机会调度 t1, t2 print("协调器: 等待任务完成") r1 = await t1 # yield 点 #4 —— 如果 t1 未完成则暂停等待 r2 = await t2 # yield 点 #5 print(f"协调器: 结果 {r1}, {r2}") asyncio.run(coordinator()) # 可能的执行顺序: # 1. 协调器启动,创建 t1 和 t2 # 2. 协调器 await asyncio.sleep(0),暂停 # 3. 事件循环调度 t1: phase1() 开始 → await sleep(0.2) → t1 暂停 # 4. 事件循环调度 t2: phase2() 开始 → await sleep(0.1) → t2 暂停 # 5. 协调器的 sleep(0) 完成 → 恢复协调器 # 6. 协调器 await t1 → t1 尚未完成 → 协调器暂停 # 7. 0.1 秒后 phase2 的 sleep 完成 → t2 恢复并完成 → t2 结果存入 Task # 8. 0.2 秒后 phase1 的 sleep 完成 → t1 恢复并完成 → t1 结果存入 Task # 9. 协调器被恢复,拿到 t1 结果,继续 await t2(立即完成) # 10. 协调器打印结果

7.2 yield 点的分类与影响

yield 点是协程中所有可能导致暂停的 await 表达式。不同的 yield 点对调度行为有不同的影响。

yield 点类型示例暂停语义调度影响
I/O 等待await asyncio.sleep(n)特定时长允许其他任务运行
I/O 就绪await sock.read()等待数据到达epoll 监听 fd,就绪时恢复
任务等待await other_task等待另一个任务完成当前协程挂起,事件循环调度其他任务
Future 等待await loop.create_future()等待 Future 完成回调驱动恢复
零延迟await asyncio.sleep(0)立即让出控制权用于显式触发任务切换

实践建议:await asyncio.sleep(0) 是一个非常有用的工具。当你在协程中执行长时间的计算任务时(CPU-bound),定期插入 await asyncio.sleep(0) 可以让事件循环有机会调度其他任务,避免"饿死"其他协程。

八、asyncio.run() 内部流程揭秘

asyncio.run() 是 Python 3.7+ 推荐的异步程序入口。在看似简单的一行调用背后,执行了一系列重要的初始化、运行和清理工作。理解其内部流程有助于诊断异步程序中的各种问题。

8.1 asyncio.run() 的完整内部流程

import asyncio # 展示 asyncio.run() 的内部流程 # 以下代码等价于 asyncio.run(main()) 但暴露了更多细节 async def main(): print("主协程开始") await asyncio.sleep(0.5) print("主协程结束") return 42 # asyncio.run() 内部大致流程: def my_asyncio_run(coro): # Step 1: 检查当前是否有正在运行的事件循环 try: loop = asyncio.get_running_loop() raise RuntimeError("asyncio.run() 不能从正在运行的事件循环中调用") except RuntimeError: pass # 正常——没有正在运行的事件循环 # Step 2: 创建新的事件循环 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # Step 3: 将协程包装为 Task 并运行直到完成 # 内部调用 loop.run_until_complete() result = loop.run_until_complete(coro) return result finally: # Step 4: 清理——取消所有未完成的任务 try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) loop.run_until_complete(loop.shutdown_default_executor()) finally: # Step 5: 关闭事件循环 asyncio.set_event_loop(None) loop.close() def _cancel_all_tasks(loop): """取消事件循环中所有未完成的任务""" to_cancel = asyncio.all_tasks(loop) if not to_cancel: return for task in to_cancel: task.cancel() loop.run_until_complete( asyncio.gather(*to_cancel, return_exceptions=True) ) for task in to_cancel: if task.cancelled(): continue if task.exception() is not None: loop.call_exception_handler({ 'message': '未处理的任务异常', 'exception': task.exception(), 'task': task, }) # 实际使用 result = asyncio.run(main()) print(f"结果: {result}")

8.2 run_until_complete 的底层逻辑

run_until_complete 是事件循环的重要方法之一。它将协程包装为 Task,然后反复调用 _run_once 直到 Task 完成。这与 run_forever 的区别在于它有一个明确的终止条件。

# run_until_complete 的核心逻辑: # 1. 将协程包装为 Task: task = loop.create_task(coro) # 2. 在 task 上注册回调: task.add_done_callback(loop.stop) # 3. 运行事件循环: loop.run_forever() # 4. task 完成时触发 loop.stop(),事件循环退出 # 5. 返回 task.result() # 这意味着 run_until_complete 会在以下情况返回: # - task 正常完成 → 返回结果 # - task 抛出异常 → 重新抛出异常 # - task 被取消 → 抛出 CancelledError # 手动模拟 run_until_complete: async def demo(): await asyncio.sleep(0.1) return "完成" loop = asyncio.new_event_loop() try: task = loop.create_task(demo()) # 注册停止回调 task.add_done_callback(lambda t: loop.stop()) loop.run_forever() # 当 task 完成时停止 print(task.result()) # "完成" finally: loop.close()

重要提醒:asyncio.run() 每次调用都会创建一个新的事件循环,并在程序结束时关闭它。这意味着你无法在外部获取到这个事件循环的引用。如果你需要在主协程中操作事件循环,应使用 asyncio.get_running_loop() 而不是 asyncio.get_event_loop()(后者在线程不安全场景下可能返回错误的事件循环)。

九、事件循环的启动、停止与关闭

正确地管理事件循环的生命周期是编写健壮异步应用的关键。事件循环经历三个状态:运行中、停止中、已关闭。错误的状态转换会导致难以调试的问题。

9.1 事件循环生命周期管理

import asyncio import signal async def cleanup(): """清理资源的协程""" print("清理资源...") await asyncio.sleep(0.1) print("清理完成") async def main_work(): """主工作协程""" try: for i in range(10): print(f"工作中... {i}") await asyncio.sleep(0.5) if i == 3: print("触发停止条件") asyncio.get_running_loop().stop() except asyncio.CancelledError: print("任务被取消") raise finally: print("任务 finally 块执行") # 手动管理事件循环生命周期 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # 阶段 1: 启动 print("=== 事件循环状态: 创建 ===") print(f"is_running: {loop.is_running()}") # False print(f"is_closed: {loop.is_closed()}") # False # 阶段 2: 运行 print("=== 事件循环状态: 运行中 ===") loop.run_until_complete(main_work()) # 如果 loop.stop() 被调用, run_until_complete 会重新调度 run_forever # 但实际上 main_work 调用了 loop.stop(), 所以这里只是示意 # 阶段 3: 关闭前的清理 print("=== 事件循环状态: 清理中 ===") finally: # 阶段 4: 关闭 print("=== 事件循环状态: 已关闭 ===") loop.close() print(f"is_closed: {loop.is_closed()}") # True # 重要: 已关闭的事件循环不能再使用 # loop.run_until_complete(asyncio.sleep(1)) # RuntimeError! 事件循环已关闭

9.2 shutdown_asyncgens 与 shutdown_default_executor

Python 3.6+ 引入了 loop.shutdown_asyncgens(),用于清理所有未完成的异步生成器。Python 3.9+ 增加了 loop.shutdown_default_executor(),用于优雅关闭默认线程池执行器。asyncio.run() 在 finally 块中自动调用这两个方法。

import asyncio async def async_gen(): """异步生成器——如果不清理可能导致 ResourceWarning""" try: for i in range(5): yield i await asyncio.sleep(0.1) finally: print(f"异步生成器清理: aclose() 被调用") async def main(): async for value in async_gen(): print(f"收到: {value}") if value == 2: break # 提前退出,生成器需要被清理 # shutdown_asyncgens 会触发未完成异步生成器的 aclose() # shutdown_default_executor 会等待默认线程池中的任务完成 loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, lambda: "在线程池中执行") print(result) asyncio.run(main()) # asyncio.run() 自动执行: # 1. loop.shutdown_asyncgens() — 清理异步生成器 # 2. loop.shutdown_default_executor() — 清理线程池

最佳实践:始终使用 asyncio.run() 作为异步程序入口,而不是手动管理事件循环。它自动处理了创建、运行、取消任务、清理生成器和关闭事件循环等所有步骤,避免了常见的内存泄漏和资源泄漏问题。只有在特殊场景(如嵌入到已有事件循环、自定义事件循环策略等)才需要手动管理。

十、run_in_executor 与线程池

事件循环运行在单线程中,如果协程中执行了阻塞的同步代码(如文件读写、CPU 密集型计算、第三方阻塞库调用),会阻塞整个事件循环,导致所有协程无法执行。loop.run_in_executor() 是解决这个问题的关键工具——它将同步的阻塞操作委托给线程池或进程池执行,避免阻塞事件循环。

10.1 基本用法与执行原理

import asyncio import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor def blocking_io(): """模拟阻塞 I/O 操作,如文件读写、数据库查询""" print(f" [线程 {time.strftime('%H:%M:%S')}] 阻塞 I/O 开始") time.sleep(2) # 注意这里使用 time.sleep 而非 asyncio.sleep print(f" [线程 {time.strftime('%H:%M:%S')}] 阻塞 I/O 结束") return "I/O 结果" def cpu_intensive(n): """模拟 CPU 密集型计算""" total = 0 for i in range(n): total += i ** 2 return total async def main(): loop = asyncio.get_running_loop() # 方法 1: 使用默认线程池(ThreadPoolExecutor) print("在默认线程池中执行阻塞 I/O") result = await loop.run_in_executor(None, blocking_io) print(f"结果: {result}") # 方法 2: 使用自定义线程池 with ThreadPoolExecutor(max_workers=4) as pool: results = await asyncio.gather( loop.run_in_executor(pool, cpu_intensive, 10_000_000), loop.run_in_executor(pool, cpu_intensive, 20_000_000), loop.run_in_executor(pool, cpu_intensive, 30_000_000), ) print(f"CPU 密集型计算完成: {results}") # 方法 3: 使用进程池(绕开 GIL) with ProcessPoolExecutor(max_workers=2) as proc_pool: proc_result = await loop.run_in_executor( proc_pool, cpu_intensive, 50_000_000 ) print(f"进程池计算结果: {proc_result}") asyncio.run(main())

10.2 run_in_executor 的内部机制

run_in_executor 的核心机制是:它向线程池提交一个同步函数,返回一个 concurrent.futures.Future,然后将这个 Future 包装为 asyncio.Future。asyncio 事件循环通过 add_done_callback 机制监视线程池 Future 的完成状态,当线程池任务完成时恢复等待的协程。

from asyncio import futures import concurrent.futures # run_in_executor 的底层逻辑(简化版) async def my_run_in_executor(loop, executor, func, *args): # Step 1: 确定执行器(None 表示使用默认线程池) if executor is None: executor = loop._default_executor if executor is None: executor = ThreadPoolExecutor(thread_name_prefix='asyncio') loop._default_executor = executor # Step 2: 向执行器提交任务,获得 concurrent.futures.Future cf = executor.submit(func, *args) # Step 3: 创建 asyncio.Future,注册回调桥接 af = loop.create_future() def _callback(cf): # concurrent.futures.Future 的回调中桥接到 asyncio if af.cancelled(): return exc = cf.exception() if exc is not None: loop.call_soon_threadsafe(af.set_exception, exc) else: loop.call_soon_threadsafe(af.set_result, cf.result()) cf.add_done_callback(_callback) # Step 4: await asyncio.Future,等待线程池任务完成 return await af # 演示: 混合异步和同步代码 async def mixed_workflow(): loop = asyncio.get_running_loop() # 并发执行: 一个异步 I/O + 一个阻塞 I/O async def async_task(): for i in range(3): print(f" 异步任务: {i}") await asyncio.sleep(0.3) return "异步结果" async def sync_wrapper(): result = await loop.run_in_executor( None, lambda: (time.sleep(1), "同步结果")[1] ) return result results = await asyncio.gather( async_task(), sync_wrapper() ) print(f"混合工作流结果: {results}") asyncio.run(mixed_workflow())

警告:不要在线程池中执行会修改共享状态的操作而不加锁。事件循环和线程池运行在不同的线程中,这意味着存在数据竞争。使用 run_in_executor 时,请确保线程安全,必要时使用 threading.Lock 或避免共享可变状态。

十一、事件循环的调试模式

异步程序的调试比同步程序更困难——因为调用栈在 await 处断裂,异常传播路径不直观。asyncio 提供了调试模式来帮助开发者发现异步程序中的常见问题。

11.1 启用调试模式

import asyncio import os # 方法 1: 通过环境变量启用 # PYTHONASYNCIODEBUG=1 python script.py # 方法 2: 在代码中启用 async def main(): loop = asyncio.get_running_loop() # 启用调试模式 loop.set_debug(True) # 设置调试参数 loop.slow_callback_duration = 0.1 # 超过 0.1 秒的回调被视为"慢" # 调试模式下提供的信息: # 1. 检测"从未 await"的协程并发出警告 # 2. 检测阻塞事件循环的同步调用 # 3. 记录回调执行时间(超过 slow_callback_duration 的发出警告) # 4. ResourceWarning: 未关闭的传输、事件循环等 # 5. 更详细的异常回溯 await asyncio.sleep(1) asyncio.run(main(), debug=True) # 或者 loop = asyncio.new_event_loop() loop.set_debug(True) asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close()

11.2 调试模式的实际应用

import asyncio import time async def problematic_coroutine(): """有问题的协程——在异步代码中执行了阻塞调用""" print("开始执行阻塞操作...") time.sleep(0.5) # 应该用 await asyncio.sleep()! print("阻塞操作完成") async def forgotten_coroutine(): """被遗忘的协程——创建后从不 await""" pass async def main(): loop = asyncio.get_running_loop() loop.set_debug(True) loop.slow_callback_duration = 0.05 # 50ms 以上的回调即告警 # 1. 阻塞调用检测 # 在调试模式下,asyncio 会检测到协程中执行了阻塞操作 # 并发出警告: "Executing <Task> took X seconds" loop.create_task(problematic_coroutine()) # 2. 忘记 await 的协程 # 调试模式下会立即警告 forgotten_coroutine() # 没有 await! # 3. 检查回调执行时间 def slow_callback(): time.sleep(0.1) # 超过 slow_callback_duration loop.call_soon(slow_callback) await asyncio.sleep(1) # 运行时会看到多个警告 # asyncio.run(main(), debug=True) # Python 3.10+ 的详细回溯: # asyncio.run(main(), debug=True) # 输出类似: # Coroutine 'forgotten_coroutine' was never awaited # Executing <Task finished ...> took 0.502 seconds # Callback <function slow_callback ...> took 0.101 seconds import warnings warnings.filterwarnings('always', category=ResourceWarning) asyncio.run(main(), debug=True)

11.3 调试工具与技术

工具/技术用途启用方式
loop.set_debug(True)启用 asyncio 调试模式代码设置或 PYTHONASYNCIODEBUG=1
slow_callback_duration检测慢回调阈值loop.slow_callback_duration = 0.1
ResourceWarning检测未关闭的资源warnings.simplefilter('always', ResourceWarning)
task.cr_frame / task.print_stack()检查协程当前执行位置task.print_stack() 或 task.get_stack()
aiomonitor 库REPL 式调试异步应用pip install aiomonitor
asyncio.all_tasks()列出所有未完成的任务asyncio.all_tasks(loop)
import asyncio import traceback async def inspect_tasks(): loop = asyncio.get_running_loop() async def stuck_coro(): """永远不完成的协程""" await asyncio.Event().wait() # 永远等待 async def worker(): await asyncio.sleep(1) # 创建任务 t1 = asyncio.create_task(stuck_coro(), name="stuck_task") t2 = asyncio.create_task(worker(), name="worker_task") await asyncio.sleep(0.1) # 检查所有运行中的任务 print("=== 当前所有任务 ===") for task in asyncio.all_tasks(): print(f" 任务: {task.get_name()}") print(f" 状态: {task._state}") print(f" 是否完成: {task.done()}") if not task.done(): print(" 栈帧:") # 打印协程暂停位置的栈信息 for frame in task.get_stack(): traceback.print_stack(frame) # 取消卡住的任务 t1.cancel() try: await t1 except asyncio.CancelledError: print("卡住的任务已取消") await t2 asyncio.run(inspect_tasks(), debug=True)

调试建议:在开发环境中始终开启 asyncio 调试模式。这能帮你尽早发现"忘记 await 的协程"、"事件循环中隐藏的阻塞调用"和"未关闭的资源"这三类最常见的异步编程错误。生产环境中应关闭调试模式以避免性能开销。

十二、总结与最佳实践

12.1 核心要点回顾

1. async/await 本质:await 编译为 yield from,协程对象 = 可暂停的计算单元。协程函数定义行为,协程对象包含执行状态。

2. 事件循环本质:while True 循环,使用 I/O 多路复用(epoll/kqueue/IOCP)监听事件。单线程协作式调度,非抢占式。

3. Future 与 Task:Future = 未来结果的容器,Task = Future + 协程执行器。所有 awaitable 对象最终都会连接到一个 Future。

4. 调度方式:create_task 调度协程,call_soon/call_later/call_at 调度回调。run_in_executor 桥接同步代码。

5. 生命周期:asyncio.run() = 创建事件循环 + 运行 main + 取消任务 + 清理生成器 + 关闭循环。

12.2 编写安全异步代码的黄金法则

应做 (Do)

  • 始终使用 asyncio.run() 作为入口
  • 阻塞操作使用 run_in_executor
  • 使用 asyncio.gather() 并发执行任务
  • 为 task 设置有意义的名称
  • 开发环境启用 debug 模式
  • 使用 try/finally 确保资源释放
  • 使用 asyncio.timeout() 设置超时

不应做 (Don't)

  • 在协程中使用 time.sleep()
  • 在协程中执行 CPU 密集计算而不让出
  • 忘记 await 协程对象
  • 混用 asyncio.Future 和 concurrent.futures.Future
  • 在回调中抛出未捕获的异常
  • 在多线程中共享未加锁的可变状态
  • 在 asyncio.run 外部操作事件循环

12.3 完整的生产级示例

import asyncio import logging import signal from typing import Optional logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class GracefulShutdown: """优雅关闭管理器""" def __init__(self): self._shutdown_event = asyncio.Event() async def wait_shutdown(self): await self._shutdown_event.wait() def trigger_shutdown(self): self._shutdown_event.set() async def monitor_service(name: str, shutdown: GracefulShutdown): """模拟一个需要优雅关闭的后台服务""" try: while not shutdown._shutdown_event.is_set(): logger.info(f"[{name}] 服务运行中...") try: await asyncio.wait_for( shutdown._shutdown_event.wait(), timeout=2.0 ) except asyncio.TimeoutError: continue # 超时是正常的,继续检查关闭信号 except asyncio.CancelledError: logger.info(f"[{name}] 服务被取消") raise finally: logger.info(f"[{name}] 服务正在清理资源...") await asyncio.sleep(0.1) # 模拟清理 logger.info(f"[{name}] 服务已关闭") async def main(): shutdown = GracefulShutdown() loop = asyncio.get_running_loop() # 注册信号处理 for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler( sig, shutdown.trigger_shutdown ) # 启动多个服务 services = [ asyncio.create_task( monitor_service(f"Service-{i}", shutdown) ) for i in range(3) ] logger.info("所有服务已启动,等待关闭信号...") await shutdown.wait_shutdown() logger.info("收到关闭信号,正在停止所有服务...") # 优雅关闭:取消所有服务并等待完成 for task in services: task.cancel() results = await asyncio.gather(*services, return_exceptions=True) # 处理结果 for i, r in enumerate(results): if isinstance(r, asyncio.CancelledError): logger.info(f"Service-{i}: 已取消") elif isinstance(r, Exception): logger.error(f"Service-{i}: 异常 {r}") else: logger.info(f"Service-{i}: 完成") logger.info("程序退出") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("收到 KeyboardInterrupt,退出")

"异步编程的本质不是让程序运行得更快,而是让程序在等待时不做无用功。协程和事件循环共同提供了一种优雅的协作式多任务模型——没有线程切换的开销,没有锁的复杂性,只有在需要时主动让出的谦让。"