Subagents设计模式总结

子代理设计模式大全

一、设计模式总览

在构建多代理系统时,子代理之间的协作方式直接影响系统的性能、可维护性和可扩展性。以下是五种核心设计模式的横向对比,帮助开发者快速建立全局认知。

五种模式分别对应不同的任务类型和系统架构需求,从集中式控制到去中心化协作,从串行流水线到并行扇出,各有其独特的适用场景和设计权衡。

五种模式核心对比

模式名称 控制方式 通信模式 扩展性 复杂度 适用场景
Master-Worker 集中式(Master控制) 星型(Master-Worker) 中等 明确的父子任务关系
Peer-to-Peer 去中心化 网状(任意通信) 子代理需要直接协作
Pipeline 顺序控制 链式(阶段传递) 有明确处理顺序的工作流
Fan-out/Fan-in 扇出/扇入控制 星型+聚合 很高 中等 大规模独立任务并行处理
Supervisor Tree 层次化控制 树型(父子层级) 很高 超大规模任务的层次化分解
核心洞察:没有一种模式是万能的。实际项目中往往需要根据任务特点组合多种模式,这也是模式选择的核心价值所在。

二、Master-Worker模式详解

Master-Worker模式是Subagents系统中最基础、最常用的设计模式。其核心思想是引入一个中央控制器(Master)负责任务的分发、调度和结果汇总,多个Worker负责具体任务的执行。

架构示意

┌─────────────┐ │ Master │ (负责任务分发、调度、结果汇总) └──────┬──────┘ │ ┌────────┼────────┐ │ │ │ ▼ ▼ ▼ ┌─────┐ ┌─────┐ ┌─────┐ │Worker│ │Worker│ │Worker│ (执行具体子任务) └─────┘ └─────┘ └─────┘
适用场景:明确的父子任务关系,父任务可以拆分为多个独立的子任务,且子任务之间不需要相互通信。

优点

缺点

典型代码结构

# Master节点:负责任务分发与结果汇总 class Master: def __init__(self, workers: list[Worker]): self.workers = workers self.task_queue = Queue() def dispatch(self, tasks: list[Task]): for i, task in enumerate(tasks): worker = self.workers[i % len(self.workers)] worker.assign(task) def collect_results(self) -> list: return [worker.get_result() for worker in self.workers] # Worker节点:执行具体任务 class Worker: def assign(self, task: Task): self.current_task = task def execute(self) -> Result: return self.current_task.process()

三、Peer-to-Peer模式详解

Peer-to-Peer(P2P)模式采用去中心化的设计理念,系统中的每个子代理都是对等的节点,可以直接与其他节点通信和协作。这种模式打破了Master-Worker的集中式控制,赋予了子代理更大的自主性。

架构示意

┌─────────┐ │ Agent │◄─────────►┌─────────┐ │ A │ │ Agent │ └────┬────┘ │ B │ │ └─────────┘ │ │ ▼ │ ┌─────────┐ │ │ Agent │◄──────────────┘ │ C │ └─────────┘
适用场景:子代理之间需要直接协作和协商,任务本身具有较高的动态性和不确定性,不适合中央调度。

优点

缺点

典型代码结构

class PeerAgent: def __init__(self, agent_id: str): self.agent_id = agent_id self.peers = {} # peer_id -> PeerAgent def register_peer(self, peer: 'PeerAgent'): self.peers[peer.agent_id] = peer def send_message(self, to: str, msg: str): if to in self.peers: self.peers[to].receive(self.agent_id, msg) def receive(self, from_id: str, msg: str): # 处理来自其他Peer的消息 print(f"[{self.agent_id}] received from {from_id}: {msg}") def broadcast(self, msg: str): for peer_id in self.peers: self.send_message(peer_id, msg)

四、Pipeline模式详解

Pipeline(流水线)模式将任务的处理过程分解为多个顺序执行的阶段(Stage),每个阶段由特定的子代理负责处理。数据像流水线上的产品一样依次经过每个阶段,每个阶段对数据进行特定的变换或处理。

架构示意

输入数据 │ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Stage 1 │───►│ Stage 2 │───►│ Stage 3 │───►│ Stage 4 │───► 输出 │ 预处理 │ │ 分析 │ │ 转换 │ │ 格式化 │ └──────────┘ └──────────┘ └──────────┘ └──────────┘
适用场景:有明确处理顺序的工作流,每个阶段处理特定的任务,且阶段之间存在清晰的前后依赖关系。

优点

缺点

典型代码结构

class PipelineStage: def __init__(self, name: str): self.name = name self.next_stage = None def set_next(self, stage: 'PipelineStage'): self.next_stage = stage def process(self, data): # 子类实现具体处理逻辑 result = self.execute(data) if self.next_stage: return self.next_stage.process(result) return result def execute(self, data): raise NotImplementedError # 使用示例 stage1 = PreprocessStage("预处理") stage2 = AnalyzeStage("分析") stage3 = FormatStage("格式化") stage1.set_next(stage2) stage2.set_next(stage3) result = stage1.process(input_data)

五、Fan-out/Fan-in模式详解

Fan-out/Fan-in模式是一种并行处理模式,分为两个阶段:Fan-out(扇出)阶段将一个大任务拆分为多个小任务并分发给多个Worker并行处理;Fan-in(扇入)阶段将所有Worker的结果汇集、合并为最终输出。这种模式与Master-Worker有相似之处,但更强调并行执行的彻底性和结果聚合的策略。

架构示意

┌─────────────┐ │ Split │ (Fan-out:任务拆分与分发) └──────┬──────┘ │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌─────┐ ┌─────┐ ┌─────┐ │Worker│ │Worker│ │Worker│ (并行执行) └──┬──┘ └──┬──┘ └──┬──┘ │ │ │ └────────────┼────────────┘ │ ┌─────▼──────┐ │ Merge │ (Fan-in:结果汇集与合并) └────────────┘
适用场景:大规模独立任务并行处理,子任务之间没有依赖关系,可以同时执行以显著缩短整体处理时间。

优点

缺点

典型代码结构

import asyncio async def fan_out_fan_in(tasks: list[Task], workers: list[Worker]): # Fan-out:并发分发所有任务 async with asyncio.TaskGroup() as tg: worker_tasks = [] for i, task in enumerate(tasks): worker = workers[i % len(workers)] worker_tasks.append( tg.create_task(worker.execute(task)) ) # Fan-in:等待所有任务完成并收集结果 results = [] for wt in worker_tasks: try: result = await wt results.append(result) except Exception as e: log_error(e) # 处理失败的Worker # 合并结果 return merge_results(results)

六、Supervisor Tree模式详解

Supervisor Tree(监督者树)模式源自Erlang/OTP的经典设计,通过层次化的监督结构来管理子代理。每个监督者(Supervisor)负责管理一组子代理,当子代理发生故障时,监督者根据预定义的策略(重启、停止、升级等)进行处理。这种模式将错误处理提升为系统架构的一等公民。

架构示意

┌──────────────┐ │ Root Supervisor│ └──────┬───────┘ │ ┌──────────────┼──────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Supervisor│ │ Supervisor│ │ Supervisor│ (中层监督者) │ Level 1 │ │ Level 1 │ │ Level 1 │ └──┬───┬───┘ └──┬───┬───┘ └──┬───┬───┘ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ │W1│ │W2│ │W3│ │W4│ │W5│ │W6│ (工作节点) └──┘ └──┘ └──┘ └──┘ └──┘ └──┘
适用场景:超大规模任务的层次化分解,需要模块化管理和强大的错误隔离能力,对系统可靠性要求高的生产环境。

优点

缺点

典型代码结构

class Supervisor: def __init__(self, strategy: str = "one_for_one"): self.children = {} self.strategy = strategy # 重启策略 self.max_restarts = 3 self.restart_count = 0 def add_child(self, child): self.children[child.name] = child def handle_failure(self, child_name: str): if self.restart_count >= self.max_restarts: self.escalate_failure(child_name) # 上报给父监督者 return child = self.children[child_name] new_child = child.restart() self.children[child_name] = new_child self.restart_count += 1 def escalate_failure(self, child_name: str): # 通知父Supervisor处理 if self.parent: self.parent.handle_failure(self.name)

七、模式选择指南

选择合适的子代理设计模式是构建高效多代理系统的关键决策。下面提供一个结构化的决策框架,帮助开发者根据任务特征做出合理选择。在实际项目中,复杂任务往往需要组合多种模式才能达到最佳效果。

决策树

Q1: 子任务之间是否需要相互通信/协作?
├─ 需要Peer-to-PeerSupervisor Tree(规模大时)
└─ 不需要 → 进入 Q2
Q2: 任务是否有明确的处理顺序/阶段?
├─ Pipeline
└─ 没有 → 进入 Q3
Q3: 子任务是否可以并行执行?
├─ 可以 → 进入 Q4
└─ 不可以Master-Worker
Q4: 任务规模是否很大(100+子任务)?
├─ Fan-out/Fan-inSupervisor Tree
└─ Master-Worker

组合模式建议

在实际工程项目中,很少有单一模式能完美解决所有问题。以下是一些经过实践验证的模式组合方案:

Master-Worker + Pipeline
Master负责任务的宏观调度,每个阶段内部采用Pipeline流水线处理。适合既有结构又需要并行加速的场景。
Fan-out/Fan-in + Supervisor Tree
使用Fan-out/Fan-in进行大规模并行处理,外层用Supervisor Tree管理Worker的生命周期和容错。兼顾性能与可靠性。
Master-Worker + Peer-to-Peer
Master负责任务调度,但在Worker需要协作的子任务中使用P2P局部网络。平衡了集中控制和灵活性。
Supervisor Tree + Pipeline
每个监督子树内部是一个完整的Pipeline,通过树型结构组织多条流水线。适合超大规模的数据处理系统。

按任务类型快速选择

任务类型 推荐模式 原因
数据爬取/采集 Fan-out/Fan-in 大量独立URL,天然可并行
文档处理流水线 Pipeline 解析→分析→生成,阶段清晰
多模型投票/协商 Peer-to-Peer Agent之间需要讨论和共识
批量数据分析 Master-Worker 任务明确,Worker之间无依赖
企业级微服务编排 Supervisor Tree 需要层次化管理和高可用保障

核心要点总结:

1. 没有银弹——根据具体任务特征选择最合适的模式。

2. 复杂任务优先考虑模式组合,而非单一模式。

3. 从简单开始(如Master-Worker),随着系统演进逐步引入更复杂的模式。

4. 始终考虑容错和错误处理,生产环境推荐Supervisor Tree或其变体。

5. 记录和监控是分布式系统的生命线,无论选择哪种模式都要保证可观测性。