一、设计模式总览
在构建多代理系统时,子代理之间的协作方式直接影响系统的性能、可维护性和可扩展性。以下是五种核心设计模式的横向对比,帮助开发者快速建立全局认知。
五种模式分别对应不同的任务类型和系统架构需求,从集中式控制到去中心化协作,从串行流水线到并行扇出,各有其独特的适用场景和设计权衡。
五种模式核心对比
| 模式名称 |
控制方式 |
通信模式 |
扩展性 |
复杂度 |
适用场景 |
| Master-Worker |
集中式(Master控制) |
星型(Master-Worker) |
中等 |
低 |
明确的父子任务关系 |
| Peer-to-Peer |
去中心化 |
网状(任意通信) |
高 |
高 |
子代理需要直接协作 |
| Pipeline |
顺序控制 |
链式(阶段传递) |
低 |
低 |
有明确处理顺序的工作流 |
| Fan-out/Fan-in |
扇出/扇入控制 |
星型+聚合 |
很高 |
中等 |
大规模独立任务并行处理 |
| Supervisor Tree |
层次化控制 |
树型(父子层级) |
很高 |
高 |
超大规模任务的层次化分解 |
核心洞察:没有一种模式是万能的。实际项目中往往需要根据任务特点组合多种模式,这也是模式选择的核心价值所在。
二、Master-Worker模式详解
Master-Worker模式是Subagents系统中最基础、最常用的设计模式。其核心思想是引入一个中央控制器(Master)负责任务的分发、调度和结果汇总,多个Worker负责具体任务的执行。
架构示意
┌─────────────┐
│ Master │ (负责任务分发、调度、结果汇总)
└──────┬──────┘
│
┌────────┼────────┐
│ │ │
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐
│Worker│ │Worker│ │Worker│ (执行具体子任务)
└─────┘ └─────┘ └─────┘
适用场景:明确的父子任务关系,父任务可以拆分为多个独立的子任务,且子任务之间不需要相互通信。
优点
- 控制集中:Master集中管理所有Worker,任务分发和状态监控非常直观。所有决策在单一节点完成,易于实现一致性保证。
- 任务分配简单:不需要复杂的发现机制或协调协议,Worker启动后向Master注册即可获取任务。
- 容错处理相对容易:当某个Worker失败时,Master可以将失败任务重新分配给其他Worker,或在无可用Worker时优雅降级。
- 易于监控和调试:所有通信经过Master,可以在Master层记录完整的任务执行日志和执行链路。
缺点
- Master单点瓶颈:当Worker数量增加时,Master成为通信和调度的瓶颈。所有请求必须经过Master,导致系统的最大吞吐量受限于Master的处理能力。
- 单点故障风险:如果Master崩溃,整个系统随之瘫痪。需要额外的容错机制(如Master主备切换)来保证高可用性。
- Worker间协作困难:Worker之间不能直接通信,如果需要协作必须经过Master中转,增加了通信延迟和Master负载。
典型代码结构
# Master节点:负责任务分发与结果汇总
class Master:
def __init__(self, workers: list[Worker]):
self.workers = workers
self.task_queue = Queue()
def dispatch(self, tasks: list[Task]):
for i, task in enumerate(tasks):
worker = self.workers[i % len(self.workers)]
worker.assign(task)
def collect_results(self) -> list:
return [worker.get_result() for worker in self.workers]
# Worker节点:执行具体任务
class Worker:
def assign(self, task: Task):
self.current_task = task
def execute(self) -> Result:
return self.current_task.process()
三、Peer-to-Peer模式详解
Peer-to-Peer(P2P)模式采用去中心化的设计理念,系统中的每个子代理都是对等的节点,可以直接与其他节点通信和协作。这种模式打破了Master-Worker的集中式控制,赋予了子代理更大的自主性。
架构示意
┌─────────┐
│ Agent │◄─────────►┌─────────┐
│ A │ │ Agent │
└────┬────┘ │ B │
│ └─────────┘
│ │
▼ │
┌─────────┐ │
│ Agent │◄──────────────┘
│ C │
└─────────┘
适用场景:子代理之间需要直接协作和协商,任务本身具有较高的动态性和不确定性,不适合中央调度。
优点
- 去中心化:不存在单点故障和性能瓶颈。每个Agent独立运行,系统的稳健性和可用性显著提高。
- 高度灵活:Agent可以动态加入或离开系统而不影响整体运行,适合高度动态和不确定的任务环境。
- 自然支持协作:Agent之间可以直接通信和协商,非常适合需要多方讨论、投票或达成共识的场景。
- 扩展性好:新增节点不需要修改中央调度逻辑,只需让新节点向其他节点宣告自己的存在即可。
缺点
- 复杂性高:没有中央控制器,协调机制需要分布式协议(如共识算法、选举机制)来保证系统一致性,实现成本高。
- 协调困难:当多个Agent同时访问共享资源或产生冲突决策时,解决冲突需要额外的协议开销。
- 调试困难:分布式系统的异步通信使得问题排查变得复杂,缺乏单一的日志汇聚点和全局状态视图。
- 通信开销大:Agent之间频繁的直接通信可能产生大量的网络流量,在节点数量增多时呈指数级增长。
典型代码结构
class PeerAgent:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.peers = {} # peer_id -> PeerAgent
def register_peer(self, peer: 'PeerAgent'):
self.peers[peer.agent_id] = peer
def send_message(self, to: str, msg: str):
if to in self.peers:
self.peers[to].receive(self.agent_id, msg)
def receive(self, from_id: str, msg: str):
# 处理来自其他Peer的消息
print(f"[{self.agent_id}] received from {from_id}: {msg}")
def broadcast(self, msg: str):
for peer_id in self.peers:
self.send_message(peer_id, msg)
四、Pipeline模式详解
Pipeline(流水线)模式将任务的处理过程分解为多个顺序执行的阶段(Stage),每个阶段由特定的子代理负责处理。数据像流水线上的产品一样依次经过每个阶段,每个阶段对数据进行特定的变换或处理。
架构示意
输入数据
│
▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Stage 1 │───►│ Stage 2 │───►│ Stage 3 │───►│ Stage 4 │───► 输出
│ 预处理 │ │ 分析 │ │ 转换 │ │ 格式化 │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
适用场景:有明确处理顺序的工作流,每个阶段处理特定的任务,且阶段之间存在清晰的前后依赖关系。
优点
- 阶段清晰:每个阶段的职责单一且明确,符合单一职责原则。代码高内聚低耦合,便于独立开发和测试各个阶段。
- 易于理解:处理流程的可见性高,数据流从头到尾一目了然。新成员可以快速理解系统的整体处理逻辑。
- 可插拔:每个阶段可以独立替换、升级或复用,不影响其他阶段。例如可以替换某个阶段而不改变上下游的接口。
- 天然支持并行:不同阶段可以并行处理不同的数据项(类似CPU指令流水线),提高整体吞吐量。
缺点
- 串行瓶颈:整体速度受限于最慢的阶段。如果某个阶段处理耗时显著高于其他阶段,会成为整个流水线的瓶颈。
- 阶段间耦合:每个阶段的输出格式必须严格符合下一阶段的输入要求,接口变更可能引发链式修改。
- 错误传播:前序阶段的错误会沿着流水线向后传播,后期阶段需要处理各种异常输入,增加了错误处理的复杂度。
典型代码结构
class PipelineStage:
def __init__(self, name: str):
self.name = name
self.next_stage = None
def set_next(self, stage: 'PipelineStage'):
self.next_stage = stage
def process(self, data):
# 子类实现具体处理逻辑
result = self.execute(data)
if self.next_stage:
return self.next_stage.process(result)
return result
def execute(self, data):
raise NotImplementedError
# 使用示例
stage1 = PreprocessStage("预处理")
stage2 = AnalyzeStage("分析")
stage3 = FormatStage("格式化")
stage1.set_next(stage2)
stage2.set_next(stage3)
result = stage1.process(input_data)
五、Fan-out/Fan-in模式详解
Fan-out/Fan-in模式是一种并行处理模式,分为两个阶段:Fan-out(扇出)阶段将一个大任务拆分为多个小任务并分发给多个Worker并行处理;Fan-in(扇入)阶段将所有Worker的结果汇集、合并为最终输出。这种模式与Master-Worker有相似之处,但更强调并行执行的彻底性和结果聚合的策略。
架构示意
┌─────────────┐
│ Split │ (Fan-out:任务拆分与分发)
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐
│Worker│ │Worker│ │Worker│ (并行执行)
└──┬──┘ └──┬──┘ └──┬──┘
│ │ │
└────────────┼────────────┘
│
┌─────▼──────┐
│ Merge │ (Fan-in:结果汇集与合并)
└────────────┘
适用场景:大规模独立任务并行处理,子任务之间没有依赖关系,可以同时执行以显著缩短整体处理时间。
优点
- 高并行度:理论上可以同时启动任意数量的Worker同时处理,充分利用计算资源。处理大任务的时间可以大幅缩短。
- 显著加速:对于可并行化的任务,加速比接近Worker数量(Amdahl定律限制下),比串行处理快数倍甚至数十倍。
- 弹性伸缩:可以根据任务量动态调整Worker数量,在负载高时横向扩展,负载低时释放资源。
- 部分容错:个别Worker失败不影响其他Worker的执行,只需要在Fan-in阶段处理失败的结果即可。
缺点
- 结果汇集复杂:当Worker数量增加时,结果汇集和合并的逻辑变得复杂。需要处理部分失败、超时、结果顺序错乱等问题。
- 资源消耗大:大量并发的Worker可能消耗大量的内存和计算资源,需要合理的资源限制和调度策略。
- 负载均衡挑战:如果任务拆分不均匀,部分Worker可能提前完成而闲置,造成资源浪费和整体效率下降。
典型代码结构
import asyncio
async def fan_out_fan_in(tasks: list[Task], workers: list[Worker]):
# Fan-out:并发分发所有任务
async with asyncio.TaskGroup() as tg:
worker_tasks = []
for i, task in enumerate(tasks):
worker = workers[i % len(workers)]
worker_tasks.append(
tg.create_task(worker.execute(task))
)
# Fan-in:等待所有任务完成并收集结果
results = []
for wt in worker_tasks:
try:
result = await wt
results.append(result)
except Exception as e:
log_error(e) # 处理失败的Worker
# 合并结果
return merge_results(results)
六、Supervisor Tree模式详解
Supervisor Tree(监督者树)模式源自Erlang/OTP的经典设计,通过层次化的监督结构来管理子代理。每个监督者(Supervisor)负责管理一组子代理,当子代理发生故障时,监督者根据预定义的策略(重启、停止、升级等)进行处理。这种模式将错误处理提升为系统架构的一等公民。
架构示意
┌──────────────┐
│ Root Supervisor│
└──────┬───────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Supervisor│ │ Supervisor│ │ Supervisor│ (中层监督者)
│ Level 1 │ │ Level 1 │ │ Level 1 │
└──┬───┬───┘ └──┬───┬───┘ └──┬───┬───┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐
│W1│ │W2│ │W3│ │W4│ │W5│ │W6│ (工作节点)
└──┘ └──┘ └──┘ └──┘ └──┘ └──┘
适用场景:超大规模任务的层次化分解,需要模块化管理和强大的错误隔离能力,对系统可靠性要求高的生产环境。
优点
- 模块化:每个子树可以独立开发、测试、部署。系统由多个自治的模块组成,降低了整体复杂性。
- 错误隔离:子代理的崩溃不会扩散到其他子树。错误被限制在局部范围内,由上层监督者处理。
- 高度可扩展:可以通过添加子树来扩展系统,而不影响现有结构。理论上可以管理任意规模的子代理集群。
- 优雅的容错策略:支持多种容错策略:单次重启、多次重启、完全停止、升级等,可以根据业务需求灵活配置。
缺点
- 层级通信开销:跨层级的通信需要经过中间节点转发,增加了延迟。深层嵌套的树结构可能导致显著的通信开销。
- 配置复杂:需要为每个监督者配置监督策略、子代理规格、重启限制等,系统启动和运维的复杂度较高。
- 调试困难:层次化的结构使得问题排查需要逐层追踪,错误可能发生在树中的任何位置。
典型代码结构
class Supervisor:
def __init__(self, strategy: str = "one_for_one"):
self.children = {}
self.strategy = strategy # 重启策略
self.max_restarts = 3
self.restart_count = 0
def add_child(self, child):
self.children[child.name] = child
def handle_failure(self, child_name: str):
if self.restart_count >= self.max_restarts:
self.escalate_failure(child_name) # 上报给父监督者
return
child = self.children[child_name]
new_child = child.restart()
self.children[child_name] = new_child
self.restart_count += 1
def escalate_failure(self, child_name: str):
# 通知父Supervisor处理
if self.parent:
self.parent.handle_failure(self.name)
七、模式选择指南
选择合适的子代理设计模式是构建高效多代理系统的关键决策。下面提供一个结构化的决策框架,帮助开发者根据任务特征做出合理选择。在实际项目中,复杂任务往往需要组合多种模式才能达到最佳效果。
决策树
Q1: 子任务之间是否需要相互通信/协作?
├─ 需要 → Peer-to-Peer 或 Supervisor Tree(规模大时)
└─ 不需要 → 进入 Q2
Q2: 任务是否有明确的处理顺序/阶段?
├─ 有 → Pipeline
└─ 没有 → 进入 Q3
Q3: 子任务是否可以并行执行?
├─ 可以 → 进入 Q4
└─ 不可以 → Master-Worker
Q4: 任务规模是否很大(100+子任务)?
├─ 大 → Fan-out/Fan-in 或 Supervisor Tree
└─ 小 → Master-Worker
组合模式建议
在实际工程项目中,很少有单一模式能完美解决所有问题。以下是一些经过实践验证的模式组合方案:
Master-Worker + Pipeline
Master负责任务的宏观调度,每个阶段内部采用Pipeline流水线处理。适合既有结构又需要并行加速的场景。
Fan-out/Fan-in + Supervisor Tree
使用Fan-out/Fan-in进行大规模并行处理,外层用Supervisor Tree管理Worker的生命周期和容错。兼顾性能与可靠性。
Master-Worker + Peer-to-Peer
Master负责任务调度,但在Worker需要协作的子任务中使用P2P局部网络。平衡了集中控制和灵活性。
Supervisor Tree + Pipeline
每个监督子树内部是一个完整的Pipeline,通过树型结构组织多条流水线。适合超大规模的数据处理系统。
按任务类型快速选择
| 任务类型 |
推荐模式 |
原因 |
| 数据爬取/采集 |
Fan-out/Fan-in |
大量独立URL,天然可并行 |
| 文档处理流水线 |
Pipeline |
解析→分析→生成,阶段清晰 |
| 多模型投票/协商 |
Peer-to-Peer |
Agent之间需要讨论和共识 |
| 批量数据分析 |
Master-Worker |
任务明确,Worker之间无依赖 |
| 企业级微服务编排 |
Supervisor Tree |
需要层次化管理和高可用保障 |
核心要点总结:
1. 没有银弹——根据具体任务特征选择最合适的模式。
2. 复杂任务优先考虑模式组合,而非单一模式。
3. 从简单开始(如Master-Worker),随着系统演进逐步引入更复杂的模式。
4. 始终考虑容错和错误处理,生产环境推荐Supervisor Tree或其变体。
5. 记录和监控是分布式系统的生命线,无论选择哪种模式都要保证可观测性。