← 返回并发编程目录
← 返回学习笔记首页
专题: Python并发编程系统学习
关键词: Python, 并发编程, concurrent.futures, Future, Executor, 线程池, 进程池
一、concurrent.futures概述
concurrent.futures 是 Python 3.2(PEP 3148)引入的标准库模块,提供统一的高级接口来管理线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)。其核心设计遵循 Future 模式——将任务的提交与执行解耦,调用者通过 Future 对象获取异步结果,无需关心底层调度细节。
该模块极大地简化了并发编程:开发者只需专注于业务逻辑(提交任务、收集结果),而将线程/进程的生命周期管理、任务队列调度交给 Executor 框架自动处理。Python 标准库中原有的 threading 和 multiprocessing 模块更偏底层,而 concurrent.futures 提供了更简洁的抽象层。
二、Executor基类与核心API
Executor 是抽象基类,定义了统一的执行器接口。ThreadPoolExecutor 和 ProcessPoolExecutor 是其两个具体实现:前者使用线程池执行调用,适合 I/O 密集型任务;后者使用进程池,适合 CPU 密集型任务。
class Executor:
def submit (fn, /, *args, **kwargs) -> Future
def map (func, *iterables, timeout=None , chunksize=1 ) -> Iterator
def shutdown (wait=True , cancel_futures=False )
def __enter__ () # 支持 with 语句
def __exit__ (...)
核心方法说明:
submit :提交单个任务,立即返回 Future 对象
map :批量提交,按输入顺序返回结果迭代器
shutdown :清理资源,wait=True 等待所有任务完成;cancel_futures=True 取消未启动任务
with 语句 :自动调用 shutdown(wait=True),推荐用法
重要提示: 始终使用 with 语句管理 Executor 生命周期,避免资源泄漏。with 块退出时自动等待所有任务完成。
三、submit与Future对象
submit 是最常用的方法,它将一个可调用对象提交给执行器并立即返回一个 Future 实例。Future 代表一个异步计算的结果——可能还没有完成,但可以在将来某个时刻获取。
from concurrent.futures import ThreadPoolExecutor
def compute (n):
return n * n
with ThreadPoolExecutor() as exe:
future = exe.submit(compute, 10 )
result = future.result() # 获取结果
print(result)
Future 对象提供以下关键方法:
result(timeout=None) :阻塞直到结果就绪或超时
exception(timeout=None) :返回任务抛出的异常,无异常返回 None
done() :非阻塞,立即返回布尔值表示任务是否完成
running() :返回任务是否正在运行
cancel() :尝试取消(仅未启动的任务可取消)
add_done_callback(fn) :任务完成时自动调用回调函数
设计理念: Future 模式将"提交"与"等待"分离。提交后调用者可以继续做其他事情,只有在需要结果时才调用 result() 阻塞等待。这正是并发编程中"异步非阻塞"的核心思想。
四、map批量提交
map 方法提供简洁的批量任务提交方式,类似内置 map 函数但并行执行。它按输入迭代器的顺序返回结果,保持结果顺序与输入顺序一致。
from concurrent.futures import ThreadPoolExecutor
def square (n):
return n ** 2
with ThreadPoolExecutor(max_workers=4 ) as exe:
results = list (exe.map(square, range (10 )))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
map 参数说明:
func :要执行的可调用对象
iterables :可迭代的输入参数(支持多个可迭代对象)
timeout :可选,设置整体超时时间
chunksize :仅 ProcessPoolExecutor 有效,将迭代器分块减少进程间通信开销
注意: map 返回的迭代器是惰性求值的——遍历时才会阻塞等待对应结果。如果某个任务耗时很长,后续结果即使已就绪也会被阻塞(因为顺序保持)。
五、wait等待策略
concurrent.futures.wait 是一个模块级函数,提供精细化的等待控制。它接收一个 Future 集合和返回条件,阻塞直到满足条件。
from concurrent.futures import (
ThreadPoolExecutor, wait, FIRST_COMPLETED
)
def task (n):
return n * 2
with ThreadPoolExecutor(max_workers=4 ) as exe:
futures = [exe.submit(task, i) for i in range (5 )]
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(f"已完成: {len(done)}" )
print(f"未完成: {len(not_done)}" )
return_when 可选值:
FIRST_COMPLETED :任意一个任务完成即返回
FIRST_EXCEPTION :任意一个任务抛出异常即返回(若无异常则等价于 ALL_COMPLETED)
ALL_COMPLETED :所有任务完成才返回(默认值)
wait 返回一个命名元组 (done, not_done),分别包含已完成和未完成的 Future 集合。
六、as_completed迭代结果
as_completed 是另一个重要的模块级函数,它返回一个迭代器,按任务完成的先后顺序 yield Future 对象。相比 map 的顺序保持,as_completed 让调用者可以立即处理每一个先完成的任务结果。
from concurrent.futures import (
ThreadPoolExecutor, as_completed
)
import time
def slow_task (n):
time.sleep (n)
return f"任务{n}完成"
with ThreadPoolExecutor(max_workers=3 ) as exe:
futures = {exe.submit(slow_task, i): i for i in range (5 , 0 , -1 )}
for future in as_completed (futures):
result = future.result()
print(result)
上例中任务按 5->4->3->2->1 秒的延迟提交,但 as_completed 会按完成顺序输出:任务1、任务2、任务3、任务4、任务5。这在需要逐步展示结果的场景(如爬虫收集)中非常实用。
七、异常与超时处理
并发编程中异常处理至关重要。Future 对象提供了两种获取异常的方式:exception() 和 result() 的超时机制。
from concurrent.futures import ThreadPoolExecutor
import time
def risky_task (x):
if x < 0 :
raise ValueError ("负数不允许" )
return x ** 0.5
with ThreadPoolExecutor() as exe:
future = exe.submit(risky_task, -1 )
# 方式一:检查异常
exc = future.exception()
if exc:
print(f"发生异常: {exc}" )
# 方式二:捕获 result() 抛出的异常
try :
result = future.result()
except ValueError as e:
print(f"捕获异常: {e}" )
# 超时处理
future2 = exe.submit(time.sleep , 10 )
try :
future2.result(timeout=2 ) # 2秒超时
except TimeoutError :
print("任务超时" )
关键要点:
任务内部未捕获的异常会保存在 Future 对象中,调用 result() 时重新抛出
exception() 方法返回异常对象而非抛出,适合条件判断
result(timeout=N) 超时抛出 TimeoutError,需捕获处理
如果不调用 result() 也不调用 exception(),异常会被静默吞掉——可能导致难以调试的 bug
最佳实践: 要么调用 future.result() 捕获所有异常,要么调用 future.exception() 显式检查。永远不要创建 Future 后对其结果不闻不问。
八、cancel取消与回调
cancel() 方法允许取消尚未开始执行的任务。add_done_callback() 则注册一个回调函数,在 Future 完成(正常完成、异常或取消)时自动调用。
from concurrent.futures import ThreadPoolExecutor
import time
def on_done (future):
# 回调接收 Future 自身作为参数
print(f"任务完成: {future.result()}" )
with ThreadPoolExecutor(max_workers=2 ) as exe:
future = exe.submit(time.sleep , 1 )
future.add_done_callback(on_done)
# 尝试取消一个尚未启动的任务
futures = [exe.submit(time.sleep , 5 ) for _ in range (5 )]
cancelled = [f.cancel () for f in futures]
print(f"取消结果: {cancelled}" ) # 对已完成或正在运行的返回 False
cancel 注意事项:
只有处于 "RUNNING" 之前状态的 Future 才能被取消
cancel() 返回 True 表示取消成功,False 表示无法取消(已在运行或已完成)
ProcessPoolExecutor 中由于进程启动有一定开销,取消窗口更窄
add_done_callback 注意事项:
回调函数接收 Future 对象作为唯一参数
回调中应调用 future.result() 或 future.exception() 获取结果
多个回调按添加顺序执行
回调中抛出的异常不会被传播,会被静默忽略
高级技巧: add_done_callback 与 as_completed 本质上是等效的两种方式——回调是"推送"模式,as_completed 是"拉取"模式。在构建响应式系统或事件驱动架构时,回调模式更为自然。