asyncio异步编程基础

Python进阶编程专题 · Python异步IO编程入门

专题:Python进阶编程系统学习

关键词:Python, asyncio, 异步编程, async/await, 协程, Task, Future, 事件循环

一、概述:为什么需要asyncio

在Python开发中,我们经常面临I/O密集型任务——网络请求、文件读写、数据库查询等。传统同步模型下,程序在等待I/O完成时会阻塞CPU,造成资源浪费。多线程虽然可以缓解,但线程切换开销大,且存在GIL(全局解释器锁)的限制。asyncio是Python 3.4引入、3.5开始通过async/await语法完善的标准库异步I/O框架,它采用单线程协作式调度的方式,让程序在等待I/O时主动让出控制权,从而高效利用CPU。

核心思想:asyncio通过事件循环(Event Loop)调度协程(Coroutine),在单线程内实现并发。当一个协程遇到await等待I/O时,事件循环会切换到另一个准备好的协程执行,避免了线程切换的开销和GIL的限制。

asyncio适用场景非常广泛:高并发Web服务(aiohttp、FastAPI底层依赖)、爬虫、消息队列消费、实时数据处理等。但不适用于CPU密集型计算——这种场景应当使用多进程(multiprocessing)。

二、事件循环(Event Loop)

事件循环是asyncio的核心引擎,它负责接收、调度和执行协程任务。可以把它理解为一个永不停止的"调度器":当协程执行到await时,它会被挂起,事件循环从就绪队列中取出下一个协程继续执行。

2.1 获取与设置事件循环

Python 3.10之后,事件循环的管理方式发生了变化。推荐使用asyncio.run(),它会自动创建新的事件循环、运行协程并在完成后关闭循环。在更复杂的场景中,也可以手动管理。

import asyncio # Python 3.7+ 推荐的顶层入口(自动管理事件循环) async def main(): print("Hello") await asyncio.sleep(1) print("World") asyncio.run(main()) # 低层级API(Python 3.10前常见,现在一般不需要手动写) # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # loop.run_until_complete(main()) # loop.close()

2.2 事件循环的底层工作流程

事件循环内部维护了三个关键队列:就绪队列(Ready Queue)存放可以立即执行的协程;等待队列(Waiting Queue)存放正在等待I/O或定时器的协程;回调队列(Callback Queue)存放需要执行的回调函数。每次循环迭代时,事件循环会从就绪队列取任务执行,检查等待队列中的任务是否已完成,然后处理回调。

# 事件循环伪代码示意 class EventLoop: def __init__(self): self._ready = [] # 就绪队列 self._waiting = [] # 等待队列 self._stopping = False def run_forever(self): while not self._stopping: # 1. 执行所有就绪任务 while self._ready: task = self._ready.pop(0) task.step() # 2. 检查等待队列中的任务是否可恢复 for task in self._waiting[:]: if task._fut.done(): self._waiting.remove(task) self._ready.append(task) # 3. 如果没有就绪任务,等待I/O事件 if not self._ready and not self._waiting: break

要点:事件循环不是无限的"忙等待",当没有就绪任务时,它会在操作系统的I/O多路复用接口(如epoll/select)上阻塞等待,直到有事件发生才被唤醒。这正是asyncio高效节能的关键。

三、async/await 协程定义

Python 3.5通过PEP 492引入了async/await语法,使协程的定义和使用变得直观。使用async def定义的函数就是一个协程函数,调用它不会立即执行函数体,而是返回一个协程对象。

import asyncio # 定义一个协程函数 async def fetch_data(url): print(f"开始请求: {url}") await asyncio.sleep(1) # 模拟网络请求 print(f"完成请求: {url}") return f"Data from {url}" # 调用协程函数返回协程对象,不会执行 coro = fetch_data("https://example.com") print(type(coro)) # <class 'coroutine'> # 协程必须被调度才能执行 # 方式一:asyncio.run() # asyncio.run(fetch_data("https://example.com")) # 方式二:await 另一个协程中 async def main(): result = await fetch_data("https://example.com") print(result) asyncio.run(main())

3.1 await关键字的含义

await关键字有两个作用:一是挂起当前协程,将控制权交还给事件循环;二是等待被await的awaitable对象完成并获取其结果。awaitable对象包括:协程(coroutine)、Task、Future以及实现了__await__方法的对象。

常见的错误:初学者容易忘记在协程前加await。如果直接调用async函数而不加await,返回的只是一个协程对象,协程体并不会执行。这通常会导致"RuntimeWarning: coroutine was never awaited"的警告。

import asyncio async def say_after(delay, msg): await asyncio.sleep(delay) print(msg) async def main(): # 错误:忘记await,协程不会执行 # say_after(1, "Hello") # 协程对象被创建但未调度 # 正确:使用await await say_after(1, "Hello") await say_after(2, "World") asyncio.run(main())

四、asyncio.run() 启动协程

asyncio.run()是Python 3.7引入的顶层入口函数,它封装了事件循环的创建、运行和关闭。绝大多数asyncio程序都应当以asyncio.run(main())作为入口。

import asyncio async def main(): print("程序开始") await asyncio.sleep(1) print("程序结束") return 42 # run() 会执行以下操作: # 1. 创建新的事件循环 # 2. 将main()作为入口协程调度执行 # 3. 等待协程和所有任务完成 # 4. 关闭事件循环 # 5. 返回协程的返回值 result = asyncio.run(main()) print(f"返回值: {result}") # 42

注意事项:asyncio.run()在同一个线程中只能调用一次,且不能在事件循环内部再次调用。在Jupyter Notebook或交互式环境中,推荐使用await直接在已有的事件循环中执行协程,或者使用asyncio.run()。

五、create_task 创建任务

await是按顺序执行的——必须等前一个协程完成后才会执行下一个。要实现真正的并发,需要将协程包装为Task(任务)。Task是Future的子类,它会在事件循环中独立调度,不会阻塞创建它的协程。

import asyncio async def fetch_url(url, delay): await asyncio.sleep(delay) print(f"完成: {url}") return url async def main(): # 创建3个任务,它们会并发执行 task1 = asyncio.create_task(fetch_url("url-1", 3)) task2 = asyncio.create_task(fetch_url("url-2", 2)) task3 = asyncio.create_task(fetch_url("url-3", 1)) # 等待所有任务完成 # 注意:此时三个任务已经在后台并发执行了! r1 = await task1 r2 = await task2 r3 = await task3 print(f"结果: {r1}, {r2}, {r3}") # 输出顺序:url-3, url-2, url-1(按延迟从小到大) # 但结果打印会在所有完成后 asyncio.run(main())

任务的执行顺序是不确定的,取决于它们的I/O等待时间和事件循环的调度策略。create_task会在当前事件循环中创建任务,并立刻开始调度——不需要显式await也能执行。但如果不await任务就退出主协程,任务可能被强制取消。

import asyncio async def background_task(): await asyncio.sleep(10) print("后台任务完成") async def main(): task = asyncio.create_task(background_task()) await asyncio.sleep(1) print("主协程结束") # 主协程结束,程序退出,background_task被取消 asyncio.run(main()) # 输出: 主协程结束 (background_task不会完成)

关键点:create_task返回的Task对象必须被引用(赋值给变量),否则它会被垃圾回收。Python 3.11之前,没有被引用的Task会悄无声息地丢失;3.11之后会发出RuntimeWarning。建议对所有Task持有引用。

六、Task对象详解

Task对象提供了丰富的API来管理和监控协程的执行状态。理解这些API对编写健壮的异步程序至关重要。

6.1 Task核心方法

import asyncio async def demo_task(name, delay): try: await asyncio.sleep(delay) return f"{name}完成了" except asyncio.CancelledError: print(f"{name}被取消了") raise async def main(): task = asyncio.create_task(demo_task("demo", 5)) # 检查任务状态 print(f"是否完成: {task.done()}") # False print(f"是否取消: {task.cancelled()}") # False # 添加完成回调 def callback(t): print(f"回调: 任务完成, 结果={t.result()}") task.add_done_callback(callback) # 3秒后取消任务 await asyncio.sleep(3) cancelled = task.cancel() # 请求取消,返回True表示取消成功 print(f"取消请求: {cancelled}") try: result = await task print(f"任务结果: {result}") except asyncio.CancelledError: print("主协程捕获到取消异常") print(f"最终状态 - done: {task.done()}, cancelled: {task.cancelled()}") asyncio.run(main())
方法/属性说明
task.done()返回任务是否已完成(正常完成/取消/异常)
task.cancelled()返回任务是否被取消
task.result()返回任务的结果;如果任务未完成则引发InvalidStateError,如果任务异常则引发对应异常
task.cancel(msg="")请求取消任务,向协程内注入CancelledError。返回True表示取消请求已发送
task.add_done_callback(cb)注册完成回调函数,任务完成时自动调用(不论成功、取消还是异常)
task.remove_done_callback(cb)移除已注册的回调
task.exception()返回任务中引发的异常,没有异常返回None

6.2 任务取消的注意事项

CancelledError是BaseException的子类(不是Exception的子类),因此普通的except Exception无法捕获它。在协程中正确清理资源的模式是使用try/finally或try/except CancelledError。

import asyncio async def task_with_cleanup(): resource = acquire_resource() try: await do_work() except asyncio.CancelledError: print("任务被取消,正在清理...") resource.cleanup() raise # 必须重新抛出,否则任务被视为已完成而非取消 finally: # finally中的代码在取消时也会执行 print("finally清理")

七、Future对象底层机制

Future是asyncio的底层组件,代表一个"未来"的结果。它类似于并发编程中的"占位符":一个Future对象最初是"未完成"(pending)状态,当关联的操作完成时,外部代码通过set_result()或set_exception()将其设置为"已完成"(done)状态,await该Future的协程会恢复执行。

Task继承了Future,但Future更底层。在日常开发中,我们很少直接使用Future——Task已经满足了绝大部分需求。但理解Future的机制对于深入理解asyncio的原理非常有帮助。

import asyncio async def future_example(): # 创建一个Future对象,初始状态为pending future = asyncio.get_running_loop().create_future() print(f"初始状态: {future.done()}") # False # 模拟在另一个地方设置Future的结果 async def set_result_later(fut, delay, value): await asyncio.sleep(delay) fut.set_result(value) # 设置结果,唤醒等待的协程 print(f"Future 结果已设置: {value}") # 启动一个任务稍后设置结果 asyncio.create_task(set_result_later(future, 2, 42)) # 等待Future完成(会阻塞2秒) result = await future print(f"Future 结果: {result}") asyncio.run(future_example())

Future vs Task:Task是Future的子类。Task包装了一个协程,会在事件循环中自动调度执行,并在协程完成时自动设置Future的结果。如果使用底层的Future,你需要手动调用set_result()来标记完成——这在封装回调式API为协程时很有用。

7.1 使用Future适配回调式API

Future的一个重要用途是将基于回调的旧API包装为async/await接口。

import asyncio # 模拟一个基于回调的API def legacy_api(callback): import threading def _run(): import time time.sleep(1) callback("legacy result") threading.Thread(target=_run, daemon=True).start() # 使用Future将其包装为协程 async def async_wrapper(): loop = asyncio.get_running_loop() future = loop.create_future() def callback(result): # 在回调中设置Future的结果 loop.call_soon_threadsafe(future.set_result, result) legacy_api(callback) return await future async def main(): result = await async_wrapper() print(f"包装后的结果: {result}") asyncio.run(main())

理解要点:Future的set_result/set_exception方法会唤醒所有await这个Future的协程。call_soon_threadsafe是线程安全的API,用于从其他线程向事件循环投递任务。这种模式在整合旧代码时非常实用。

八、高层API详解

asyncio提供了多个高层API来简化并发任务的编排。掌握这些工具能大幅提高编码效率。

8.1 asyncio.sleep()

最基础的等待API。与time.sleep()阻塞整个线程不同,asyncio.sleep()会挂起当前协程,让事件循环执行其他任务。

import asyncio async def waiter(n): print(f"等待 {n} 秒开始") await asyncio.sleep(n) print(f"等待 {n} 秒结束") async def main(): # 3个等待并发执行,总耗时约3秒(而非1+2+3=6秒) await asyncio.gather( waiter(3), waiter(2), waiter(1), ) asyncio.run(main())

8.2 asyncio.gather()

gather是最常用的并发API。它接收多个awaitable对象,并发执行它们,并返回所有结果的列表。默认情况下,如果其中任何一个任务抛出异常,gather会取消所有其他任务。

import asyncio async def fetch(n): await asyncio.sleep(n) if n == 2: raise ValueError(f"错误发生在n={n}") return n * 10 async def main(): # 并发执行,返回结果列表 results = await asyncio.gather( fetch(1), fetch(2), fetch(3), return_exceptions=True # 将异常作为结果返回,而非抛出 ) print(results) # [10, ValueError(...), 30] # return_exceptions=False(默认) results = await asyncio.gather( fetch(1), fetch(2), fetch(3), return_exceptions=False # fetch(2)抛异常,gather立即抛出 ) # 注意:上面的调用会直接抛出异常,不会执行到下一行 asyncio.run(main())

8.3 asyncio.wait()

wait接收一个Task集合,提供了更细粒度的控制。通过FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED等常量控制返回时机。

import asyncio async def task(n): await asyncio.sleep(n) return n async def main(): tasks = [asyncio.create_task(task(i)) for i in [1, 3, 5]] # 任一任务完成后立即返回(不会取消其他任务) done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) print(f"已完成: {len(done)} 个") for t in done: print(f" 结果: {t.result()}") print(f"等待中: {len(pending)} 个") # 等待pending中的任务完成(如果不等待,程序退出时它们会被取消) if pending: await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED) asyncio.run(main())

8.4 asyncio.shield()

shield用于保护协程不被取消。当外部调用task.cancel()时,shield内部的协程不会收到CancelledError,而是继续执行。这在需要确保关键操作(如写数据库)不被中断时非常有用。

import asyncio async def critical_operation(): print("关键操作开始...") await asyncio.sleep(3) print("关键操作完成!") return "important data" async def main(): task = asyncio.create_task(critical_operation()) shielded = asyncio.shield(task) await asyncio.sleep(0.5) # 尝试取消屏蔽的任务 shielded.cancel() # 这会取消"屏蔽"本身,但不会影响内部task try: result = await shielded print(f"结果: {result}") except asyncio.CancelledError: print("屏蔽被取消,但内部任务仍在运行") # 内部任务还在执行,可以再次等待它 result = await task print(f"内部任务最终结果: {result}") asyncio.run(main())

8.5 asyncio.as_completed()

as_completed返回一个迭代器,按照任务完成的先后顺序产出结果(而不是按传入顺序)。它适用于需要"尽快处理已完成任务"的场景。

import asyncio async def worker(n): await asyncio.sleep(n) return f"worker-{n}" async def main(): tasks = [ asyncio.create_task(worker(3)), asyncio.create_task(worker(1)), asyncio.create_task(worker(2)), ] # 按照完成顺序处理结果 for coro in asyncio.as_completed(tasks): result = await coro print(f"处理完成: {result}") # 输出顺序: worker-1, worker-2, worker-3 asyncio.run(main())

九、超时控制

在实际开发中,网络请求、数据库查询等操作可能因各种原因挂起,必须设置超时保护。

9.1 asyncio.timeout()(Python 3.11+)

Python 3.11新增的timeout上下文管理器,提供了简洁的超时控制语法。

import asyncio async def slow_operation(): await asyncio.sleep(10) return "done" async def main(): try: async with asyncio.timeout(3): # 3秒超时 result = await slow_operation() print(result) except asyncio.TimeoutError: print("操作超时!") asyncio.run(main())

9.2 asyncio.wait_for()(Python 3.7+)

wait_for是Python 3.7到3.10间的主要超时方案,在3.11及之后依然可用。

import asyncio async def slow_operation(): await asyncio.sleep(10) return "done" async def main(): try: # 等待协程完成,但最多等待3秒 result = await asyncio.wait_for(slow_operation(), timeout=3) print(result) except asyncio.TimeoutError: print("操作超时!") asyncio.run(main())

重要区别:asyncio.wait_for()在超时后会取消协程(注入CancelledError);而asyncio.timeout()默认也会取消。不需要取消协程时,可以在timeout()内部捕获TimeoutError:

import asyncio async def main(): async def work(): try: await asyncio.sleep(10) except asyncio.CancelledError: print("工作协程被取消,清理中...") raise async with asyncio.timeout(2) as cm: try: await work() except asyncio.TimeoutError: print("超时了,但我不想让work被取消") # 注意:此时work可能已经被取消了 # 如果不想被取消,应该使用asyncio.shield() print(f"是否超时: {cm.expired()}") asyncio.run(main())

十、异步上下文管理器(async with)

异步上下文管理器是实现了__aenter__和__aexit__方法的对象,使用async with语法进入和退出。它最常见的用途是管理数据库连接、文件句柄等需要在异步环境中进行初始化和清理的资源。

import asyncio # 自定义异步上下文管理器 class AsyncResource: async def __aenter__(self): print("异步打开资源...") await asyncio.sleep(0.5) print("资源已打开") return self async def __aexit__(self, exc_type, exc_val, exc_tb): print("异步关闭资源...") await asyncio.sleep(0.5) print("资源已关闭") return False # 不屏蔽异常 async def work(self): print("使用资源工作") await asyncio.sleep(0.5) print("工作完成") async def main(): async with AsyncResource() as res: await res.work() # 离开async with块时自动调用__aexit__ asyncio.run(main()) # 实际应用示例:aiohttp的异步上下文管理器 # async with aiohttp.ClientSession() as session: # async with session.get('https://example.com') as resp: # data = await resp.text()
import asyncio # 使用contextlib简化 from contextlib import asynccontextmanager @asynccontextmanager async def managed_resource(name): print(f"打开: {name}") await asyncio.sleep(0.3) try: yield f"资源-{name}" finally: print(f"关闭: {name}") await asyncio.sleep(0.3) async def main(): async with managed_resource("数据库连接") as res: print(f"使用: {res}") await asyncio.sleep(0.5) asyncio.run(main())

实际应用:数据库连接池(asyncpg、aiomysql)、HTTP会话(aiohttp)、Redis连接(aioredis)等三方库都提供了异步上下文管理器。使用async with可以确保无论是否发生异常,资源都能被正确释放。

十一、异步迭代器(async for)

异步迭代器实现了__aiter__和__anext__方法,使用async for进行迭代。它适用于需要异步等待每一个数据项的场景——比如分页查询、消息队列消费、流式数据处理等。

import asyncio # 自定义异步迭代器 class AsyncCounter: def __init__(self, limit): self.limit = limit self.current = 0 def __aiter__(self): return self async def __anext__(self): if self.current >= self.limit: raise StopAsyncIteration await asyncio.sleep(0.5) # 模拟异步等待 self.current += 1 return self.current async def main(): async for num in AsyncCounter(5): print(f"数字: {num}") asyncio.run(main())
import asyncio # 使用异步生成器(更简洁) async def async_range(n, delay=0.3): for i in range(n): await asyncio.sleep(delay) yield i async def main(): async for value in async_range(5): print(f"值: {value}") asyncio.run(main())

11.1 实用示例:模拟分页数据获取

import asyncio async def fetch_page(page_num): """模拟分页API调用""" await asyncio.sleep(0.5) if page_num > 3: return [] # 没有更多数据 return [f"page-{page_num}-item-{i}" for i in range(3)] async def paginated_loader(): page = 1 while True: items = await fetch_page(page) if not items: return for item in items: yield item page += 1 async def main(): async for item in paginated_loader(): print(f"处理: {item}") # 输出: 共9个item,每0.5秒一页 asyncio.run(main())

适用场景:异步迭代器特别适合流式处理场景——从Kafka/RabbitMQ消费消息、处理大文件的分块读取、遍历数据库游标等。与同步迭代器相比,async for在每次迭代之间不会阻塞事件循环。

十二、实际应用场景

12.1 并发Web请求(aiohttp)

import asyncio import aiohttp async def fetch_url(session, url): try: async with session.get(url, timeout=10) as response: return await response.text() except Exception as e: return f"Error: {e}" async def main(): urls = [ "https://example.com", "https://httpbin.org/delay/1", "https://httpbin.org/delay/2", ] async with aiohttp.ClientSession() as session: tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for url, result in zip(urls, results): print(f"{url}: {len(result)} 字符") asyncio.run(main())

12.2 异步数据库操作

import asyncio import asyncpg async def query_users(): conn = await asyncpg.connect( user="user", password="pass", database="test", host="localhost" ) try: rows = await conn.fetch("SELECT id, name FROM users LIMIT 10") for row in rows: print(f"用户: {row['name']}") finally: await conn.close() async def batch_queries(): conn = await asyncpg.connect( user="user", password="pass", database="test", host="localhost" ) async with conn.transaction(): # 在同一个事务中并发执行多个查询 results = await asyncio.gather( conn.fetch("SELECT * FROM users"), conn.fetch("SELECT * FROM orders"), ) await conn.close() return results # asyncio.run(query_users())

12.3 异步文件读写

标准open()是同步阻塞的,不应当在协程中直接使用。使用aiofiles库可以异步操作文件。

import asyncio import aiofiles async def read_large_file(): async with aiofiles.open("large_data.txt", mode="r") as f: async for line in f: await process_line(line) # 处理每一行 async def process_line(line): # 模拟行级处理 await asyncio.sleep(0.01) _ = line.strip() async def write_output(data_list): async with aiofiles.open("output.txt", mode="w") as f: for item in data_list: await f.write(f"{item}\n") # asyncio.run(read_large_file())

最佳实践:在实际项目中,使用连接池管理数据库连接(如asyncpg.create_pool);对HTTP请求设置合理的超时和重试策略;文件IO使用aiofiles避免阻塞事件循环。这些实践能确保你的异步程序在面对真实负载时保持稳定和高效。

十三、核心要点总结

概念要点
事件循环asyncio的核心调度器,单线程内通过协作式调度实现并发。Python 3.10后推荐统一使用asyncio.run()管理
async/awaitasync def定义协程函数;await挂起当前协程等待结果返回。协程不await就不会执行
Task通过asyncio.create_task()创建,将协程包装为独立调度单元,实现真正并发。需持有引用避免被回收
Future底层占位符,代表将来的结果。Task是其子类。手动set_result可适配回调式API
gather最常用的并发聚合API,并发执行多个awaitable,返回结果列表
wait提供更细粒度的等待策略控制(FIRST_COMPLETED等),同时返回done/pending集合
as_completed按完成顺序处理结果,适用于流式处理场景
shield保护关键操作不被取消
超时控制3.11+使用asyncio.timeout()上下文管理器;3.7-3.10使用asyncio.wait_for()
async with异步上下文管理器,用于管理连接、文件等资源的生命周期
async for异步迭代器/异步生成器,适用于分页加载、流式处理

性能优化:asyncio适用于I/O密集型任务——一个包含100个网络请求的程序,使用asyncio可以在单线程内并发完成,总耗时约等于最慢的单个请求。但对于CPU密集型计算(如大矩阵运算、图像处理),asyncio无济于事,应当使用多进程(multiprocessing、concurrent.futures.ProcessPoolExecutor)来处理。

十四、进一步思考

掌握asyncio是Python进阶道路上的重要里程碑。在掌握了本文的基础知识后,可以进一步探索以下几个方向:

"异步编程不是让程序跑得更快,而是让程序在等待时不会闲着。"——asyncio的设计哲学,本质上是对I/O等待时间的高效复用。