扇出/扇入模式(Fan-out/Fan-in)

扇出扇入并行处理模式

一、扇出/扇入模式概述

扇出/扇入模式(Fan-out/Fan-in)是一种经典的并行计算设计模式,在子代理架构中具有广泛的应用价值。该模式的核心思想是将一个大型任务分解为多个可以独立执行的子任务(Fan-out),然后将这些子任务的结果汇集整合为最终输出(Fan-in),从而实现高效的并行处理。

Fan-out(扇出):指将一个主任务拆分为多个并行执行的子任务,并分配给不同的子代理同时处理。这是一种一对多的通信模式,主代理将任务分发出去后,各子代理在各自独立的上下文中并行工作,互不干扰。

Fan-in(扇入):指将多个子代理执行完毕后返回的结果进行汇集和整合,形成统一的最终输出。这是一种多对一的通信模式,主代理等待所有子任务完成后,收集结果并进行合并、排序、投票或加权等处理。

扇出/扇入模式本质上是一种 M:1 或 多:1 的通信模式,它特别适合大规模并行处理场景。与简单的串行处理相比,扇出/扇入能够充分利用多代理系统的计算资源,大幅缩短整体任务的处理时间。在子代理架构中,这种模式是实现水平扩展和并行加速的基础。

核心价值:扇出/扇入模式的最大优势在于通过并行化减少总处理时间。如果串行执行需要 T 时间,扇出到 N 个子代理并行执行,理想情况下 Fan-out 阶段只需 T/N 时间,再加上 Fan-in 汇集的少量开销。这在大规模数据处理、代码生成、内容分析等场景中效果尤为显著。

适用场景

设计原则:扇出/扇入模式的核心设计原则是"子任务之间无依赖"。只有当子任务可以完全独立并行执行时,扇出/扇入才能达到最优效果。如果子任务之间存在数据依赖,则需要考虑使用流水线模式或其他串行化的处理方式。

二、Fan-out 扇出策略

Fan-out 扇出的核心问题在于如何将主任务合理地拆分为多个可并行执行的子任务。不同的拆分策略适用于不同的场景,选择合适的扇出策略直接影响并行处理的效率和最终结果的质量。以下是几种常见的扇出策略:

按数据分割
将大文件或大数据集按逻辑边界拆分为多个数据块,每个子代理处理一个数据块。适合文本分析、日志处理、数据转换等场景。
按功能分割
根据功能模块进行拆分,每个子代理负责一个独立的功能单元。适合代码生成、系统设计、模块开发等场景。
按维度分割
从不同维度或角度对同一任务进行分析,每个子代理负责一个分析维度。适合多维度评估、全方位审查、多角度分析等场景。
按策略分割
采用不同的处理策略或算法执行同一任务,由各子代理独立尝试。适合需要探索多种方案、对比不同方法优劣的场景。

按数据分割

按数据分割是最常见也最直观的扇出策略。它将输入数据按照某种规则切分为若干数据块,每个子代理处理一个数据块。这种策略适用于大文件处理、批量数据转换、日志分析等场景。数据分割的关键在于找到合适的分割点,确保每个数据块的大小相对均衡,并且数据块的边界不会破坏数据的完整性。

例如,在处理一个大型文本文件时,可以按章节、段落或固定行数进行分割。每个子代理读取自己负责的数据块,进行独立的分析和处理。最后在 Fan-in 阶段将所有子代理的处理结果按原始顺序拼接起来。

按功能分割

按功能分割是将一个完整的系统或产品按功能模块进行拆分,每个子代理负责一个独立的功能单元。这种策略特别适合代码生成、系统架构设计等场景。每个子代理可以专注于自己负责的功能模块,生成高质量的代码或设计文档。

例如,在生成一个 Web 应用的代码时,可以将任务拆分为前端页面、后端 API、数据库模型、配置部署等子任务,分别分配给不同的子代理。每个子代理在自己的专业领域内工作,最终在 Fan-in 阶段将各模块集成在一起。

按维度分割

按维度分割是对同一事物从不同视角或维度进行分析。每个子代理采用相同的数据源,但从不同的分析维度出发,得出各自的结论。这种策略适合需要全方位评估的场景。

例如,在评估一段代码质量时,可以同时派发多个维度的审查任务:代码安全性审查、性能优化审查、可维护性审查、代码规范审查等。每个子代理从各自专业角度给出评估意见,最后通过 Fan-in 汇集为一份综合报告。

扇出的深度和宽度控制

扇出的深度(递归层级)和宽度(并行子代理数量)需要根据实际情况进行合理控制。扇出宽度越大,并行度越高,但同时也带来更大的管理开销和资源消耗。扇出深度越深,任务分解越细,但协调复杂度也随之增加。

在实际应用中,扇出宽度的选择需要综合考虑以下因素:可用子代理数量、任务的粒度、系统资源限制、预期的加速比。通常建议扇出宽度控制在 3-10 个子代理之间,既能获得明显的加速效果,又不会引入过度的管理开销。对于有嵌套需求的任务,扇出深度一般不超过 2-3 层,以避免过度复杂的任务依赖关系。

关键要点:扇出策略的选择决定了并行处理的效率和效果。按数据分割适合数据密集型任务,按功能分割适合系统集成型任务,按维度分割适合分析评估型任务。在实际应用中,也可以混合使用多种扇出策略以达到最佳效果。

三、Fan-in 汇集策略

Fan-in 汇集阶段负责将多个子代理的输出结果整合为统一的最终结果。不同的汇集策略适用于不同的场景,选择合适的汇集策略直接影响输出结果的质量和可用性。以下是四种常见的 Fan-in 汇集策略:

简单合并(Concatenation)

简单合并是最直接的汇集策略,即将多个子代理的输出结果按照预定义的顺序直接拼接在一起。这种策略适用于子任务之间没有重叠且结果可以线性组合的场景。

典型应用:并行代码生成后,将各子代理生成的模块代码按照项目结构合并为一个完整的代码库;大文件分块处理后,将各块的处理结果按原始顺序拼接为完整输出。简单合并的优点是实现简单、开销小,但要求子任务之间必须有清晰的分界线且结果之间没有冲突。

// 简单合并的伪代码 function fanInSimpleMerge(results) { // 按子任务编号排序 results.sort((a, b) => a.taskId - b.taskId); // 直接拼接所有结果 return results.map(r => r.data).join(''); }

投票决策(Voting)

投票决策策略适用于需要从多个候选结果中选出最优方案的场景。多个子代理对同一任务以相同或不同的方式进行处理,各自得到一份输出。Fan-in 阶段对多份输出进行比较和投票,选择得票最高的结果作为最终输出。

典型应用:代码审查场景中,多个子代理独立审查同一段代码,各自提出修改建议。Fan-in 阶段统计各条建议的出现频率,只有被多数子代理认可的建议才被采纳。投票机制可以有效过滤掉个别子代理的偏差或错误,提高最终结果的可靠性。

// 投票决策的伪代码 function fanInVoting(results, threshold = 0.5) { // 统计每个建议的支持票数 const voteCount = {}; for (const result of results) { for (const suggestion of result.suggestions) { voteCount[suggestion.id] = (voteCount[suggestion.id] || 0) + 1; } } // 只采纳超过阈值的建议 const minVotes = Math.ceil(results.length * threshold); return Object.entries(voteCount) .filter(([, count]) => count >= minVotes) .map(([id]) => id); }

加权汇总(Weighted Aggregation)

加权汇总策略为不同子代理分配不同的权重,最终的汇集结果按照权重组合各子代理的输出。这种策略适用于各子代理的专业能力或可信度不同的场景。

典型应用:在评估打分场景中,主代理可以根据各子代理的历史准确率、专业领域匹配度等因素为其分配权重。某个子代理在特定领域的专业度越高,其评估结果的权重就越高。最终的综合评分是各子代理评分的加权平均。

// 加权汇总的伪代码 function fanInWeighted(results, weights) { let totalScore = 0; let totalWeight = 0; for (const result of results) { const weight = weights[result.agentId] || 1.0; totalScore += result.score * weight; totalWeight += weight; } // 返回加权平均分 return totalScore / totalWeight; }

冲突解决(Conflict Resolution)

冲突解决策略处理的是多个子代理输出结果存在矛盾或冲突的情况。当子代理各自得出相互矛盾的结论时,Fan-in 阶段需要有一套机制来解决这些冲突,生成一致的最终结果。

典型应用:在数据清洗场景中,多个子代理对同一数据字段给出不同的清洗建议。Fan-in 阶段需要根据预先定义的规则(如信任度优先、多数原则、最新原则等)决定采纳哪个子代理的建议。对于无法自动解决的冲突,可以将冲突信息标记出来,交由人工裁决。

策略选择指南:简单合并适用于结果可线性拼接的场景;投票决策适用于需要质量过滤的场景;加权汇总适用于子代理能力不均的场景;冲突解决适用于结果可能相互矛盾的场景。实践中可以组合使用多种汇集策略,例如先用投票过滤低质量结果,再用加权汇总计算最终输出。

四、扇出扇入的实现

本节详细描述在子代理架构中如何具体实现扇出/扇入模式。实现分为四个核心阶段:主代理创建和分发子任务、各子代理并行执行、子代理提交结果、主代理汇集整合。

阶段一:Master 创建 Worker

主代理(Master)根据扇出策略将主任务拆分为多个子任务,并为每个子任务创建一个子代理(Worker)实例。每个子代理被赋予独立的执行上下文,包括:任务 ID、任务描述、输入数据、系统提示词(System Prompt)以及独立的内存空间。

// Master 创建 Worker 的伪代码 async function fanOut(mainTask, fanOutStrategy) { // 根据扇出策略生成子任务列表 const subTasks = decomposeTask(mainTask, fanOutStrategy); const workers = []; for (const subTask of subTasks) { // 为每个子任务创建一个 Worker const worker = createSubAgent({ taskId: subTask.id, description: subTask.description, input: subTask.data, systemPrompt: buildPrompt(subTask), context: { independent: true } }); workers.push(worker); } // 启动所有 Worker 并行执行 const results = await Promise.allSettled( workers.map(worker => worker.execute()) ); // 进入 Fan-in 阶段 return fanIn(results, mainTask.fanInStrategy); }

阶段二:Worker 独立执行

每个 Worker 在完全独立的上下文中执行自己的子任务。Worker 之间不通信、不共享状态,从而避免了竞态条件和同步问题。这种隔离设计是扇出/扇入模式能够安全并行的基础。每个 Worker 拥有独立的 Token 预算和执行时间配额,互不影响。

阶段三:Worker 提交结果

当某个 Worker 完成执行后,它会更新自己的任务状态为"已完成",并将执行结果提交给主代理。任务状态通常包括:pending(待执行)、running(执行中)、completed(已完成)、failed(失败)、timeout(超时)。主代理会持续跟踪所有 Worker 的状态变化。

阶段四:Master 执行 Fan-in

主代理等待所有 Worker 完成(或达到超时限制)后,根据预定义的汇集策略对所有结果进行整合。这是扇出/扇入模式的最后一步,也是最关键的一步——将分散的并行结果转化为统一的、有价值的最终输出。

// Fan-in 汇集的完整实现 async function fanIn(results, strategy) { // 分离成功和失败的结果 const succeeded = results.filter(r => r.status === 'fulfilled'); const failed = results.filter(r => r.status === 'rejected'); // 根据策略选择不同的汇集方式 switch (strategy.type) { case 'merge': return mergeResults(succeeded.map(r => r.value)); case 'voting': return voteOnResults(succeeded.map(r => r.value), strategy.threshold); case 'weighted': return weightedAggregate(succeeded.map(r => r.value), strategy.weights); case 'conflict-resolve': return resolveConflicts(succeeded.map(r => r.value), strategy.rules); default: throw new Error(`Unknown Fan-in strategy: ${strategy.type}`); } }

关键要点:扇出扇入的实现核心在于四个阶段的有序衔接。Master 要确保扇出时子任务互不依赖,Worker 要保证独立执行的隔离性,结果提交通常采用异步回调或状态轮询,Fan-in 阶段的汇集策略需要根据任务特点预先配置。使用 Promise.allSettled 而非 Promise.all 可以确保部分 Worker 失败时不影响其他结果的汇集。

五、扇出扇入的异常处理

在实际运行中,扇出/扇入模式不可避免地会遇到各种异常情况。一个健壮的扇出/扇入实现必须包含完善的异常处理机制,确保在部分子代理失败的情况下系统仍能产生可用的结果。

部分 Worker 失败时的处理

当部分 Worker 执行失败时,系统不应直接放弃整个任务,而是应该采取以下策略:

// 带重试机制的 Worker 执行 async function executeWithRetry(worker, maxRetries = 3) { for (let attempt = 1; attempt <= maxRetries; attempt++) { try { return await worker.execute(); } catch (error) { // 致命错误不重试 if (error.isFatal) throw error; console.log(`Worker ${worker.taskId} 第 ${attempt} 次重试`); // 退避等待:指数递增 await sleep(Math.pow(2, attempt) * 1000); } } throw new Error(`Worker ${worker.taskId} 重试 ${maxRetries} 次后仍失败`); }

部分结果可用时的处理

当无法获得全部子任务的结果时,系统应充分利用已成功完成的部分结果。这种"尽最大努力"的策略在许多场景下比完全失败更优:

某个 Worker 异常慢的处理

在并行系统中,最慢的 Worker 决定了整体任务的完成时间(木桶效应)。异常慢的 Worker(Straggler)会严重拖累整个扇出/扇入的执行效率。针对这种情况,有以下处理方案:

// 带超时和备份机制的扇出执行 async function fanOutWithTimeout(workers, timeoutMs = 30000) { const results = await Promise.allSettled(workers.map(worker => Promise.race([ executeWithRetry(worker), timeout(timeoutMs, `Worker ${worker.taskId} 超时`) ]) )); const succeeded = results.filter(r => r.status === 'fulfilled'); const failed = results.filter(r => r.status === 'rejected'); console.log(`成功: ${succeeded.length}/${workers.length}`); // 如果有足够的成功结果,继续汇集 if (succeeded.length >= Math.ceil(workers.length * 0.5)) { return await fanIn(succeeded.map(r => r.value)); } // 成功结果不足,抛出异常 throw new Error(`成功率过低: ${succeeded.length}/${workers.length}`); }
注意事项:异常处理的设计要在"健壮性"和"准确性"之间取得平衡。过于宽容的异常处理可能导致低质量的结果被采纳;过于严格的异常处理可能导致任务频繁失败。建议根据任务的关键程度设置不同的异常容忍度——关键任务要求 100% 成功,非关键任务可以接受部分结果。