跳到主要内容

AIX — 把 20+ 家厂商流式协议统一成「粒子流」

本章讲什么: AIX(AI eXchange)是 big-AGI 工程含量最高的子系统。它解决一个具体的难题:20 多家厂商的流式聊天 API,格式各不相同,怎么让上层只面对一种统一的流? 答案是「粒子(particle)」这层中间表示。读完你会懂这条流水线的每一站。

1. 它要解决的小问题(零基础)

你想让应用同时支持 OpenAI、Anthropic、Gemini……但它们的流式响应长得完全不一样:

  • OpenAI ChatCompletions:SSE,每个 chunk 是 choices[].delta,以 [DONE] 结尾。
  • Anthropic:SSE,有 content_block_start / content_block_delta / message_stop 等命名事件。
  • Gemini:自己的 JSON 流格式;还有个跑后台任务的 Interactions API。
  • Bedrock:二进制 aws-eventstream

如果让 UI 直接处理这些差异,那是一场灾难。AIX 的思路是插一层「粒子」:无论厂商怎么发,服务端都把它翻译成同一套小消息(粒子);客户端只认粒子,把它们重组成片段。厂商差异被关进服务端一个 switch 里,上层永远干净。

2. 顶层全景:三段式,粒子是中间货币

AIX 的官方架构注释是 Client <-- (intake) --> Server <-- (dispatch) --> AI Service(aix.router.ts:35)。展开成数据流:

┌──────────────────────── 浏览器 ────────────────────────┐
│ aix.client.ts │
│ L2 DMessage 层 → L1 流式循环 → ContentReassembler │
│ (对话转请求) (重试/续传) (粒子 → 片段) │
└───────────────┬─────────────────────────▲──────────────┘
│ tRPC mutate (stream) │ 粒子流
│ 或 CSF 浏览器直连 │ AixWire_Particles
┌───────────────▼─────────────────────────┴──────────────┐
│ AIX Server (Edge Runtime) │
│ │
│ dispatch ──按 dialect 选厂商──► adapter (造请求体) │
│ executor ──fetch + 心跳 + 重试──► AI 厂商 │
│ demuxer ──字节流 → 事件[] │
│ parser ──事件 → 调 Transmitter 造粒子 │
│ Transmitter ──排队 → yield 粒子 │
└───────────────┬─────────────────────────────────────────┘
│ HTTP(S)

真实 AI 厂商 API

各部件一句话职责:

部件干什么文件
aixRoutertRPC 入口,三个流式 mutation(生成/重连/删除)aix.router.ts
createChatGenerateDispatchdialect 分派到厂商,决定请求体/解析器/demuxerchatGenerate.dispatch.ts
executeChatGenerateDispatch连接 + fetch(带心跳)+ 消费流 + 调解析器chatGenerate.executor.ts
*.parser.ts把某厂商的事件翻译成对 Transmitter 的调用parsers/anthropic.parser.ts
ChatGenerateTransmitter把解析器的调用变成粒子并排队 yieldChatGenerateTransmitter.ts
ContentReassembler客户端:把粒子流重组DMessageFragment[]client/ContentReassembler.ts
aixChatGenerateContent_*客户端 L2/L1:对话→请求,流式循环,重试/续传client/aix.client.ts

3. 核心原理(逐个机制)

3.1 「粒子」到底是什么

它要解决的小问题: 需要一种协议无关的最小消息单元,既能表达「又来了一段文本」,也能表达「开始一个工具调用」「这是一张图」「流结束了」。

思路: 用一个带判别字段的联合类型(discriminated union)。粒子分三类,靠首字段区分:t(文本)、p(各种 part)、cg(控制/生命周期)。

看真实定义(aix.wiretypes.ts:700-805,namespace AixWire_Particles):

// 真实源码节选 aix.wiretypes.ts:775-805
export type TextParticleOp =
| { t: string }; // 增量文本(最高频,故用最短的键 't')

export type PartParticleOp =
| { p: 'tr_', _t: string, ... } // 推理文本增量(reasoning)
| { p: 'trs', signature: string } // 推理签名
| { p: 'fci', id: string, name: string, ... } // 函数调用开始 (function call invocation)
| { p: '_fci', _args: string } // 函数调用参数增量
| { p: 'ii', mimeType: string, i_b64: string, ... } // 内联图片
| { p: 'ia', mimeType: string, a_b64: string, ... } // 内联音频
| { p: 'urlc', title: string, url: string, ... } // URL 引用

关键细节: 键名刻意极短(tp_ti_b64)——因为这些粒子要在网络上高频传输,字节越省越好。下划线前缀(_t_fci_args)约定表示「增量/续接」,不带下划线的是「新起一段」。这是整个 AIX 的通用货币:厂商无关,UI 无关。

3.2 服务端 dispatch:厂商差异被锁进一个 switch

它要解决的小问题: 来了一个 access.dialect(如 'anthropic'),要决定:打哪个 URL、请求体长什么样、用哪个解析器、流是 SSE 还是二进制?

思路: 一个大 switch (dialect),每个分支返回一个 ChatGenerateDispatch 配置对象(请求 + demuxer 格式 + 解析器)。这是「策略对象」模式:把「怎么跟这家厂商打交道」打包成数据,而非散落各处的 if。

看 Anthropic 分支(chatGenerate.dispatch.ts:78-101,createChatGenerateDispatch):

// 真实源码节选 chatGenerate.dispatch.ts:89-101
return {
request: {
...anthropicAccess(access, ANTHROPIC_API_PATHS.messages, hostedFeatures),
method: 'POST',
body: anthropicBody, // adapter 造好的请求体
},
demuxerFormat: streaming ? 'fast-sse' : null, // 流 → 用 SSE 解复用;非流 → null
chatGenerateParse: streaming
? createAnthropicMessageParser() // 选对应解析器
: createAnthropicMessageParserNS(),
particleTransform: ...,
};

精华: 注意 default: 后面挂了一长串 case(chatGenerate.dispatch.ts:239-256:alibaba/azure/deepseek/groq/mistral/moonshot/openai/openrouter/xai/zai…)——十几家厂商共用 OpenAI 协议,所以它们 fall through 到同一段代码,只在 access 层换 key/URL。新增一家 OpenAI 兼容厂商,几乎零成本。而 const _exhaustiveCheck: never = dialect 这行(:240)用 TypeScript 的 never 强制穷尽检查:漏处理一个 dialect,编译就报错。

同一文件里还有两个对称的分派器:createChatGenerateResumeDispatch(重连到进行中的 upstream 运行,:317)和 executeChatGenerateDelete(删除 upstream 存储的运行,:407)。

3.3 解析器 + Transmitter:厂商事件 → 粒子

它要解决的小问题: 厂商发来一个事件(比如 Anthropic 的 content_block_delta),怎么变成粒子?

思路: 职责分离。解析器只懂厂商格式,它读出「这是一段文本」后,调用一个统一接口 IParticleTransmitter(如 appendText('...'));Transmitter 不懂厂商,它只负责把这些调用排队成粒子并 yield

看 Transmitter 的 appendText(ChatGenerateTransmitter.ts:227-237):

// 真实源码节选 ChatGenerateTransmitter.ts:227-237
appendText(textChunk: string) {
if (this.currentPart) // 若正在攒别的 part,先收尾排队
this.endMessagePart();
this.currentText = { t: textChunk }; // 造一个文本粒子 { t: ... }
this._queueParticleS(); // 推进发送队列
}

巧妙之处一:<think> 标签自动识别。 有些开源模型把推理过程包在 <think>...</think> 里,但 API 不标注它是推理。appendAutoText_weak(ChatGenerateTransmitter.ts:279-315)在会话最开头探测 <think>,命中就把里面的内容当推理文本发(appendReasoningText(..., { weak: 'tag' })),而非普通文本。一个启发式 hotfix,代码里诚实标了「likely to happen if the tokenizer has been trained for it」。

巧妙之处二:终止原因分两路。 Transmitter 区分 setDialectEnded(厂商正常说完,:189)和 setDispatchEnded(我们这边的 RPC 出错,:164),并把错误细分成 dispatch-prepare/fetch/read/parse 四个阶段(:176-184)。错误也是粒子(cg: 'issue'),走同一条流——这样错误展示和内容展示是同一套机制

3.4 executor:fetch + 心跳 + 把流泵成粒子

它要解决的小问题: Edge 运行时有超时(Vercel 300s 硬杀),长请求(Opus 大请求 >25s)期间连接可能被中间层掐断;还要把厂商的字节流切成事件、逐个解析。

思路: executeChatGenerateDispatch(chatGenerate.executor.ts:40)是一个 async generator,边消费厂商流边 yield 粒子。它做三件关键事:

  1. 连接时发心跳。 _connectToDispatch(:116)在等待 fetch 返回期间,用 heartbeatsWhileAwaiting 持续 yield 心跳粒子,避免连接被判死。
  2. 流式消费 + 解复用。 _consumeDispatchStream(:250)循环:read() 一个 chunk → decodedemuxer.demux(chunk) 切成事件[] → 对每个事件调 dispatchParser(...)yield* chatGenerateTx.emitParticles()
  3. 终止兜底。 流关了但厂商没发终止信号 → 打 done-dispatch-closed 并 warn「response may be truncated」(:373-379)。

看泵循环的核心(chatGenerate.executor.ts:328-359,_consumeDispatchStream):

// 真实源码节选(已简化)chatGenerate.executor.ts:328-359
for (const demuxedItem of demuxedEvents) {
if (chatGenerateTx.isEnded) break; // 终止后来的事件丢弃
if (demuxedItem.data === '[DONE]') { // OpenAI 的结束标记
chatGenerateTx.setDialectEnded('done-dialect');
break;
}
dispatchParser(chatGenerateTx, demuxedItem.data, demuxedItem.name); // 解析 → 调 Transmitter
if (!chatGenerateTx.isEnded)
yield* chatGenerateTx.emitParticles(); // 把排好的粒子 yield 出去
}

精华: router 里有个 _armSlowRequestWatchdog(aix.router.ts:25-28),在 285s 时打日志报出是哪个 model/context 快超时了。注释解释了为什么它必须在生成器体内而非 tRPC 中间件:中间件的 await next() 在流开始之前就 resolve 了,看不到长流(aix.router.ts:20-24)。这是踩过坑后的经验。

3.5 客户端三层:DMessage → 流式循环 → 重组

客户端 aix.client.ts 分三层(L2/L1/重组),自上而下:

L2 — aixChatGenerateContent_DMessage_orThrow(:586): 面向「一条 DMessage」。它把 llmId 解析成 access + model,造一条带 pendingIncomplete: true 的 generator,调 L1,最后 finalize(算 metrics/costs、解析 Gemini 引用重定向链接 :666)。

L1 — _aixChatGenerateContent_LL(:860): 流式循环 + 重试状态机。它创建 ContentReassembler,然后 while (true) 里跑粒子流,关键约束写在注释里(:1009-1019):消费循环必须同步、不能 await——否则 tRPC 会在你阻塞时关连接。所以处理放进 reassembler 的后台 promise 链:

// 真实源码节选 aix.client.ts:1021-1031
for await (const particle of particleStream)
reassembler.enqueueWireParticle(particle); // 只入队,不处理(同步、不 await)

abortSignal.throwIfAborted(); // CSF 下生成器干净结束,这里补抛 abort
sendContentUpdate?.stop?.();
await reassembler.waitForWireComplete(); // 流读完后,再等后台处理完

重组 — ContentReassembler: 见下一节。

3.6 ContentReassembler:粒子 → 片段(并发安全)

它要解决的小问题: 粒子是一个个碎片地来(且来得很快),要重组成 DMessageFragment[],还要边重组边触发 UI 更新,且不能丢序、不能因为某个粒子处理慢就卡住读流

思路: 入队 + 后台 promise 链enqueueWireParticle(ContentReassembler.ts:125)只把粒子推进 backlog 并触发处理;真正处理在 #processWireBacklog(:263)里逐个 await #reassembleParticle,通过把 .then 链回 this.processingPromise 保证严格串行(:257)。

核心分派是 #reassembleParticle 里的大 switch(:330-434):

particle 进来

├── 't' in op ──► 追加/续接文本片段

├── 'p' in op ──► switch(op.p):
│ 'tr_' 推理文本 · 'fci'/'_fci' 工具调用 · 'ii' 图片 · 'ia' 音频
│ 'urlc' 引用 · 'hres' 托管资源 · 'svs' 厂商私有状态 …

└── 'cg' in op ──► switch(op.cg):
'end' 终止 · 'issue' 错误 · 'set-metrics' 指标
'set-model' 模型名 · 'set-upstream-handle' 续传句柄
'aix-retry-reset' 重试时清理已发内容

精华:终止分类延迟到 finalize。 流里的 end 粒子只存原始数据,真正「这次是 completed / aborted / failed?」的判断推迟到 finalizeReassembly(:149)。它会:清掉占位符(仅干净退出时)、把未完成的操作标 error、算最终 metrics、干净完成时清除 upstreamHandle(:158-160)。把「分类」集中在一处,流处理期间不用反复决策。

3.7 弹性:重试、重连、续传

AIX 有三种「续上」的机制,容易混:

机制触发干什么代码
连接重试(server)连厂商时 429/503/502服务端 fetchWithAbortableConnectionRetry 重连,发 aix-retry-reset 粒子(scope srv-dispatch)executor.ts:147-158
客户端重试(reconnect)流中途网络断L1 状态机 AixStreamRetry 决定重连;清空已发内容重来(rClearStrategy: 'all')aix.client.ts:1049-1063
upstream 重连(reattach)upstreamHandle 时断线/刷新页用句柄 GET-poll 重连到厂商后台仍在跑的运行(OpenAI Responses、Gemini Deep Research)aix.client.ts:733dispatch.ts:317

reattach 的两种模式(AixReattachMode,aix.client.ts:723):'replay'(SSE 从头重放事件序列)和 'snapshot'(一次性 GET 当前快照)。注释诚实写了:snapshot 用于「SSE 端点上游坏了但资源本身还能读」的兜底。

3.8 CSF:浏览器直连厂商

它要解决的小问题: 走 tRPC 要绕服务端,多一跳延迟;Edge 还有 5 分钟超时。

思路: 客户端直连厂商(Client-Side Fetch)。L1 里若 aixAccess.clientSideFetch 为真,就动态加载 CSF 模块,用同样的 dispatch/parser 在浏览器里跑(aix.client.ts:906-910:965-1007)。粒子模型不变,所以重组逻辑完全复用。代价:服务端的 csfUnsafe 粒子变换(如需要服务端 fetch 的 Anthropic 文件内联)要在客户端用 ReassemblerParticleTransforms 补上(:913-918)。

4. 巧妙之处(可借鉴)

  • 「粒子」作为协议无关中间表示。 这是把 N 家厂商问题降维成「N 个 parser + 1 套统一流」的关键。新增厂商 = 写一个 parser + dispatch 加一个 case,UI/存储零改动。(aix.wiretypes.ts:700chatGenerate.dispatch.ts:74)
  • 十几家 OpenAI 兼容厂商共用一条 fall-through 路径,never 穷尽检查兜底。(dispatch.ts:239-256)
  • 读流必须同步、处理放后台 promise 链——这是和 tRPC 流式语义死磕出来的硬约束,注释里写得明明白白。(aix.client.ts:1009-1022)
  • 错误即粒子:错误、心跳、内容走同一条流、同一套重组,展示逻辑统一。(ChatGenerateTransmitter.ts:363-376)
  • 超时看门狗放在生成器体内,因为中间件看不到流。(aix.router.ts:20-28)
  • 终止分类延迟到 finalize,流处理期间不反复决策。(ContentReassembler.ts:149:990)

5. 边界与局限

  • reattach(upstream 重连)只支持少数协议:OpenAI Responses、Gemini Interactions;其余 dialect 在 createChatGenerateResumeDispatch 里直接 throw(dispatch.ts:374-375)。
  • Edge 5 分钟硬超时:超长运行靠 CSF(绕过 Edge)或 upstream reattach 兜底,而非服务端长连。
  • <think> 推理识别是启发式:通用 tokenizer 下会漏(代码自承,ChatGenerateTransmitter.ts:16-21)。
  • 资源化能力(resumability)在客户端仍被注释禁用:aix.client.ts:898-899enableResumability 标了「disabled until clearly working」。

6. 横向对比

相比 chat-agents 货架里直接调单一 SDK 的项目,big-AGI 的取舍是多投资一层「粒子」抽象换来厂商无关性。代价是这层管线复杂(64 个文件),但收益是 Beam(下一章)这种「多模型并行」能力几乎免费——因为每个 ray 就是一次标准 AIX 调用。可对照:

7. 代码地图(导航索引)

主题文件关键符号
tRPC 入口(3 个流式 mutation)src/modules/aix/server/api/aix.router.tsaixRouter_armSlowRequestWatchdog
粒子类型定义src/modules/aix/server/api/aix.wiretypes.tsAixWire_ParticlesTextParticleOpPartParticleOp
厂商分派(核心 switch)src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.tscreateChatGenerateDispatchcreateChatGenerateResumeDispatchexecuteChatGenerateDelete
流执行器(fetch/心跳/泵流)src/modules/aix/server/dispatch/chatGenerate/chatGenerate.executor.tsexecuteChatGenerateDispatch_consumeDispatchStream_connectToDispatch
粒子发射器src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.tsChatGenerateTransmitterappendTextappendAutoText_weaksetDialectEnded
厂商解析器(示例)src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.tscreateAnthropicMessageParser
客户端 L2/L1src/modules/aix/client/aix.client.tsaixChatGenerateContent_DMessage_orThrow_aixChatGenerateContent_LLaixReattachContent_DMessage_orThrow
客户端重组src/modules/aix/client/ContentReassembler.tsContentReassemblerenqueueWireParticlefinalizeReassembly
厂商抽象/注册src/modules/llms/vendors/vendors.registry.tsIModelVendor.tsMODEL_VENDOR_REGISTRYIModelVendor