← 返回Python进阶编程目录
← 返回学习笔记首页
专题: Python进阶编程系统学习
关键词: Python, asyncio, 异步编程, async/await, 协程, Task, Future, 事件循环
一、概述:为什么需要asyncio
在Python开发中,我们经常面临I/O密集型任务——网络请求、文件读写、数据库查询等。传统同步模型下,程序在等待I/O完成时会阻塞CPU,造成资源浪费。多线程虽然可以缓解,但线程切换开销大,且存在GIL(全局解释器锁)的限制。asyncio是Python 3.4引入、3.5开始通过async/await语法完善的标准库异步I/O框架,它采用单线程协作式调度的方式,让程序在等待I/O时主动让出控制权,从而高效利用CPU。
核心思想: asyncio通过事件循环(Event Loop)调度协程(Coroutine),在单线程内实现并发。当一个协程遇到await等待I/O时,事件循环会切换到另一个准备好的协程执行,避免了线程切换的开销和GIL的限制。
asyncio适用场景非常广泛:高并发Web服务(aiohttp、FastAPI底层依赖)、爬虫、消息队列消费、实时数据处理等。但不适用于CPU密集型计算——这种场景应当使用多进程(multiprocessing)。
二、事件循环(Event Loop)
事件循环是asyncio的核心引擎,它负责接收、调度和执行协程任务。可以把它理解为一个永不停止的"调度器":当协程执行到await时,它会被挂起,事件循环从就绪队列中取出下一个协程继续执行。
2.1 获取与设置事件循环
Python 3.10之后,事件循环的管理方式发生了变化。推荐使用asyncio.run(),它会自动创建新的事件循环、运行协程并在完成后关闭循环。在更复杂的场景中,也可以手动管理。
import asyncio
# Python 3.7+ 推荐的顶层入口(自动管理事件循环)
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(main())
# 低层级API(Python 3.10前常见,现在一般不需要手动写)
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(main())
# loop.close()
2.2 事件循环的底层工作流程
事件循环内部维护了三个关键队列:就绪队列 (Ready Queue)存放可以立即执行的协程;等待队列 (Waiting Queue)存放正在等待I/O或定时器的协程;回调队列 (Callback Queue)存放需要执行的回调函数。每次循环迭代时,事件循环会从就绪队列取任务执行,检查等待队列中的任务是否已完成,然后处理回调。
# 事件循环伪代码示意
class EventLoop:
def __init__(self):
self._ready = [] # 就绪队列
self._waiting = [] # 等待队列
self._stopping = False
def run_forever(self):
while not self._stopping:
# 1. 执行所有就绪任务
while self._ready:
task = self._ready.pop(0)
task.step()
# 2. 检查等待队列中的任务是否可恢复
for task in self._waiting[:]:
if task._fut.done():
self._waiting.remove(task)
self._ready.append(task)
# 3. 如果没有就绪任务,等待I/O事件
if not self._ready and not self._waiting:
break
要点: 事件循环不是无限的"忙等待",当没有就绪任务时,它会在操作系统的I/O多路复用接口(如epoll/select)上阻塞等待,直到有事件发生才被唤醒。这正是asyncio高效节能的关键。
三、async/await 协程定义
Python 3.5通过PEP 492引入了async/await语法,使协程的定义和使用变得直观。使用async def定义的函数就是一个协程函数,调用它不会立即执行函数体,而是返回一个协程对象。
import asyncio
# 定义一个协程函数
async def fetch_data(url):
print(f"开始请求: {url}")
await asyncio.sleep(1) # 模拟网络请求
print(f"完成请求: {url}")
return f"Data from {url}"
# 调用协程函数返回协程对象,不会执行
coro = fetch_data("https://example.com")
print(type(coro)) # <class 'coroutine'>
# 协程必须被调度才能执行
# 方式一:asyncio.run()
# asyncio.run(fetch_data("https://example.com"))
# 方式二:await 另一个协程中
async def main():
result = await fetch_data("https://example.com")
print(result)
asyncio.run(main())
3.1 await关键字的含义
await关键字有两个作用:一是挂起当前协程,将控制权交还给事件循环;二是等待被await的awaitable对象完成并获取其结果。awaitable对象包括:协程(coroutine)、Task、Future以及实现了__await__方法的对象。
常见的错误: 初学者容易忘记在协程前加await。如果直接调用async函数而不加await,返回的只是一个协程对象,协程体并不会执行。这通常会导致"RuntimeWarning: coroutine was never awaited"的警告。
import asyncio
async def say_after(delay, msg):
await asyncio.sleep(delay)
print(msg)
async def main():
# 错误:忘记await,协程不会执行
# say_after(1, "Hello") # 协程对象被创建但未调度
# 正确:使用await
await say_after(1, "Hello")
await say_after(2, "World")
asyncio.run(main())
四、asyncio.run() 启动协程
asyncio.run()是Python 3.7引入的顶层入口函数,它封装了事件循环的创建、运行和关闭。绝大多数asyncio程序都应当以asyncio.run(main())作为入口。
import asyncio
async def main():
print("程序开始")
await asyncio.sleep(1)
print("程序结束")
return 42
# run() 会执行以下操作:
# 1. 创建新的事件循环
# 2. 将main()作为入口协程调度执行
# 3. 等待协程和所有任务完成
# 4. 关闭事件循环
# 5. 返回协程的返回值
result = asyncio.run(main())
print(f"返回值: {result}") # 42
注意事项: asyncio.run()在同一个线程中只能调用一次,且不能在事件循环内部再次调用。在Jupyter Notebook或交互式环境中,推荐使用await直接在已有的事件循环中执行协程,或者使用asyncio.run()。
五、create_task 创建任务
await是按顺序执行的——必须等前一个协程完成后才会执行下一个。要实现真正的并发,需要将协程包装为Task(任务)。Task是Future的子类,它会在事件循环中独立调度,不会阻塞创建它的协程。
import asyncio
async def fetch_url(url, delay):
await asyncio.sleep(delay)
print(f"完成: {url}")
return url
async def main():
# 创建3个任务,它们会并发执行
task1 = asyncio.create_task(fetch_url("url-1", 3))
task2 = asyncio.create_task(fetch_url("url-2", 2))
task3 = asyncio.create_task(fetch_url("url-3", 1))
# 等待所有任务完成
# 注意:此时三个任务已经在后台并发执行了!
r1 = await task1
r2 = await task2
r3 = await task3
print(f"结果: {r1}, {r2}, {r3}")
# 输出顺序:url-3, url-2, url-1(按延迟从小到大)
# 但结果打印会在所有完成后
asyncio.run(main())
任务的执行顺序是不确定的,取决于它们的I/O等待时间和事件循环的调度策略。create_task会在当前事件循环中创建任务,并立刻开始调度——不需要显式await也能执行。但如果不await任务就退出主协程,任务可能被强制取消。
import asyncio
async def background_task():
await asyncio.sleep(10)
print("后台任务完成")
async def main():
task = asyncio.create_task(background_task())
await asyncio.sleep(1)
print("主协程结束")
# 主协程结束,程序退出,background_task被取消
asyncio.run(main())
# 输出: 主协程结束 (background_task不会完成)
关键点: create_task返回的Task对象必须被引用(赋值给变量),否则它会被垃圾回收。Python 3.11之前,没有被引用的Task会悄无声息地丢失;3.11之后会发出RuntimeWarning。建议对所有Task持有引用。
六、Task对象详解
Task对象提供了丰富的API来管理和监控协程的执行状态。理解这些API对编写健壮的异步程序至关重要。
6.1 Task核心方法
import asyncio
async def demo_task(name, delay):
try:
await asyncio.sleep(delay)
return f"{name}完成了"
except asyncio.CancelledError:
print(f"{name}被取消了")
raise
async def main():
task = asyncio.create_task(demo_task("demo", 5))
# 检查任务状态
print(f"是否完成: {task.done()}") # False
print(f"是否取消: {task.cancelled()}") # False
# 添加完成回调
def callback(t):
print(f"回调: 任务完成, 结果={t.result()}")
task.add_done_callback(callback)
# 3秒后取消任务
await asyncio.sleep(3)
cancelled = task.cancel() # 请求取消,返回True表示取消成功
print(f"取消请求: {cancelled}")
try:
result = await task
print(f"任务结果: {result}")
except asyncio.CancelledError:
print("主协程捕获到取消异常")
print(f"最终状态 - done: {task.done()}, cancelled: {task.cancelled()}")
asyncio.run(main())
方法/属性 说明
task.done() 返回任务是否已完成(正常完成/取消/异常)
task.cancelled() 返回任务是否被取消
task.result() 返回任务的结果;如果任务未完成则引发InvalidStateError,如果任务异常则引发对应异常
task.cancel(msg="") 请求取消任务,向协程内注入CancelledError。返回True表示取消请求已发送
task.add_done_callback(cb) 注册完成回调函数,任务完成时自动调用(不论成功、取消还是异常)
task.remove_done_callback(cb) 移除已注册的回调
task.exception() 返回任务中引发的异常,没有异常返回None
6.2 任务取消的注意事项
CancelledError是BaseException的子类(不是Exception的子类),因此普通的except Exception无法捕获它。在协程中正确清理资源的模式是使用try/finally或try/except CancelledError。
import asyncio
async def task_with_cleanup():
resource = acquire_resource()
try:
await do_work()
except asyncio.CancelledError:
print("任务被取消,正在清理...")
resource.cleanup()
raise # 必须重新抛出,否则任务被视为已完成而非取消
finally:
# finally中的代码在取消时也会执行
print("finally清理")
七、Future对象底层机制
Future是asyncio的底层组件,代表一个"未来"的结果。它类似于并发编程中的"占位符":一个Future对象最初是"未完成"(pending)状态,当关联的操作完成时,外部代码通过set_result()或set_exception()将其设置为"已完成"(done)状态,await该Future的协程会恢复执行。
Task继承了Future,但Future更底层。在日常开发中,我们很少直接使用Future——Task已经满足了绝大部分需求。但理解Future的机制对于深入理解asyncio的原理非常有帮助。
import asyncio
async def future_example():
# 创建一个Future对象,初始状态为pending
future = asyncio.get_running_loop().create_future()
print(f"初始状态: {future.done()}") # False
# 模拟在另一个地方设置Future的结果
async def set_result_later(fut, delay, value):
await asyncio.sleep(delay)
fut.set_result(value) # 设置结果,唤醒等待的协程
print(f"Future 结果已设置: {value}")
# 启动一个任务稍后设置结果
asyncio.create_task(set_result_later(future, 2, 42))
# 等待Future完成(会阻塞2秒)
result = await future
print(f"Future 结果: {result}")
asyncio.run(future_example())
Future vs Task: Task是Future的子类。Task包装了一个协程,会在事件循环中自动调度执行,并在协程完成时自动设置Future的结果。如果使用底层的Future,你需要手动调用set_result()来标记完成——这在封装回调式API为协程时很有用。
7.1 使用Future适配回调式API
Future的一个重要用途是将基于回调的旧API包装为async/await接口。
import asyncio
# 模拟一个基于回调的API
def legacy_api(callback):
import threading
def _run():
import time
time.sleep(1)
callback("legacy result")
threading.Thread(target=_run, daemon=True).start()
# 使用Future将其包装为协程
async def async_wrapper():
loop = asyncio.get_running_loop()
future = loop.create_future()
def callback(result):
# 在回调中设置Future的结果
loop.call_soon_threadsafe(future.set_result, result)
legacy_api(callback)
return await future
async def main():
result = await async_wrapper()
print(f"包装后的结果: {result}")
asyncio.run(main())
理解要点: Future的set_result/set_exception方法会唤醒所有await这个Future的协程。call_soon_threadsafe是线程安全的API,用于从其他线程向事件循环投递任务。这种模式在整合旧代码时非常实用。
八、高层API详解
asyncio提供了多个高层API来简化并发任务的编排。掌握这些工具能大幅提高编码效率。
8.1 asyncio.sleep()
最基础的等待API。与time.sleep()阻塞整个线程不同,asyncio.sleep()会挂起当前协程,让事件循环执行其他任务。
import asyncio
async def waiter(n):
print(f"等待 {n} 秒开始")
await asyncio.sleep(n)
print(f"等待 {n} 秒结束")
async def main():
# 3个等待并发执行,总耗时约3秒(而非1+2+3=6秒)
await asyncio.gather(
waiter(3),
waiter(2),
waiter(1),
)
asyncio.run(main())
8.2 asyncio.gather()
gather是最常用的并发API。它接收多个awaitable对象,并发执行它们,并返回所有结果的列表。默认情况下,如果其中任何一个任务抛出异常,gather会取消所有其他任务。
import asyncio
async def fetch(n):
await asyncio.sleep(n)
if n == 2:
raise ValueError(f"错误发生在n={n}")
return n * 10
async def main():
# 并发执行,返回结果列表
results = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3),
return_exceptions=True # 将异常作为结果返回,而非抛出
)
print(results) # [10, ValueError(...), 30]
# return_exceptions=False(默认)
results = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3),
return_exceptions=False # fetch(2)抛异常,gather立即抛出
)
# 注意:上面的调用会直接抛出异常,不会执行到下一行
asyncio.run(main())
8.3 asyncio.wait()
wait接收一个Task集合,提供了更细粒度的控制。通过FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED等常量控制返回时机。
import asyncio
async def task(n):
await asyncio.sleep(n)
return n
async def main():
tasks = [asyncio.create_task(task(i)) for i in [1, 3, 5]]
# 任一任务完成后立即返回(不会取消其他任务)
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"已完成: {len(done)} 个")
for t in done:
print(f" 结果: {t.result()}")
print(f"等待中: {len(pending)} 个")
# 等待pending中的任务完成(如果不等待,程序退出时它们会被取消)
if pending:
await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)
asyncio.run(main())
8.4 asyncio.shield()
shield用于保护协程不被取消。当外部调用task.cancel()时,shield内部的协程不会收到CancelledError,而是继续执行。这在需要确保关键操作(如写数据库)不被中断时非常有用。
import asyncio
async def critical_operation():
print("关键操作开始...")
await asyncio.sleep(3)
print("关键操作完成!")
return "important data"
async def main():
task = asyncio.create_task(critical_operation())
shielded = asyncio.shield(task)
await asyncio.sleep(0.5)
# 尝试取消屏蔽的任务
shielded.cancel() # 这会取消"屏蔽"本身,但不会影响内部task
try:
result = await shielded
print(f"结果: {result}")
except asyncio.CancelledError:
print("屏蔽被取消,但内部任务仍在运行")
# 内部任务还在执行,可以再次等待它
result = await task
print(f"内部任务最终结果: {result}")
asyncio.run(main())
8.5 asyncio.as_completed()
as_completed返回一个迭代器,按照任务完成的先后顺序产出结果(而不是按传入顺序)。它适用于需要"尽快处理已完成任务"的场景。
import asyncio
async def worker(n):
await asyncio.sleep(n)
return f"worker-{n}"
async def main():
tasks = [
asyncio.create_task(worker(3)),
asyncio.create_task(worker(1)),
asyncio.create_task(worker(2)),
]
# 按照完成顺序处理结果
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"处理完成: {result}")
# 输出顺序: worker-1, worker-2, worker-3
asyncio.run(main())
九、超时控制
在实际开发中,网络请求、数据库查询等操作可能因各种原因挂起,必须设置超时保护。
9.1 asyncio.timeout()(Python 3.11+)
Python 3.11新增的timeout上下文管理器,提供了简洁的超时控制语法。
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "done"
async def main():
try:
async with asyncio.timeout(3): # 3秒超时
result = await slow_operation()
print(result)
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main())
9.2 asyncio.wait_for()(Python 3.7+)
wait_for是Python 3.7到3.10间的主要超时方案,在3.11及之后依然可用。
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "done"
async def main():
try:
# 等待协程完成,但最多等待3秒
result = await asyncio.wait_for(slow_operation(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main())
重要区别: asyncio.wait_for()在超时后会取消协程(注入CancelledError);而asyncio.timeout()默认也会取消。不需要取消协程时,可以在timeout()内部捕获TimeoutError:
import asyncio
async def main():
async def work():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("工作协程被取消,清理中...")
raise
async with asyncio.timeout(2) as cm:
try:
await work()
except asyncio.TimeoutError:
print("超时了,但我不想让work被取消")
# 注意:此时work可能已经被取消了
# 如果不想被取消,应该使用asyncio.shield()
print(f"是否超时: {cm.expired()}")
asyncio.run(main())
十、异步上下文管理器(async with)
异步上下文管理器是实现了__aenter__和__aexit__方法的对象,使用async with语法进入和退出。它最常见的用途是管理数据库连接、文件句柄等需要在异步环境中进行初始化和清理的资源。
import asyncio
# 自定义异步上下文管理器
class AsyncResource:
async def __aenter__(self):
print("异步打开资源...")
await asyncio.sleep(0.5)
print("资源已打开")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("异步关闭资源...")
await asyncio.sleep(0.5)
print("资源已关闭")
return False # 不屏蔽异常
async def work(self):
print("使用资源工作")
await asyncio.sleep(0.5)
print("工作完成")
async def main():
async with AsyncResource() as res:
await res.work()
# 离开async with块时自动调用__aexit__
asyncio.run(main())
# 实际应用示例:aiohttp的异步上下文管理器
# async with aiohttp.ClientSession() as session:
# async with session.get('https://example.com') as resp:
# data = await resp.text()
import asyncio
# 使用contextlib简化
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_resource(name):
print(f"打开: {name}")
await asyncio.sleep(0.3)
try:
yield f"资源-{name}"
finally:
print(f"关闭: {name}")
await asyncio.sleep(0.3)
async def main():
async with managed_resource("数据库连接") as res:
print(f"使用: {res}")
await asyncio.sleep(0.5)
asyncio.run(main())
实际应用: 数据库连接池(asyncpg、aiomysql)、HTTP会话(aiohttp)、Redis连接(aioredis)等三方库都提供了异步上下文管理器。使用async with可以确保无论是否发生异常,资源都能被正确释放。
十一、异步迭代器(async for)
异步迭代器实现了__aiter__和__anext__方法,使用async for进行迭代。它适用于需要异步等待每一个数据项的场景——比如分页查询、消息队列消费、流式数据处理等。
import asyncio
# 自定义异步迭代器
class AsyncCounter:
def __init__(self, limit):
self.limit = limit
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟异步等待
self.current += 1
return self.current
async def main():
async for num in AsyncCounter(5):
print(f"数字: {num}")
asyncio.run(main())
import asyncio
# 使用异步生成器(更简洁)
async def async_range(n, delay=0.3):
for i in range(n):
await asyncio.sleep(delay)
yield i
async def main():
async for value in async_range(5):
print(f"值: {value}")
asyncio.run(main())
11.1 实用示例:模拟分页数据获取
import asyncio
async def fetch_page(page_num):
"""模拟分页API调用"""
await asyncio.sleep(0.5)
if page_num > 3:
return [] # 没有更多数据
return [f"page-{page_num}-item-{i}" for i in range(3)]
async def paginated_loader():
page = 1
while True:
items = await fetch_page(page)
if not items:
return
for item in items:
yield item
page += 1
async def main():
async for item in paginated_loader():
print(f"处理: {item}")
# 输出: 共9个item,每0.5秒一页
asyncio.run(main())
适用场景: 异步迭代器特别适合流式处理场景——从Kafka/RabbitMQ消费消息、处理大文件的分块读取、遍历数据库游标等。与同步迭代器相比,async for在每次迭代之间不会阻塞事件循环。
十二、实际应用场景
12.1 并发Web请求(aiohttp)
import asyncio
import aiohttp
async def fetch_url(session, url):
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except Exception as e:
return f"Error: {e}"
async def main():
urls = [
"https://example.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, result in zip(urls, results):
print(f"{url}: {len(result)} 字符")
asyncio.run(main())
12.2 异步数据库操作
import asyncio
import asyncpg
async def query_users():
conn = await asyncpg.connect(
user="user", password="pass",
database="test", host="localhost"
)
try:
rows = await conn.fetch("SELECT id, name FROM users LIMIT 10")
for row in rows:
print(f"用户: {row['name']}")
finally:
await conn.close()
async def batch_queries():
conn = await asyncpg.connect(
user="user", password="pass",
database="test", host="localhost"
)
async with conn.transaction():
# 在同一个事务中并发执行多个查询
results = await asyncio.gather(
conn.fetch("SELECT * FROM users"),
conn.fetch("SELECT * FROM orders"),
)
await conn.close()
return results
# asyncio.run(query_users())
12.3 异步文件读写
标准open()是同步阻塞的,不应当在协程中直接使用。使用aiofiles库可以异步操作文件。
import asyncio
import aiofiles
async def read_large_file():
async with aiofiles.open("large_data.txt", mode="r") as f:
async for line in f:
await process_line(line) # 处理每一行
async def process_line(line):
# 模拟行级处理
await asyncio.sleep(0.01)
_ = line.strip()
async def write_output(data_list):
async with aiofiles.open("output.txt", mode="w") as f:
for item in data_list:
await f.write(f"{item}\n")
# asyncio.run(read_large_file())
最佳实践: 在实际项目中,使用连接池管理数据库连接(如asyncpg.create_pool);对HTTP请求设置合理的超时和重试策略;文件IO使用aiofiles避免阻塞事件循环。这些实践能确保你的异步程序在面对真实负载时保持稳定和高效。
十三、核心要点总结
概念 要点
事件循环 asyncio的核心调度器,单线程内通过协作式调度实现并发。Python 3.10后推荐统一使用asyncio.run()管理
async/await async def定义协程函数;await挂起当前协程等待结果返回。协程不await就不会执行
Task 通过asyncio.create_task()创建,将协程包装为独立调度单元,实现真正并发。需持有引用避免被回收
Future 底层占位符,代表将来的结果。Task是其子类。手动set_result可适配回调式API
gather 最常用的并发聚合API,并发执行多个awaitable,返回结果列表
wait 提供更细粒度的等待策略控制(FIRST_COMPLETED等),同时返回done/pending集合
as_completed 按完成顺序处理结果,适用于流式处理场景
shield 保护关键操作不被取消
超时控制 3.11+使用asyncio.timeout()上下文管理器;3.7-3.10使用asyncio.wait_for()
async with 异步上下文管理器,用于管理连接、文件等资源的生命周期
async for 异步迭代器/异步生成器,适用于分页加载、流式处理
性能优化: asyncio适用于I/O密集型任务——一个包含100个网络请求的程序,使用asyncio可以在单线程内并发完成,总耗时约等于最慢的单个请求。但对于CPU密集型计算(如大矩阵运算、图像处理),asyncio无济于事,应当使用多进程(multiprocessing、concurrent.futures.ProcessPoolExecutor)来处理。
十四、进一步思考
掌握asyncio是Python进阶道路上的重要里程碑。在掌握了本文的基础知识后,可以进一步探索以下几个方向:
异步Web框架: 深入学习FastAPI/Starlette的异步视图与依赖注入系统,理解异步路由的实现原理
深入事件循环: 阅读CPython中asyncio/base_events.py的源码,理解事件循环的具体实现
uvloop: 基于libuv的高性能事件循环替代实现,可替换asyncio默认的事件循环(约2倍性能提升)
异步队列: asyncio.Queue用于协程间的通信和数据同步,适合生产者-消费者模式
异步同步原语: asyncio.Lock、Semaphore、Event等用于控制协程的并发访问
结构化并发: Python 3.11引入的TaskGroup和Timeout(基于PEP 654的ExceptionGroup),提供了更健壮的并发管理模型
"异步编程不是让程序跑得更快,而是让程序在等待时不会闲着。"——asyncio的设计哲学,本质上是对I/O等待时间的高效复用。