Task与Future深入:任务创建、取消与回调

Python并发编程专题 · 协程任务的执行、组合与取消管理

专题:Python并发编程系统学习

关键词:Python, 并发编程, Task, Future, create_task, gather, cancel, shield

一、Task的概念

在asyncio中,Task是协程(coroutine)的包装器,它将一个协程对象封装成一个可调度的任务,并自动将其注册到事件循环中执行。可以把Task理解为"正在运行的协程"——当你调用一个异步函数时,你得到的是一个协程对象,但只有将它包装成Task后,它才会被实际调度执行。

Task与协程的核心区别在于:协程只是一个可暂停的函数,而Task是一个已经提交到事件循环、正在被调度执行的任务单元。Task继承自Future,因此它也具有Future的所有特性,比如查询完成状态、获取结果、添加回调等。

核心理解:Coroutine(协程)是"待执行的代码",Task(任务)是"正在执行的协程"。创建Task意味着将协程提交给事件循环去调度执行。

二、创建Task的三种方式

1. asyncio.create_task() — Python 3.7+ 推荐方式

这是最现代、最推荐的方式。它接收一个协程对象,返回一个Task对象,并将该协程自动调度到当前事件循环中执行。

import asyncio async def worker(n): await asyncio.sleep(n) return f"worker {n} 完成" async def main(): task = asyncio.create_task(worker(2)) result = await task print(result) asyncio.run(main())

2. loop.create_task() — 低阶API

通过事件循环对象创建任务。需要先获取当前事件循环实例。

async def main(): loop = asyncio.get_event_loop() task = loop.create_task(worker(3)) await task

3. asyncio.ensure_future() — 兼容旧版本

在Python 3.7之前的主要方式,现在主要用于兼容旧代码。它比create_task更灵活,可以接受协程、Future或awaitable对象。

task = asyncio.ensure_future(worker(4))

建议:在新代码中始终使用 asyncio.create_task(),语义清晰且是官方推荐的标准方式。

三、Task的状态管理

Task在整个生命周期中会经历不同的状态。理解这些状态对于正确管理任务至关重要。

Task的四种核心状态:

Task提供了两个状态检查方法:

async def check_status(): task = asyncio.create_task(worker(1)) print("刚创建:", task.done()) # False await task print("执行后:", task.done()) # True print("结果:", task.result()) # worker 1 完成

四、取消任务

在实际应用中,我们经常需要取消一个正在运行的任务,比如超时控制或用户中断请求。asyncio通过 task.cancel() 方法实现任务取消。

cancel() 的工作原理

调用 task.cancel() 会在任务内部抛出一个 asyncio.CancelledError 异常。这个异常会在任务的下一个await点被触发。如果任务正在执行一个await操作(如sleep、网络请求等),它会立即被中断。

async def long_running(): try: print("任务开始...") await asyncio.sleep(10) print("任务完成") # 被取消后不会执行到这里 except asyncio.CancelledError: print("任务被取消,执行清理...") raise # 必须重新抛出,否则asyncio认为任务正常完成 async def main(): task = asyncio.create_task(long_running()) await asyncio.sleep(1) task.cancel() # 1秒后取消任务 try: await task except asyncio.CancelledError: print("主函数捕获取消异常")

取消传播链

当一个Task被取消时,CancelledError会沿着await链传播。如果TaskA正在等待TaskB,那么取消TaskA也会级联影响到TaskB。理解这一点对于设计健壮的取消逻辑非常重要。

重要:捕获CancelledError后必须重新抛出(raise),除非你确实有充分的理由抑制取消。否则asyncio无法正确判断任务状态,可能导致意外行为。

五、asyncio.gather并发执行

asyncio.gather() 是最常用的并发执行函数,它接收多个awaitable对象,并发地运行它们,并等待所有任务完成。

async def fetch_data(url, delay): await asyncio.sleep(delay) return f"数据来自 {url}" async def main(): results = await asyncio.gather( fetch_data("url1", 3), fetch_data("url2", 1), fetch_data("url3", 2), ) print(results) # ['数据来自 url1', '数据来自 url2', '数据来自 url3'] # 注意:结果顺序与传入顺序一致,而非完成顺序

return_exceptions 参数

默认情况下,gather中任何一个任务抛出异常都会传播给调用者,其他未完成的任务仍然会继续执行。通过设置 return_exceptions=True,可以将异常作为结果返回,而不是抛出。

results = await asyncio.gather( task1, task2, task3, return_exceptions=True # 异常被包含在结果列表中 )

特点:gather的结果列表顺序与输入协程的顺序一致,不受完成时间影响。如果一个任务被取消,gather会抛出CancelledError。

六、asyncio.wait等待指定条件

asyncio.wait() 提供了比gather更细粒度的控制,它允许你指定返回条件(何时认为等待完成),并且返回已完成和未完成两组任务集合。

wait的三个返回条件(通过 return_when 参数指定):

async def main(): tasks = [ asyncio.create_task(worker(5)), asyncio.create_task(worker(2)), asyncio.create_task(worker(3)), ] # 等待第一个任务完成 done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) print("已完成:", len(done)) print("待完成:", len(pending)) # 取消剩余未完成的任务 for task in pending: task.cancel()

gather vs wait 选择指南:需要同时获取所有结果 → 用gather;需要控制返回条件或处理部分结果 → 用wait。gather更适合批量等待,wait更适合竞速模式或超时控制。

七、asyncio.shield保护任务

有时我们希望某个关键任务即使在外层被取消的情况下也能继续执行。asyncio.shield() 就是用来保护任务的——它为任务创建一个"保护罩",防止CancelledError传播到被保护的任务。

async def critical_operation(): await asyncio.sleep(5) return "关键操作完成" async def main(): # 使用shield保护关键操作 task = asyncio.create_task(critical_operation()) shielded = asyncio.shield(task) # 取消shield外围任务 — 实际任务不会被取消 shielded.cancel() try: result = await shielded print(result) # 仍然会输出"关键操作完成" except asyncio.CancelledError: print("shield被取消,但内部任务继续")

shield的注意事项

shield并不是绝对安全——它只保护被包裹的任务不被取消。如果被保护任务自身调用了cancel(),或者事件循环被停止,任务仍然会被取消。此外,shield只阻挡CancelledError的传播,并不阻挡其他异常。

典型场景:数据库写入操作、日志持久化、资源清理等关键操作适合用shield保护,确保即使客户端断开连接,这些操作也能完成。

八、add_done_callback回调

Task完成时可以自动调用回调函数,这通过 add_done_callback() 方法实现。当任务进入完成状态(包括正常完成、被取消或抛出异常)时,注册的回调函数会被自动调用。

def on_task_done(task): try: result = task.result() print(f"回调收到: {result}") except Exception as e: print(f"回调收到异常: {e}") async def main(): task = asyncio.create_task(worker(2)) task.add_done_callback(on_task_done) # 注册回调 await task

回调的执行特性

提醒:回调在事件循环线程中同步执行,如果回调中有耗时操作,会阻塞事件循环。需要执行耗时操作时,应考虑使用 loop.run_in_executor() 将其移到线程池中执行。

九、综合示例

下面的综合示例展示了Task创建、取消、回调、gather和shield的协同使用:

import asyncio async def safe_write(data): """模拟受保护的写操作""" await asyncio.sleep(2) print(f"已安全写入: {data}") return "写入成功" def log_callback(task): print(f"[日志] 任务完成: {task.result()}") async def main(): # 创建多个任务 write_task = asyncio.create_task(safe_write("日志数据")) calc_task = asyncio.create_task(worker(1)) # 为写操作添加shield保护 shielded_write = asyncio.shield(write_task) shielded_write.add_done_callback(log_callback) # 使用gather并发等待,但写操作受shield保护 results = await asyncio.gather( shielded_write, calc_task, return_exceptions=True ) print("最终结果:", results) asyncio.run(main())