concurrent.futures:Future模式与Executor框架

Python并发编程专题 · 统一线程池与进程池的高级接口

专题:Python并发编程系统学习

关键词:Python, 并发编程, concurrent.futures, Future, Executor, 线程池, 进程池

一、concurrent.futures概述

concurrent.futures 是 Python 3.2(PEP 3148)引入的标准库模块,提供统一的高级接口来管理线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)。其核心设计遵循 Future 模式——将任务的提交与执行解耦,调用者通过 Future 对象获取异步结果,无需关心底层调度细节。

该模块极大地简化了并发编程:开发者只需专注于业务逻辑(提交任务、收集结果),而将线程/进程的生命周期管理、任务队列调度交给 Executor 框架自动处理。Python 标准库中原有的 threading 和 multiprocessing 模块更偏底层,而 concurrent.futures 提供了更简洁的抽象层。

二、Executor基类与核心API

Executor 是抽象基类,定义了统一的执行器接口。ThreadPoolExecutor 和 ProcessPoolExecutor 是其两个具体实现:前者使用线程池执行调用,适合 I/O 密集型任务;后者使用进程池,适合 CPU 密集型任务。

class Executor: def submit(fn, /, *args, **kwargs) -> Future def map(func, *iterables, timeout=None, chunksize=1) -> Iterator def shutdown(wait=True, cancel_futures=False) def __enter__() # 支持 with 语句 def __exit__(...)

核心方法说明:

重要提示:始终使用 with 语句管理 Executor 生命周期,避免资源泄漏。with 块退出时自动等待所有任务完成。

三、submit与Future对象

submit 是最常用的方法,它将一个可调用对象提交给执行器并立即返回一个 Future 实例。Future 代表一个异步计算的结果——可能还没有完成,但可以在将来某个时刻获取。

from concurrent.futures import ThreadPoolExecutor def compute(n): return n * n with ThreadPoolExecutor() as exe: future = exe.submit(compute, 10) result = future.result() # 获取结果 print(result)

Future 对象提供以下关键方法:

设计理念:Future 模式将"提交"与"等待"分离。提交后调用者可以继续做其他事情,只有在需要结果时才调用 result() 阻塞等待。这正是并发编程中"异步非阻塞"的核心思想。

四、map批量提交

map 方法提供简洁的批量任务提交方式,类似内置 map 函数但并行执行。它按输入迭代器的顺序返回结果,保持结果顺序与输入顺序一致。

from concurrent.futures import ThreadPoolExecutor def square(n): return n ** 2 with ThreadPoolExecutor(max_workers=4) as exe: results = list(exe.map(square, range(10))) print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

map 参数说明:

注意:map 返回的迭代器是惰性求值的——遍历时才会阻塞等待对应结果。如果某个任务耗时很长,后续结果即使已就绪也会被阻塞(因为顺序保持)。

五、wait等待策略

concurrent.futures.wait 是一个模块级函数,提供精细化的等待控制。它接收一个 Future 集合和返回条件,阻塞直到满足条件。

from concurrent.futures import ( ThreadPoolExecutor, wait, FIRST_COMPLETED ) def task(n): return n * 2 with ThreadPoolExecutor(max_workers=4) as exe: futures = [exe.submit(task, i) for i in range(5)] done, not_done = wait(futures, return_when=FIRST_COMPLETED) print(f"已完成: {len(done)}") print(f"未完成: {len(not_done)}")

return_when 可选值:

wait 返回一个命名元组 (done, not_done),分别包含已完成和未完成的 Future 集合。

六、as_completed迭代结果

as_completed 是另一个重要的模块级函数,它返回一个迭代器,按任务完成的先后顺序 yield Future 对象。相比 map 的顺序保持,as_completed 让调用者可以立即处理每一个先完成的任务结果。

from concurrent.futures import ( ThreadPoolExecutor, as_completed ) import time def slow_task(n): time.sleep(n) return f"任务{n}完成" with ThreadPoolExecutor(max_workers=3) as exe: futures = {exe.submit(slow_task, i): i for i in range(5, 0, -1)} for future in as_completed(futures): result = future.result() print(result)

上例中任务按 5->4->3->2->1 秒的延迟提交,但 as_completed 会按完成顺序输出:任务1、任务2、任务3、任务4、任务5。这在需要逐步展示结果的场景(如爬虫收集)中非常实用。

七、异常与超时处理

并发编程中异常处理至关重要。Future 对象提供了两种获取异常的方式:exception() 和 result() 的超时机制。

from concurrent.futures import ThreadPoolExecutor import time def risky_task(x): if x < 0: raise ValueError("负数不允许") return x ** 0.5 with ThreadPoolExecutor() as exe: future = exe.submit(risky_task, -1) # 方式一:检查异常 exc = future.exception() if exc: print(f"发生异常: {exc}") # 方式二:捕获 result() 抛出的异常 try: result = future.result() except ValueError as e: print(f"捕获异常: {e}") # 超时处理 future2 = exe.submit(time.sleep, 10) try: future2.result(timeout=2) # 2秒超时 except TimeoutError: print("任务超时")

关键要点:

最佳实践:要么调用 future.result() 捕获所有异常,要么调用 future.exception() 显式检查。永远不要创建 Future 后对其结果不闻不问。

八、cancel取消与回调

cancel() 方法允许取消尚未开始执行的任务。add_done_callback() 则注册一个回调函数,在 Future 完成(正常完成、异常或取消)时自动调用。

from concurrent.futures import ThreadPoolExecutor import time def on_done(future): # 回调接收 Future 自身作为参数 print(f"任务完成: {future.result()}") with ThreadPoolExecutor(max_workers=2) as exe: future = exe.submit(time.sleep, 1) future.add_done_callback(on_done) # 尝试取消一个尚未启动的任务 futures = [exe.submit(time.sleep, 5) for _ in range(5)] cancelled = [f.cancel() for f in futures] print(f"取消结果: {cancelled}") # 对已完成或正在运行的返回 False

cancel 注意事项:

add_done_callback 注意事项:

高级技巧:add_done_callback 与 as_completed 本质上是等效的两种方式——回调是"推送"模式,as_completed 是"拉取"模式。在构建响应式系统或事件驱动架构时,回调模式更为自然。