专题:Python进阶编程系统学习
关键词:Python, 协程, 事件循环, async/await, asyncio, 调度, Event Loop, Task
一、概述:从同步到异步的演进
Python 的异步编程模型经历了从回调、生成器协程到原生 async/await 的三代演进。理解协程与事件循环是掌握 Python 异步编程的基石。传统的同步 I/O 模型中,当一个线程执行阻塞操作(如网络请求、文件读写)时,整个线程被挂起等待,CPU 资源被白白浪费。异步模型通过在等待 I/O 时切换到其他任务执行,大幅提升了并发能力。
asyncio 在 Python 3.4 中作为临时模块引入,Python 3.5 正式确立 async/await 语法,Python 3.7 开始 asyncio.run() 成为推荐入口,Python 3.10 移除了 loop.get_event_loop() 等旧 API。这一演进过程反映了 Python 语言对异步编程支持从实验性到生产级的成熟过程。
核心概念速览:协程(Coroutine)是暂停自身执行、让出控制权的可恢复计算单元;事件循环(Event Loop)是协调协程调度、监听 I/O 事件、管理定时器和回调的无限循环。两者共同构成了 Python 异步编程的运行时基础。
二、async/await 语法本质与 PEP 380
Python 的 async/await 语法并非凭空创造,其底层根基是 PEP 380 定义的 yield from 机制。在 Python 3.3 之前,生成器只能通过 yield 向外产出值,调用方通过 .send() 向内传入值。这种双向通信为协程提供了基础,但手动处理子生成器的委派非常繁琐。
PEP 380 引入的 yield from 本质上是一个语法糖,它将生成器委托给子生成器,自动在调用方和子生成器之间建立全双工通道。这个机制恰恰就是 async/await 在底层的工作方式——await 关键字会被编译器编译为 yield from 语义。
2.1 yield from 与 await 的等价关系
以下代码展示了两者在语义上的等价性。理解这个等价关系是深入理解 async/await 的关键。
# 基于 @asyncio.coroutine 和 yield from 的旧式协程(Python 3.4)
@asyncio.coroutine
def old_style_fetch(url):
response = yield from aiohttp.request('GET', url)
data = yield from response.read()
return data
# 基于 async/await 的原生协程(Python 3.5+)
async def native_fetch(url):
response = await aiohttp.request('GET', url)
data = await response.read()
return data
# 两者在字节码层面是等价的——await 被编译为 yield from
2.2 awaitable 对象的本质
await 关键字要求其后面的对象必须是 awaitable(可等待对象)。Python 中 awaitable 对象包含三类:原生协程对象(由 async def 返回)、实现了 __await__ 方法的对象、以及基于生成器的协程对象(带有 @asyncio.coroutine 装饰器)。
from collections.abc import Coroutine
import inspect
async def my_coro():
return 42
# 验证协程的类型体系
c = my_coro()
print(isinstance(c, Coroutine)) # True
print(inspect.iscoroutine(c)) # True
print(inspect.iscoroutinefunction(my_coro)) # True
# 协程对象的状态机
from inspect import CORO_CREATED, CORO_SUSPENDED, CORO_RUNNING, CORO_CLOSED
print(c.cr_code) # 协程的代码对象
print(c.cr_frame) # 协程的帧对象
print(c.cr_running) # 协程是否正在运行
print(c.cr_await) # 当前正在 await 的对象,None 表示未暂停
c.close()
print(c.cr_frame) # None,协程已关闭
关键理解:async def 定义的函数是"协程函数"(coroutine function),调用它返回的是"协程对象"(coroutine object)。协程对象是一个惰性的可暂停计算单元,它不会自动执行——必须由事件循环驱动,或者通过 await 链式调用。深入理解这个区别,有助于分析异步代码的执行流程。
三、协程对象与协程函数的深层区别
这是初学者最容易混淆的概念。让我们从 Python 运行时层面彻底厘清两者的区别。
3.1 CPython 内部表示
在 CPython 层面,协程函数是一个类型为 function 的普通函数对象,但其 CO_COROUTINE 标志位被设置。当解释器看到 async def 时,它生成的字节码在函数定义阶段携带了这个标志。协程对象则是运行时通过调用协程函数创建的一个独立栈帧对象,内部持有 cr_frame(运行帧)和 cr_code(字节码)。
import asyncio
# 1. 协程函数 —— 不包含任何运行状态
async def demo():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
print(type(demo)) # <class 'function'>
print(demo.__code__.co_flags & 0x0080) # 128, CO_COROUTINE 标志位
# 2. 协程对象 —— 包含运行状态
coro = demo()
print(type(coro)) # <class 'coroutine'>
print(coro.cr_running) # False,尚未运行
print(coro.cr_frame) # 非 None,已分配栈帧但未执行
print(coro.cr_code is demo.__code__) # True,共享同一代码对象
print(coro.cr_await) # None,未暂停在任何 await 上
# 3. 协程对象的生命周期
coro2 = demo()
# coro2.send(None) # 启动协程,会执行到第一个 await 然后暂停
# coro2.send(None) # 恢复协程,执行 await 后续代码
coro2.close() # 显式关闭协程,触发 GeneratorExit
print(coro2.cr_frame) # None,协程已终结
3.2 关键差异总结
| 维度 | 协程函数(Coroutine Function) | 协程对象(Coroutine Object) |
| Python 类型 | function | coroutine |
| 创建时机 | 定义时(def 阶段) | 调用时(函数调用阶段) |
| 是否可执行 | 否,仅定义行为 | 是,包含运行状态 |
| 内部状态 | 无状态 | 有栈帧(cr_frame)、状态机 |
| 生命周期 | 与模块共存 | 创建→启动→暂停→恢复→结束 |
| 判断方法 | inspect.iscoroutinefunction() | inspect.iscoroutine() |
常见陷阱:调用协程函数却不 await 它——这时协程对象被创建但从未执行,创建时可能分配资源且不会被回收,导致资源泄漏。Python 3.8+ 会发出 RuntimeWarning: "coroutine 'xxx' was never awaited"。
四、事件循环核心实现原理
事件循环是 asyncio 的心脏。它在底层使用操作系统的 I/O 多路复用机制来同时监视多个文件描述符,当某个描述符就绪(可读/可写)时,唤醒对应的回调或恢复挂起的协程。
4.1 SelectorEventLoop 与 ProactorEventLoop
asyncio 在 Unix 平台默认使用 SelectorEventLoop,基于 selectors 模块,底层使用 epoll(Linux)或 kqueue(macOS/FreeBSD)。Windows 平台默认使用 ProactorEventLoop,底层基于 IOCP(I/O Completion Ports),更适合 Windows 的异步 I/O 模型。
import asyncio
import platform
import selectors
# 查看默认事件循环类型
loop = asyncio.new_event_loop()
print(type(loop))
# Linux 输出: <class 'uvloop.Loop'> 或 <class 'selector_events.BaseSelectorEventLoop'>
# Windows 输出: <class 'ProactorEventLoop'>
print(type(loop._selector) if hasattr(loop, '_selector') else 'N/A')
# 在 Windows 上显式使用 SelectorEventLoop(某些场景需要)
if platform.system() == 'Windows':
from asyncio import SelectorEventLoop, ProactorEventLoop
# SelectorEventLoop 在 Windows 上使用 select(),性能较差
# ProactorEventLoop 使用 IOCP,效率更高
loop = ProactorEventLoop()
asyncio.set_event_loop(loop)
loop.close()
4.2 事件循环的内部循环结构
事件循环本质上是一个 while True 循环,每次迭代执行以下操作:计算下一次超时时间、调用 select() 等待 I/O 事件、处理就绪的回调、执行到期的定时器回调。我们通过简化实现来理解其核心逻辑。
import selectors
import time
from queue import Queue
from collections import deque
class SimpleEventLoop:
"""简化版事件循环——理解核心机制"""
def __init__(self):
self._selector = selectors.DefaultSelector()
self._ready = deque() # 就绪回调队列
self._timers = [] # 定时器列表
self._stopping = False
def call_soon(self, callback, *args):
"""将回调加入就绪队列,下次迭代执行"""
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
"""延迟 delay 秒后执行回调"""
deadline = time.monotonic() + delay
import heapq
heapq.heappush(self._timers, (deadline, callback, args))
def _run_once(self):
"""事件循环单次迭代"""
# 1. 计算 select 超时时间
timeout = None
if self._timers:
deadline = self._timers[0][0]
timeout = max(0, deadline - time.monotonic())
# 2. 等待 I/O 事件
events = self._selector.select(timeout)
for key, mask in events:
callback = key.data
self._ready.append((callback, (key.fileobj, mask)))
# 3. 处理到期的定时器
now = time.monotonic()
while self._timers and self._timers[0][0] <= now:
_, callback, args = heapq.heappop(self._timers)
self._ready.append((callback, args))
# 4. 执行所有就绪回调
while self._ready:
callback, args = self._ready.popleft()
callback(*args)
def run_forever(self):
"""启动事件循环"""
self._stopping = False
while not self._stopping:
self._run_once()
def stop(self):
"""停止事件循环"""
self._stopping = True
核心原理总结:事件循环本质是一个"事件驱动"的调度器。它的工作模式是:等待事件 → 处理回调 → 等待下一个事件。与传统多线程模型不同,所有任务在同一个线程中协作执行,通过主动暂停(await)让出控制权,而非被操作系统抢占。
五、事件循环的重要调度方法
事件循环提供了多种调度协程和回调的方法,理解它们的区别是编写高效异步代码的基础。
5.1 create_task —— 创建并发任务
loop.create_task() 将一个协程包装为 Task 对象并安排其执行。Task 会立即被调度到事件循环中,但不会立即执行全部代码——它会在下一次事件循环迭代中被执行到第一个 yield 点。
async def worker(name, n):
for i in range(n):
print(f"[{name}] step {i}")
await asyncio.sleep(0.1) # 让出控制权
return f"{name} done"
async def main():
loop = asyncio.get_running_loop()
# create_task:创建任务,立即调度,不阻塞当前协程
task1 = loop.create_task(worker("A", 3))
task2 = loop.create_task(worker("B", 3))
# 此时两个 task 已被调度但尚未执行
print("任务已创建,状态:", task1._state) # PENDING
# 主动让出控制权,让事件循环调度 task1 和 task2
await asyncio.sleep(0)
print("任务已启动,状态:", task1._state) # 视时间而定
# 等待任务完成
result1 = await task1
result2 = await task2
return [result1, result2]
asyncio.run(main())
5.2 call_soon / call_later / call_at —— 调度回调
这三个方法用于调度普通的同步回调函数(非协程)。它们绕过 Task 封装,直接将回调插入事件循环的内部队列。理解它们的区别对于调试性能问题和分析执行顺序至关重要。
import asyncio
import time
def callback(msg):
print(f"[{time.strftime('%H:%M:%S.%f')}] 回调: {msg}")
async def demo_scheduling():
loop = asyncio.get_running_loop()
# call_soon: 下一次迭代立即执行(FIFO 顺序)
loop.call_soon(callback, "call_soon #1")
loop.call_soon(callback, "call_soon #2")
# call_later: 延迟指定秒数后执行
loop.call_later(0.5, callback, "call_later 0.5s")
loop.call_later(1.0, callback, "call_later 1.0s")
# call_at: 在指定的绝对时间(loop.time())执行
now = loop.time()
loop.call_at(now + 1.5, callback, "call_at +1.5s")
# 所有回调在同一线程中执行,不会互相抢占
# 执行顺序: call_soon #1 → call_soon #2 → (0.5s后) → call_later 0.5s → ...
await asyncio.sleep(2)
asyncio.run(demo_scheduling())
5.3 三种调度方式的对比
| 方法 | 参数类型 | 执行时机 | 是否可取消 | 常用场景 |
| create_task | 协程 | 尽快(下次迭代) | 是(task.cancel()) | 并发执行协程 |
| call_soon | 回调函数 | 当前迭代结束前 | 否 | 信号处理、回调链 |
| call_later | 回调函数 | 延迟后 | 是(通过返回的 Handle) | 超时、心跳、重试 |
| call_at | 回调函数 | 指定绝对时间 | 是(通过返回的 Handle) | 定时任务、周期性调度 |
性能提示:call_soon 的时间复杂度是 O(1)——它只是向双端队列 append。而 call_later/call_at 内部使用堆数据结构,入队复杂度为 O(log n)。当你有大量定时器时,要注意这个差异。
六、asyncio.Future 底层实现
asyncio.Future 是异步编程中最核心的底层组件之一。它代表一个"未来"的结果——一个尚未完成但最终会完成的操作。Task 继承自 Future,理解 Future 的底层实现是理解 Task 调度和 await 机制的前提。
6.1 Future 的内部状态机
Future 内部维护一个三态状态机:PENDING(挂起,初始状态) → FINISHED(已完成,结果已设置) 或 CANCELLED(已取消)。每次状态转换时,Future 会触发注册的所有回调。
import asyncio
from asyncio import Future
# 手动创建一个 Future 并观察其状态变化
f: Future = Future()
print(f._state) # PENDING
# 添加回调——当 Future 完成时触发
def on_done(fut):
print(f"Future 完成! 结果: {fut.result()}")
f.add_done_callback(on_done)
# 设置结果——这会触发回调并将状态变为 FINISHED
f.set_result(42)
print(f._state) # FINISHED
print(f.result()) # 42(立即返回,不会阻塞)
# 查看底层实现
print(f._callbacks) # 已执行完毕的回调列表
print(f._asyncio_future_blocking) # 内部标志,用于检测非法 await
# await 一个 Future 的本质
# 当我们在协程中 await future_obj 时,
# 实际上调用的是 future.__iter__() 方法(兼容旧版生成器协程)
# 它通过 yield self 将 Future 自身"抛出"到事件循环中
# 事件循环注册回调后,在 Future 完成时恢复协程
6.2 Future 的 await 机制源码分析
当一个协程 await 一个 Future 时,事件循环会在 Future 上注册一个回调,当 Future 完成时,这个回调会恢复挂起的协程。这个机制通过 Future.__iter__ 实现——它 yield 自身,让事件循环获取控制权。
# 模拟 Future 的可等待本质
import asyncio
from asyncio import Future
async def demo_future_await():
loop = asyncio.get_running_loop()
# 创建一个 Future
fut = loop.create_future()
# 模拟另一个协程在 1 秒后设置 Future 的结果
loop.call_later(1.0, fut.set_result, "来自未来的消息")
# await Future —— 协程在此暂停,直到 Future 完成
# 底层逻辑等价于:
# fut._asyncio_future_blocking = True
# yield fut (将控制权交给事件循环)
# # 事件循环注册回调: when future done, resume this coroutine
# # 1 秒后 fut.set_result("来自未来的消息") 被调用
# # 事件循环恢复协程执行
result = await fut
print(f"收到结果: {result}")
return result
# 等效的旧式写法(Python 3.4)
@asyncio.coroutine
def old_style_future():
loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_later(1.0, fut.set_result, "来自未来的消息")
result = yield from fut # yield from 和 await 在此等价
return result
asyncio.run(demo_future_await())
Future vs Task 的核心区别:Future 是一个"容器"——它存放一个未来才会就绪的结果,不包含任何执行逻辑。Task 是 Future 的子类——它不仅存放结果,还包装了一个协程并驱动它执行。可以这样理解:Task = Future + 协程执行器。
七、Task 调度与 yield 点
Task 是异步编程中最常用的可等待对象。它包装一个协程,将其注册到事件循环中,并负责驱动协程的执行。理解 Task 的调度机制和 yield 点的概念,对于编写正确且高效的异步代码至关重要。
7.1 Task 的底层工作流程
当一个 Task 被创建时,它会在事件循环中注册一个"启动回调"。在下一个事件循环迭代中,这个回调被触发,调用 coro.send(None) 启动协程。协程执行到第一个 await 时暂停,将控制权还给事件循环。事件循环继续处理其他任务。当被 await 的对象(可能是另一个 Future、Task 或协程)完成后,事件循环再次调用 coro.send(result) 恢复协程执行。
import asyncio
async def phase1():
print(" Phase 1: 开始")
await asyncio.sleep(0.2) # yield 点 #1
print(" Phase 1: 完成")
return "P1结果"
async def phase2():
print(" Phase 2: 开始")
await asyncio.sleep(0.1) # yield 点 #2
print(" Phase 2: 完成")
return "P2结果"
async def coordinator():
print("协调器: 创建任务")
t1 = asyncio.create_task(phase1())
t2 = asyncio.create_task(phase2())
print("协调器: 任务已创建,让出控制权")
await asyncio.sleep(0) # yield 点 #3 —— 让事件循环有机会调度 t1, t2
print("协调器: 等待任务完成")
r1 = await t1 # yield 点 #4 —— 如果 t1 未完成则暂停等待
r2 = await t2 # yield 点 #5
print(f"协调器: 结果 {r1}, {r2}")
asyncio.run(coordinator())
# 可能的执行顺序:
# 1. 协调器启动,创建 t1 和 t2
# 2. 协调器 await asyncio.sleep(0),暂停
# 3. 事件循环调度 t1: phase1() 开始 → await sleep(0.2) → t1 暂停
# 4. 事件循环调度 t2: phase2() 开始 → await sleep(0.1) → t2 暂停
# 5. 协调器的 sleep(0) 完成 → 恢复协调器
# 6. 协调器 await t1 → t1 尚未完成 → 协调器暂停
# 7. 0.1 秒后 phase2 的 sleep 完成 → t2 恢复并完成 → t2 结果存入 Task
# 8. 0.2 秒后 phase1 的 sleep 完成 → t1 恢复并完成 → t1 结果存入 Task
# 9. 协调器被恢复,拿到 t1 结果,继续 await t2(立即完成)
# 10. 协调器打印结果
7.2 yield 点的分类与影响
yield 点是协程中所有可能导致暂停的 await 表达式。不同的 yield 点对调度行为有不同的影响。
| yield 点类型 | 示例 | 暂停语义 | 调度影响 |
| I/O 等待 | await asyncio.sleep(n) | 特定时长 | 允许其他任务运行 |
| I/O 就绪 | await sock.read() | 等待数据到达 | epoll 监听 fd,就绪时恢复 |
| 任务等待 | await other_task | 等待另一个任务完成 | 当前协程挂起,事件循环调度其他任务 |
| Future 等待 | await loop.create_future() | 等待 Future 完成 | 回调驱动恢复 |
| 零延迟 | await asyncio.sleep(0) | 立即让出控制权 | 用于显式触发任务切换 |
实践建议:await asyncio.sleep(0) 是一个非常有用的工具。当你在协程中执行长时间的计算任务时(CPU-bound),定期插入 await asyncio.sleep(0) 可以让事件循环有机会调度其他任务,避免"饿死"其他协程。
八、asyncio.run() 内部流程揭秘
asyncio.run() 是 Python 3.7+ 推荐的异步程序入口。在看似简单的一行调用背后,执行了一系列重要的初始化、运行和清理工作。理解其内部流程有助于诊断异步程序中的各种问题。
8.1 asyncio.run() 的完整内部流程
import asyncio
# 展示 asyncio.run() 的内部流程
# 以下代码等价于 asyncio.run(main()) 但暴露了更多细节
async def main():
print("主协程开始")
await asyncio.sleep(0.5)
print("主协程结束")
return 42
# asyncio.run() 内部大致流程:
def my_asyncio_run(coro):
# Step 1: 检查当前是否有正在运行的事件循环
try:
loop = asyncio.get_running_loop()
raise RuntimeError("asyncio.run() 不能从正在运行的事件循环中调用")
except RuntimeError:
pass # 正常——没有正在运行的事件循环
# Step 2: 创建新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Step 3: 将协程包装为 Task 并运行直到完成
# 内部调用 loop.run_until_complete()
result = loop.run_until_complete(coro)
return result
finally:
# Step 4: 清理——取消所有未完成的任务
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
# Step 5: 关闭事件循环
asyncio.set_event_loop(None)
loop.close()
def _cancel_all_tasks(loop):
"""取消事件循环中所有未完成的任务"""
to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(
asyncio.gather(*to_cancel, return_exceptions=True)
)
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler({
'message': '未处理的任务异常',
'exception': task.exception(),
'task': task,
})
# 实际使用
result = asyncio.run(main())
print(f"结果: {result}")
8.2 run_until_complete 的底层逻辑
run_until_complete 是事件循环的重要方法之一。它将协程包装为 Task,然后反复调用 _run_once 直到 Task 完成。这与 run_forever 的区别在于它有一个明确的终止条件。
# run_until_complete 的核心逻辑:
# 1. 将协程包装为 Task: task = loop.create_task(coro)
# 2. 在 task 上注册回调: task.add_done_callback(loop.stop)
# 3. 运行事件循环: loop.run_forever()
# 4. task 完成时触发 loop.stop(),事件循环退出
# 5. 返回 task.result()
# 这意味着 run_until_complete 会在以下情况返回:
# - task 正常完成 → 返回结果
# - task 抛出异常 → 重新抛出异常
# - task 被取消 → 抛出 CancelledError
# 手动模拟 run_until_complete:
async def demo():
await asyncio.sleep(0.1)
return "完成"
loop = asyncio.new_event_loop()
try:
task = loop.create_task(demo())
# 注册停止回调
task.add_done_callback(lambda t: loop.stop())
loop.run_forever() # 当 task 完成时停止
print(task.result()) # "完成"
finally:
loop.close()
重要提醒:asyncio.run() 每次调用都会创建一个新的事件循环,并在程序结束时关闭它。这意味着你无法在外部获取到这个事件循环的引用。如果你需要在主协程中操作事件循环,应使用 asyncio.get_running_loop() 而不是 asyncio.get_event_loop()(后者在线程不安全场景下可能返回错误的事件循环)。
九、事件循环的启动、停止与关闭
正确地管理事件循环的生命周期是编写健壮异步应用的关键。事件循环经历三个状态:运行中、停止中、已关闭。错误的状态转换会导致难以调试的问题。
9.1 事件循环生命周期管理
import asyncio
import signal
async def cleanup():
"""清理资源的协程"""
print("清理资源...")
await asyncio.sleep(0.1)
print("清理完成")
async def main_work():
"""主工作协程"""
try:
for i in range(10):
print(f"工作中... {i}")
await asyncio.sleep(0.5)
if i == 3:
print("触发停止条件")
asyncio.get_running_loop().stop()
except asyncio.CancelledError:
print("任务被取消")
raise
finally:
print("任务 finally 块执行")
# 手动管理事件循环生命周期
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 阶段 1: 启动
print("=== 事件循环状态: 创建 ===")
print(f"is_running: {loop.is_running()}") # False
print(f"is_closed: {loop.is_closed()}") # False
# 阶段 2: 运行
print("=== 事件循环状态: 运行中 ===")
loop.run_until_complete(main_work())
# 如果 loop.stop() 被调用, run_until_complete 会重新调度 run_forever
# 但实际上 main_work 调用了 loop.stop(), 所以这里只是示意
# 阶段 3: 关闭前的清理
print("=== 事件循环状态: 清理中 ===")
finally:
# 阶段 4: 关闭
print("=== 事件循环状态: 已关闭 ===")
loop.close()
print(f"is_closed: {loop.is_closed()}") # True
# 重要: 已关闭的事件循环不能再使用
# loop.run_until_complete(asyncio.sleep(1)) # RuntimeError! 事件循环已关闭
9.2 shutdown_asyncgens 与 shutdown_default_executor
Python 3.6+ 引入了 loop.shutdown_asyncgens(),用于清理所有未完成的异步生成器。Python 3.9+ 增加了 loop.shutdown_default_executor(),用于优雅关闭默认线程池执行器。asyncio.run() 在 finally 块中自动调用这两个方法。
import asyncio
async def async_gen():
"""异步生成器——如果不清理可能导致 ResourceWarning"""
try:
for i in range(5):
yield i
await asyncio.sleep(0.1)
finally:
print(f"异步生成器清理: aclose() 被调用")
async def main():
async for value in async_gen():
print(f"收到: {value}")
if value == 2:
break # 提前退出,生成器需要被清理
# shutdown_asyncgens 会触发未完成异步生成器的 aclose()
# shutdown_default_executor 会等待默认线程池中的任务完成
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, lambda: "在线程池中执行")
print(result)
asyncio.run(main())
# asyncio.run() 自动执行:
# 1. loop.shutdown_asyncgens() — 清理异步生成器
# 2. loop.shutdown_default_executor() — 清理线程池
最佳实践:始终使用 asyncio.run() 作为异步程序入口,而不是手动管理事件循环。它自动处理了创建、运行、取消任务、清理生成器和关闭事件循环等所有步骤,避免了常见的内存泄漏和资源泄漏问题。只有在特殊场景(如嵌入到已有事件循环、自定义事件循环策略等)才需要手动管理。
十、run_in_executor 与线程池
事件循环运行在单线程中,如果协程中执行了阻塞的同步代码(如文件读写、CPU 密集型计算、第三方阻塞库调用),会阻塞整个事件循环,导致所有协程无法执行。loop.run_in_executor() 是解决这个问题的关键工具——它将同步的阻塞操作委托给线程池或进程池执行,避免阻塞事件循环。
10.1 基本用法与执行原理
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def blocking_io():
"""模拟阻塞 I/O 操作,如文件读写、数据库查询"""
print(f" [线程 {time.strftime('%H:%M:%S')}] 阻塞 I/O 开始")
time.sleep(2) # 注意这里使用 time.sleep 而非 asyncio.sleep
print(f" [线程 {time.strftime('%H:%M:%S')}] 阻塞 I/O 结束")
return "I/O 结果"
def cpu_intensive(n):
"""模拟 CPU 密集型计算"""
total = 0
for i in range(n):
total += i ** 2
return total
async def main():
loop = asyncio.get_running_loop()
# 方法 1: 使用默认线程池(ThreadPoolExecutor)
print("在默认线程池中执行阻塞 I/O")
result = await loop.run_in_executor(None, blocking_io)
print(f"结果: {result}")
# 方法 2: 使用自定义线程池
with ThreadPoolExecutor(max_workers=4) as pool:
results = await asyncio.gather(
loop.run_in_executor(pool, cpu_intensive, 10_000_000),
loop.run_in_executor(pool, cpu_intensive, 20_000_000),
loop.run_in_executor(pool, cpu_intensive, 30_000_000),
)
print(f"CPU 密集型计算完成: {results}")
# 方法 3: 使用进程池(绕开 GIL)
with ProcessPoolExecutor(max_workers=2) as proc_pool:
proc_result = await loop.run_in_executor(
proc_pool, cpu_intensive, 50_000_000
)
print(f"进程池计算结果: {proc_result}")
asyncio.run(main())
10.2 run_in_executor 的内部机制
run_in_executor 的核心机制是:它向线程池提交一个同步函数,返回一个 concurrent.futures.Future,然后将这个 Future 包装为 asyncio.Future。asyncio 事件循环通过 add_done_callback 机制监视线程池 Future 的完成状态,当线程池任务完成时恢复等待的协程。
from asyncio import futures
import concurrent.futures
# run_in_executor 的底层逻辑(简化版)
async def my_run_in_executor(loop, executor, func, *args):
# Step 1: 确定执行器(None 表示使用默认线程池)
if executor is None:
executor = loop._default_executor
if executor is None:
executor = ThreadPoolExecutor(thread_name_prefix='asyncio')
loop._default_executor = executor
# Step 2: 向执行器提交任务,获得 concurrent.futures.Future
cf = executor.submit(func, *args)
# Step 3: 创建 asyncio.Future,注册回调桥接
af = loop.create_future()
def _callback(cf):
# concurrent.futures.Future 的回调中桥接到 asyncio
if af.cancelled():
return
exc = cf.exception()
if exc is not None:
loop.call_soon_threadsafe(af.set_exception, exc)
else:
loop.call_soon_threadsafe(af.set_result, cf.result())
cf.add_done_callback(_callback)
# Step 4: await asyncio.Future,等待线程池任务完成
return await af
# 演示: 混合异步和同步代码
async def mixed_workflow():
loop = asyncio.get_running_loop()
# 并发执行: 一个异步 I/O + 一个阻塞 I/O
async def async_task():
for i in range(3):
print(f" 异步任务: {i}")
await asyncio.sleep(0.3)
return "异步结果"
async def sync_wrapper():
result = await loop.run_in_executor(
None, lambda: (time.sleep(1), "同步结果")[1]
)
return result
results = await asyncio.gather(
async_task(),
sync_wrapper()
)
print(f"混合工作流结果: {results}")
asyncio.run(mixed_workflow())
警告:不要在线程池中执行会修改共享状态的操作而不加锁。事件循环和线程池运行在不同的线程中,这意味着存在数据竞争。使用 run_in_executor 时,请确保线程安全,必要时使用 threading.Lock 或避免共享可变状态。
十一、事件循环的调试模式
异步程序的调试比同步程序更困难——因为调用栈在 await 处断裂,异常传播路径不直观。asyncio 提供了调试模式来帮助开发者发现异步程序中的常见问题。
11.1 启用调试模式
import asyncio
import os
# 方法 1: 通过环境变量启用
# PYTHONASYNCIODEBUG=1 python script.py
# 方法 2: 在代码中启用
async def main():
loop = asyncio.get_running_loop()
# 启用调试模式
loop.set_debug(True)
# 设置调试参数
loop.slow_callback_duration = 0.1 # 超过 0.1 秒的回调被视为"慢"
# 调试模式下提供的信息:
# 1. 检测"从未 await"的协程并发出警告
# 2. 检测阻塞事件循环的同步调用
# 3. 记录回调执行时间(超过 slow_callback_duration 的发出警告)
# 4. ResourceWarning: 未关闭的传输、事件循环等
# 5. 更详细的异常回溯
await asyncio.sleep(1)
asyncio.run(main(), debug=True)
# 或者
loop = asyncio.new_event_loop()
loop.set_debug(True)
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
loop.close()
11.2 调试模式的实际应用
import asyncio
import time
async def problematic_coroutine():
"""有问题的协程——在异步代码中执行了阻塞调用"""
print("开始执行阻塞操作...")
time.sleep(0.5) # 应该用 await asyncio.sleep()!
print("阻塞操作完成")
async def forgotten_coroutine():
"""被遗忘的协程——创建后从不 await"""
pass
async def main():
loop = asyncio.get_running_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05 # 50ms 以上的回调即告警
# 1. 阻塞调用检测
# 在调试模式下,asyncio 会检测到协程中执行了阻塞操作
# 并发出警告: "Executing <Task> took X seconds"
loop.create_task(problematic_coroutine())
# 2. 忘记 await 的协程
# 调试模式下会立即警告
forgotten_coroutine() # 没有 await!
# 3. 检查回调执行时间
def slow_callback():
time.sleep(0.1) # 超过 slow_callback_duration
loop.call_soon(slow_callback)
await asyncio.sleep(1)
# 运行时会看到多个警告
# asyncio.run(main(), debug=True)
# Python 3.10+ 的详细回溯:
# asyncio.run(main(), debug=True)
# 输出类似:
# Coroutine 'forgotten_coroutine' was never awaited
# Executing <Task finished ...> took 0.502 seconds
# Callback <function slow_callback ...> took 0.101 seconds
import warnings
warnings.filterwarnings('always', category=ResourceWarning)
asyncio.run(main(), debug=True)
11.3 调试工具与技术
| 工具/技术 | 用途 | 启用方式 |
| loop.set_debug(True) | 启用 asyncio 调试模式 | 代码设置或 PYTHONASYNCIODEBUG=1 |
| slow_callback_duration | 检测慢回调阈值 | loop.slow_callback_duration = 0.1 |
| ResourceWarning | 检测未关闭的资源 | warnings.simplefilter('always', ResourceWarning) |
| task.cr_frame / task.print_stack() | 检查协程当前执行位置 | task.print_stack() 或 task.get_stack() |
| aiomonitor 库 | REPL 式调试异步应用 | pip install aiomonitor |
| asyncio.all_tasks() | 列出所有未完成的任务 | asyncio.all_tasks(loop) |
import asyncio
import traceback
async def inspect_tasks():
loop = asyncio.get_running_loop()
async def stuck_coro():
"""永远不完成的协程"""
await asyncio.Event().wait() # 永远等待
async def worker():
await asyncio.sleep(1)
# 创建任务
t1 = asyncio.create_task(stuck_coro(), name="stuck_task")
t2 = asyncio.create_task(worker(), name="worker_task")
await asyncio.sleep(0.1)
# 检查所有运行中的任务
print("=== 当前所有任务 ===")
for task in asyncio.all_tasks():
print(f" 任务: {task.get_name()}")
print(f" 状态: {task._state}")
print(f" 是否完成: {task.done()}")
if not task.done():
print(" 栈帧:")
# 打印协程暂停位置的栈信息
for frame in task.get_stack():
traceback.print_stack(frame)
# 取消卡住的任务
t1.cancel()
try:
await t1
except asyncio.CancelledError:
print("卡住的任务已取消")
await t2
asyncio.run(inspect_tasks(), debug=True)
调试建议:在开发环境中始终开启 asyncio 调试模式。这能帮你尽早发现"忘记 await 的协程"、"事件循环中隐藏的阻塞调用"和"未关闭的资源"这三类最常见的异步编程错误。生产环境中应关闭调试模式以避免性能开销。
十二、总结与最佳实践
12.1 核心要点回顾
1. async/await 本质:await 编译为 yield from,协程对象 = 可暂停的计算单元。协程函数定义行为,协程对象包含执行状态。
2. 事件循环本质:while True 循环,使用 I/O 多路复用(epoll/kqueue/IOCP)监听事件。单线程协作式调度,非抢占式。
3. Future 与 Task:Future = 未来结果的容器,Task = Future + 协程执行器。所有 awaitable 对象最终都会连接到一个 Future。
4. 调度方式:create_task 调度协程,call_soon/call_later/call_at 调度回调。run_in_executor 桥接同步代码。
5. 生命周期:asyncio.run() = 创建事件循环 + 运行 main + 取消任务 + 清理生成器 + 关闭循环。
12.2 编写安全异步代码的黄金法则
应做 (Do)
- 始终使用 asyncio.run() 作为入口
- 阻塞操作使用 run_in_executor
- 使用 asyncio.gather() 并发执行任务
- 为 task 设置有意义的名称
- 开发环境启用 debug 模式
- 使用 try/finally 确保资源释放
- 使用 asyncio.timeout() 设置超时
不应做 (Don't)
- 在协程中使用 time.sleep()
- 在协程中执行 CPU 密集计算而不让出
- 忘记 await 协程对象
- 混用 asyncio.Future 和 concurrent.futures.Future
- 在回调中抛出未捕获的异常
- 在多线程中共享未加锁的可变状态
- 在 asyncio.run 外部操作事件循环
12.3 完整的生产级示例
import asyncio
import logging
import signal
from typing import Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GracefulShutdown:
"""优雅关闭管理器"""
def __init__(self):
self._shutdown_event = asyncio.Event()
async def wait_shutdown(self):
await self._shutdown_event.wait()
def trigger_shutdown(self):
self._shutdown_event.set()
async def monitor_service(name: str, shutdown: GracefulShutdown):
"""模拟一个需要优雅关闭的后台服务"""
try:
while not shutdown._shutdown_event.is_set():
logger.info(f"[{name}] 服务运行中...")
try:
await asyncio.wait_for(
shutdown._shutdown_event.wait(),
timeout=2.0
)
except asyncio.TimeoutError:
continue # 超时是正常的,继续检查关闭信号
except asyncio.CancelledError:
logger.info(f"[{name}] 服务被取消")
raise
finally:
logger.info(f"[{name}] 服务正在清理资源...")
await asyncio.sleep(0.1) # 模拟清理
logger.info(f"[{name}] 服务已关闭")
async def main():
shutdown = GracefulShutdown()
loop = asyncio.get_running_loop()
# 注册信号处理
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
shutdown.trigger_shutdown
)
# 启动多个服务
services = [
asyncio.create_task(
monitor_service(f"Service-{i}", shutdown)
)
for i in range(3)
]
logger.info("所有服务已启动,等待关闭信号...")
await shutdown.wait_shutdown()
logger.info("收到关闭信号,正在停止所有服务...")
# 优雅关闭:取消所有服务并等待完成
for task in services:
task.cancel()
results = await asyncio.gather(*services, return_exceptions=True)
# 处理结果
for i, r in enumerate(results):
if isinstance(r, asyncio.CancelledError):
logger.info(f"Service-{i}: 已取消")
elif isinstance(r, Exception):
logger.error(f"Service-{i}: 异常 {r}")
else:
logger.info(f"Service-{i}: 完成")
logger.info("程序退出")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("收到 KeyboardInterrupt,退出")
"异步编程的本质不是让程序运行得更快,而是让程序在等待时不做无用功。协程和事件循环共同提供了一种优雅的协作式多任务模型——没有线程切换的开销,没有锁的复杂性,只有在需要时主动让出的谦让。"