跳到主要内容

03 · 两个核心算法:verifyEvents 与 defaultApplyEvents

这是 AG-UI 客户端的精华两章合一。verifyEvents 把「协议的顺序规则」变成可执行的状态机;defaultApplyEvents 把「一串增量事件」变成「完整的消息列表 + 共享状态」。

3.1 verifyEvents:协议规则的可执行形式

要解决的小问题: 后端吐的事件流可能是非法的(没 START 就 CONTENT、没闭合就 RUN_FINISHED、RUN_ERROR 后还接着发)。客户端需要在折叠成状态之前就拦下这些,给出清晰报错。

思路: 维护一组「当前打开了什么」的集合,逐事件做状态转移;不合法就 throwError

它在闭包里维护这些游标(verify.ts:11-21):

let activeMessages = new Map<string, boolean>(); // 哪些 messageId 还没 END
let activeToolCalls = new Map<string, boolean>(); // 哪些 toolCallId 还没 END
let activeSteps = new Map<string, boolean>(); // 哪些 step 还没 FINISHED
let runStarted = false, runFinished = false, runError = false;
let firstEventReceived = false;

核心规则,逐条对应代码:

  • 首事件必须是 RUN_STARTED(否则报错;RUN_ERROR 例外):verify.ts:67-71
  • RUN_ERROR 是绝对终止态:一旦置位,后续任何事件都报「run has already errored」:verify.ts:43-50
  • RUN_FINISHED 后只能 RUN_STARTED 开新轮;新轮会 resetRunState() 清空所有游标:verify.ts:53-87
  • 活动中不能重开同名:TEXT_MESSAGE_START 撞已存在的 messageId 报错:verify.ts:96-103
  • 闭合校验:TEXT_MESSAGE_CONTENT/_END 必须找得到活动消息(verify.ts:113verify.ts:129),工具同理。
  • RUN_FINISHED 前必须全闭合:还有活动 step / message / tool 就报错,并把未闭合的名字列进错误信息:verify.ts:233-263

用状态图看一条 run 的宏观状态(mermaid,因为有「新轮回到 STARTED」的回边,ASCII 画不清):

巧妙之处:协议的「合法顺序」没有单独的规范文档,它就是这段状态机。规则即代码,客户端天然成了一致性的「裁判」。代价是:规则要改得动多语言时,得在每个 SDK 各写一遍这套状态机。

3.2 defaultApplyEvents:把事件折叠成状态的 reducer

要解决的小问题: 经过 verify 的事件流仍是「增量」(一个 delta、一个 patch)。要得到「完整消息 + 完整状态」,需要一个把它们累积起来的 reducer。

这正是 defaultApplyEvents(packages/client/src/apply/default.ts:94)。它用 rxjs concatMap 逐事件处理(保证顺序),内部维护本地的 messages / state,每个事件 case 做两件事:先跑订阅者钩子(可改写/拦截),再做默认折叠

几个有代表性的折叠逻辑:

文本累积——按 messageId 找到消息,把 delta 续到 content 后面:

// packages/client/src/apply/default.ts:226-229(TEXT_MESSAGE_CONTENT)
const existingContent =
typeof targetMessage.content === "string" ? targetMessage.content : "";
targetMessage.content = `${existingContent}${delta}`;

状态增量——STATE_DELTA 套用 JSON Patch,失败只 console.warn 不崩流(default.ts:556-568);STATE_SNAPSHOT 直接整体替换(default.ts:529)。

工具参数的「半成品 JSON」——参数是字符串增量,流到一半不是合法 JSON。框架在喂给订阅者前用 untruncate-json 把截断的 JSON 补全成可解析的部分对象(default.ts:352),让 UI 能边流边渲染还没写完的参数:

// packages/client/src/apply/default.ts:349-352(TOOL_CALL_ARGS,节选)
let partialToolCallArgs = {};
try {
partialToolCallArgs = untruncateJson(toolCallBuffer); // 补全截断 JSON
} catch (error) {}

3.3 三处「不显然但很重要」的折叠决策

这几处是读源码才能发现的、踩过坑的设计:

① 工具结果要插在「发起它的助手消息」后面,而不是末尾。 因为 chat→tool→chat 循环里,后续助手文本可能先于 tool result 到达;若直接 append,历史会变成 assistant(tool_call) → text → tool,违反「assistant 的 tool_call 必须紧跟其 tool result」的 provider 契约,下游会 400。所以代码找到 owner 助手消息,插在它(及已有的并行 tool result)之后(default.ts:476-489):

// packages/client/src/apply/default.ts:476-489(TOOL_CALL_RESULT,节选)
const ownerIndex = messages.findIndex(
(m) => m.role === "assistant" &&
m.toolCalls?.some((tc) => tc.id === toolCallId));
if (ownerIndex === -1) messages.push(toolMessage);
else { /* 跳过已有的 tool 结果,插在它们之后 */ }

② MESSAGES_SNAPSHOT 是「编辑式合并」,不是粗暴替换。 后端给的快照常常不含「客户端专属」消息(activity 永远是客户端的;reasoning 多数后端不回传)。所以合并时:保留这些客户端专属消息、用快照版本替换重名消息、再补上快照里新增的(default.ts:596-629)。reasoning 还有个微妙判断——只有当快照本身带 reasoning 时,才把快照当 reasoning 的权威,否则保留本地流式版本,避免「同一段推理渲染两遍」(default.ts:612-614)。

③ CHUNK 事件到这里必须已被展开。 apply 里对 TEXT_MESSAGE_CHUNK/TOOL_CALL_CHUNK/REASONING_MESSAGE_CHUNK 直接 throw(default.ts:942-948default.ts:1097):「must be transformed before being applied」。这是工序顺序(ch02 的 ②→⑤)的硬约束,用抛错把契约钉死。

3.4 折叠的输出:增量 mutation

每个 case 末尾调 emitUpdates()(default.ts:117-124):只有当本轮真的改了 messagesstate 才发出一个 mutation,否则发 EMPTY。上游 processApplyEvents(ch02)拿到 mutation 就写回 agent.messages/agent.state 并通知订阅者。整个折叠是「事件进、增量状态出」的纯流式过程。

3.5 代码地图

主题文件符号名
顺序校验状态机packages/client/src/verify/verify.tsverifyEvents
事件→状态 reducerpackages/client/src/apply/default.tsdefaultApplyEvents
工具调用挂哪条助手消息packages/client/src/apply/default.tsresolveOrCreateAssistantMessage
截断 JSON 容错解析packages/client/src/apply/default.tsuntruncateJson(依赖 untruncate-json)
JSON Patch 套用packages/client/src/apply/default.tsjsonpatch.applyPatch(依赖 fast-json-patch)
增量 mutation 发射packages/client/src/apply/default.tsemitUpdates