跳到主要内容

Beam — 多模型并行(scatter)与融合(gather)

本章讲什么: Beam 是 big-AGI 的杀手锏:同一个问题,让 N 个模型并行作答,再融合成一个最好的答案。它的实现优雅地建立在第 1 章的 AIX 之上——每条「光线(ray)」就是一次普通的 AIX 流式调用。本章拆解它的 scatter/gather 引擎、状态切片和融合指令链。

1. 它要解决的小问题(零基础)

单个模型会幻觉、会有盲区。**「问三个模型、综合一下」**直觉上更可靠,但手动做很烦:开三个标签页、复制粘贴、再让第四个模型总结。

Beam 把这套流程产品化:

  • Scatter(散射): 你选 N 个模型,它们并行对同一段对话历史作答。每个答案叫一条 ray
  • Gather(聚合): 选一个模型 + 一个「融合配方(factory)」,把 N 条 ray 的答案融合成一个。

类比:Beam 像「专家会诊 + 主治医生拍板」——scatter 是几位专家各自给意见,gather 是主治医生把意见综合成一个诊断。

2. 顶层全景:scatter/gather + 每对话一个 store

你的对话历史 (inputHistory)

┌──────┴───────────── SCATTER(并行)─────────────────┐
▼ ▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│ray 1│ │ray 2│ │ray 3│ │ray 4│ 每条 ray =
│GPT │ │Claude │Gemini│ │... │ 一次标准 AIX 调用
└──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘
│ 各自流式作答(复用 aixChatGenerateContent) │
└──────┬───────┴───────┬───────┴───────┬──────┘
▼ rayMessages[](有内容的那些)
┌────────────── GATHER(融合)──────────────┐
│ 选 1 个模型 + 1 个 fusion factory │
│ 跑「异步指令链」: 指令1 → 指令2 → … │
│ (Gather 指令把所有 ray 拼进 prompt 再问) │
└───────────────────┬────────────────────────┘

一个融合后的答案 → 写回对话

各部件一句话职责:

部件干什么文件
BeamStore每对话一个 vanilla Zustand store,组合三个切片store-beam_vanilla.ts
Root 切片生命周期:open / terminate / 加载配置store-beam_vanilla.ts
Scatter 切片管理 rays[],启停每条 ray 的 AIX 调用scatter/beam.scatter.ts
Gather 切片管理 fusions[],跑融合指令链gather/beam.gather.ts
指令框架把融合拆成可串联的异步指令gather/instructions/beam.gather.execution.tsx

3. 核心原理(逐个机制)

3.1 状态:vanilla Zustand 的「切片」组合

它要解决的小问题: Beam 状态复杂(多条 ray、多个 fusion、各自的运行状态),又要每个对话独立一份高性能、不绑死 React

思路:vanilla Zustand(不带 React hook 的纯 store),createBeamVanillaStore() 每次造一个新实例(store-beam_vanilla.ts:20)。状态按「切片(slice)」拆开,再组合——这是 Zustand 官方的 slices 模式:

// 真实源码节选 store-beam_vanilla.ts:18-26
export type BeamStore = RootStoreSlice & GatherStoreSlice & ScatterStoreSlice;

export const createBeamVanillaStore = (): StoreApi<BeamStore> =>
createVanillaStore<BeamStore>()((...a) => ({
...createRootSlice(...a), // 生命周期
...createScatterSlice(...a), // rays
...createGatherSlice(...a), // fusions
}));

关键细节: ConversationHandler 在构造时 createBeamVanillaStore() 并持有它(ConversationHandler.ts:41)——每个对话一个独立 Beam store,多窗格(multi-pane)互不干扰。open() 时如果已经开着,直接返回不覆盖(store-beam_vanilla.ts:80-83),保护正在跑的 beam。

3.2 Scatter:每条 ray 是一次 AIX 调用

它要解决的小问题: N 个模型并行作答,每个的流式 token 要实时更新到对应卡片。

思路: 一条 ray 就是一次 aixChatGenerateContent_DMessage_FromConversation 调用,它的流式回调直接 _rayUpdate 写进该 ray 的 message。看 rayScatterStart(scatter/beam.scatter.ts:45-122):

// 真实源码节选(简化)scatter/beam.scatter.ts:81-99
aixChatGenerateContent_DMessage_FromConversation(
llmId, scatterSystemInstruction, scatterInputHistory,
'beam-scatter', ray.rayId,
{ abortSignal: abortController.signal,
throttleParallelThreads: getLabsHighPerformance() ? 0 : !playNice ? 1 : rays.length },
onMessageUpdated, // 流式回调 -> _rayUpdate 写进这条 ray
)
.then((status) => {
_rayUpdate(ray.rayId, { // 终态:success / stopped / error
status: status.outcome === 'completed' ? 'success'
: status.outcome === 'aborted' ? 'stopped'
: status.outcome === 'failed' ? 'error' : 'empty',
genAbortController: undefined,
});
});

精华一:并行节流。 throttleParallelThreadsrays.length——并行 ray 越多,每条的 UI 更新节流越狠(避免 N 条流同时刷爆 React)。高性能模式(getLabsHighPerformance())则传 0 不节流。这个值最终喂给 AIX 客户端的 withDecimator(见第 1 章 L1)。

精华二:ray 的状态机。 BRay.status'empty' | 'scattering' | 'success' | 'stopped' | 'error'(:24)。_syncRaysStateToScatter(:453)从所有 ray 派生出 isScattering(还有没有在跑的)和 raysReady(几条可选了),供 UI 用。

精华三:导入已有消息当 ray。 importRays(:252)能把对话里已有的 assistant 消息「收编」成 ray(标 imported: true),复用模型信息——这样你能把历史答案也纳入融合。

3.3 Gather:融合配方与「异步指令链」

它要解决的小问题: 「融合」不是一句 prompt 能搞定的——有时要先让模型列个 checklist、再综合;不同融合策略(找共识 / 挑最好 / 自定义)流程不同。

思路: 把一次融合建模成一条指令链(instruction chain):BFusion 持有 instructions: Instruction[](gather/beam.gather.ts:36-37),按顺序执行,前一条的输出喂给后一条。指令有两类(beam.gather.execution.tsx:36):

指令类型干什么
gather把所有 ray + 对话拼进 prompt,调一次模型
user-input-checklist让模型产出清单、等用户勾选,再继续

融合的执行就是一条 promise 链 reduce(beam.gather.execution.tsx:96-128):

// 真实源码节选(简化)beam.gather.execution.tsx:99-127
let promiseChain: Promise<string> = Promise.resolve('');
for (const instruction of instructions) {
promiseChain = promiseChain.then((precedingValue) => {
// 重置中间消息为占位符,更新进度 UI("2/3 · 标签 ...")
inputState.intermediateDMessage.fragments = [createPlaceholderVoidFragment(GATHER_PLACEHOLDER)];
switch (instruction.type) {
case 'gather':
return executeGatherInstruction(instruction, inputState, precedingValue);
case 'user-input-checklist':
return executeUserInputChecklistInstruction(instruction, inputState, precedingValue);
}
});
}

精华:链式收尾。 链跑完后,.thenstage: 'success',.catch 区分用户中止(stage: 'stopped',不报错)和真错误(stage: 'error'),.finallyintermediateDMessage 定为最终输出(:131-161)。一条 promise 链同时承载了「顺序执行 + 进度更新 + 错误分类 + 输出落地」。

融合配方(factory) 决定一个 fusion 初始有哪些指令(FUSION_FACTORIES,beam.gather.factories.ts)。CUSTOM_FACTORY_ID 是可编辑的自定义配方,fusionRecreateAsCustom(beam.gather.ts:189)能把内置配方克隆成可改的自定义版。

3.4 终止与「保留设置」

它要解决的小问题: 关掉 beam 或重开时,要停掉所有在跑的 ray/fusion,但不能丢用户选的模型和配方

思路: terminateKeepingSettings(store-beam_vanilla.ts:128-133)重置运行态,但通过 reInitScatterStateSlice(state.rays)reInitGatherStateSlice(...) 保留 ray 的模型选择和 fusion 配方:

// 真实源码节选 store-beam_vanilla.ts:128-133
terminateKeepingSettings: () =>
_set(state => ({
...initRootStateSlice(),
...reInitScatterStateSlice(state.rays), // 重建空 ray,但沿用同样的 llm
...reInitGatherStateSlice(state.fusions, state.currentGatherLlmId),
})),

reInitScatterStateSlice(beam.scatter.ts:172)会先 prevRays.forEach(rayScatterStop) 停掉所有在跑的,再造同样数量、同样模型的空 ray。「停掉但记住配置」是 Beam 的一致退出语义。

3.5 reattach:融合 / ray 也能续传

scatter 和 gather 都支持 upstream reattach(第 1 章 3.7):rayReattach(beam.scatter.ts:351)和 fusionReattach(beam.gather.ts:329)在 ray/fusion 的 generator 带 upstreamHandle 时,用 beamReattachStream 重连到厂商后台仍在跑的运行(主要是 Gemini Deep Research 跨页刷新)。乐观地进 scattering/fusing 态(header 显示 Stop),Stop = 脱离(detach),后台运行存活。

4. 巧妙之处(可借鉴)

  • 每条 ray 复用标准 AIX 调用——Beam 几乎没有「自己的网络代码」,并行能力是 AIX 抽象的红利。(beam.scatter.ts:81)
  • vanilla Zustand slices + 每对话一实例:复杂状态拆切片组合,多窗格天然隔离。(store-beam_vanilla.ts:20ConversationHandler.ts:41)
  • 融合 = promise 链 reduce,一条链承载顺序/进度/错误/输出。(beam.gather.execution.tsx:99)
  • 并行节流随 ray 数自适应,防止 N 条流刷爆 UI。(beam.scatter.ts:86)
  • **「停掉但记住设置」**作为统一退出语义。(store-beam_vanilla.ts:128)

5. 边界与局限

  • 融合质量取决于配方与模型:Beam 提供机制(并行 + 链式融合),不保证融合答案一定优于单个最佳 ray。
  • 指令类型目前只有两种(gather / user-input-checklist),default 分支直接 reject(beam.gather.execution.tsx:124-125)。
  • reattach 仅对带 upstreamHandle 的运行有效,即少数支持后台运行的协议。
  • Beam 是 UI 重型功能:N 路并行流 + 实时卡片,移动端/弱机上是性能压力点(故有 high-performance labs 开关)。

6. 横向对比

Beam 是 chat-agents 货架里少见的「多模型 de-hallucination」功能。它和第 1 章的关系是纯增量:

  • 01-aix-streaming.md:每条 ray / 每次融合都走 AIX 的 aixChatGenerateContent_*
  • 03-chat-and-storage.md:ray 和 fusion 的输出都是 DMessage,融合结果通过 beamInvoke 里的 onBeamSuccess 回调写回对话。

7. 代码地图(导航索引)

主题文件关键符号
Beam store 组合src/modules/beam/store-beam_vanilla.tscreateBeamVanillaStoreBeamStoreterminateKeepingSettingsopen
Scatter(rays)src/modules/beam/scatter/beam.scatter.tscreateScatterSlicerayScatterStartBRay_syncRaysStateToScatterrayReattach
Gather(fusions)src/modules/beam/gather/beam.gather.tscreateGatherSliceBFusiontoggleFusionGatheringfusionReattach
融合指令链src/modules/beam/gather/instructions/beam.gather.execution.tsxgatherStartFusionInstructionExecutionInputState
融合配方src/modules/beam/gather/instructions/beam.gather.factories.tsFUSION_FACTORIESCUSTOM_FACTORY_ID
模块级 Beam 配置src/modules/beam/store-module-beam.tsxuseModuleBeamStoreupdateBeamLastConfig
对话侧接入src/common/chat-overlay/ConversationHandler.tsConversationHandlerbeamInvokegetBeamStore