跳到主要内容

02 · 客户端运行时(runAgent 的 rxjs 流水线)

这一章讲前端 SDK 怎么把事件流「跑起来」。核心就一个类 AbstractAgent,核心就一个方法 runAgent()。读完你能看懂集成层(LangGraph/Mastra…)是怎么挂上来的。

2.1 一个抽象基类,一个抽象方法

所有 agent 都继承 AbstractAgent(packages/client/src/agent/agent.ts:51)。它持有四样可变状态:

// packages/client/src/agent/agent.ts:51-65(节选)
abstract class AbstractAgent {
public threadId: string; // 会话 id(跨 run 持久)
public messages: Message[]; // 累积的消息列表
public state: State; // 共享状态(任意 JSON)
public pendingInterrupts: Interrupt[] = []; // 上一轮留下的未解决中断
abstract run(input: RunAgentInput): Observable<BaseEvent>; // ← 子类只需实现这个
}

子类唯一必须实现的就是 run(input):返回一个 Observable<BaseEvent>(一条事件流)。 HttpAgent 实现成「发 POST + 解码 SSE」(http.ts:64);测试用的 agent 可以直接 of(...events)。其余流水线全在基类里,所有子类共享。

2.2 runAgent 的 5 道工序

runAgent() 的心脏是一个 rxjs pipe(...)(agent.ts:194-243)。把它读成一条流水线:

runAgent()
│ prepareRunAgentInput() 打包 threadId/messages/state/tools/context
│ onInitialize() 校验未解决的中断;跑 onRunInitialized 订阅者

① (中间件链).run(input) 取得事件流(子类的 run,被 middleware 包裹)
② transformChunks CHUNK 事件 → 标准 START/CONTENT/END
③ verifyEvents 顺序合法性校验(非法即 throw)
④ takeUntil(activeRunDetach$) 收到 detach 信号就立刻停
⑤ apply → processApplyEvents 事件折叠成 messages/state,并写回 + 通知订阅者
│ catchError → onError 出错走订阅者的 onRunFailed,abort 类错误静默吞掉
│ finalize → onFinalize 收尾:isRunning=false,跑 onRunFinalized

返回 { result, newMessages } newMessages = 本轮新增的消息

真实代码(精简后的 pipe 结构):

// packages/client/src/agent/agent.ts:194(runAgent 内)
const pipeline = pipe(
() => chainedAgent.run(input), // ① 中间件链最终调到子类 run
transformChunks(this.debugLogger), // ②
verifyEvents(this.debugLogger), // ③
(s$) => s$.pipe(takeUntil(this.activeRunDetach$!)), // ④
(s$) => this.apply(input, s$, subscribers), // ⑤ reduce
(s$) => this.processApplyEvents(input, s$, subscribers), // ⑤ 写回+通知
catchError(/* … onError … */),
finalize(/* … onFinalize … */),
);
await lastValueFrom(pipeline(of(null)));

工序 ② 和 ③ 的顺序很关键:先把 chunk 展开成标准事件,再做顺序校验——否则 verify 看到 chunk 会无所适从。详见 ch04。

2.3 订阅者:在工序间插钩子

AgentSubscriber(packages/client/src/agent/subscriber.ts:57)是消费事件的主要方式。它有两层钩子:

  • 粗粒度:onEvent(每个事件都触发)。
  • 细粒度:每种事件一个,如 onTextMessageContentEventonStateDeltaEventonRunFinishedEvent……

钩子不只是「通知」,还能改写状态:订阅者可以返回一个 { messages?, state?, stopPropagation? } 的 mutation。框架按顺序执行所有订阅者并累积他们的改动(runSubscribersWithMutation,subscriber.ts:310)。两个要点:

  • stopPropagation: true中断后续订阅者并跳过默认处理——例如你想完全接管某类事件的渲染。
  • 订阅者拿到的是共享引用,开发/测试模式下框架会 deepFreeze 输入,谁敢原地改就抛 TypeError(subscriber.ts:316-319)——强制「改动只能通过返回值表达」。这是个防呆设计。
// 示意,非源码:一个会改写状态的订阅者
const sub = {
onToolCallEndEvent: ({ toolCallName, toolCallArgs }) => {
if (toolCallName === "set_theme") {
return { state: { theme: toolCallArgs.value } }; // 返回 mutation,框架替你合并
}
},
};

2.4 中间件:包裹整个 run

订阅者是「看/改事件」,中间件是「包裹整条事件流」。agent.use(middleware) 注册;runAgentreduceRight 把它们串成洋葱(agent.ts:201-213):每个中间件的 run(input, nextAgent) 决定要不要、怎么调用下一层。

框架自带几个向后兼容中间件,根据 SDK 版本自动插入(agent.ts:113-127):

中间件干什么
BackwardCompatibility_0_0_39旧版事件形态适配
BackwardCompatibility_0_0_45把弃用的 THINKING_* 翻成 REASONING_*
BackwardCompatibility_0_0_47把旧 BinaryInputContent 映射成 image/audio/video/document 类型

这是协议演进的真实策略:老格式不直接删,而是用中间件在客户端就地翻译成新格式,上层逻辑只需面对新形态。

2.5 connect / detach / abort:运行的边界控制

除了 runAgent(跑一轮),基类还提供:

  • connectAgent(agent.ts:258):用于「连接到一个已经在跑的 run」(如断线重连)。它走 connect(input) 而不是 run,且默认实现抛 AGUIConnectNotImplementedError——不支持就静默返回 EMPTY(agent.ts:300-303)。
  • detachActiveRun(agent.ts:329):往 activeRunDetach$ 发信号,工序 ④ 的 takeUntil立刻停止处理当前流(但不一定取消底层请求),并 await 收尾完成。
  • abortRun(HttpAgent 覆写,http.ts:47):真正 abortController.abort() 取消 HTTP 请求。onError 里专门识别 abort 类错误(AbortError"component unmounted" 等)并静默吞掉,不当成真错误(agent.ts:494-503)。

2.6 中断的把关在哪

上一章说 RUN_FINISHED 可能带 interrupt。客户端的把关在 onInitialize(agent.ts:396-412):新一轮开始前,如果还有 pendingInterrupts 没被本轮 resume 覆盖,直接抛错;过期的中断也抛错。这保证「人没回应就别想往下跑」。

pendingInterrupts 的写入发生在 apply 阶段处理 RUN_FINISHED 时(apply/default.ts:877-880)——只有订阅者没拦截时才更新,且做了防御性拷贝。

2.7 代码地图

主题文件符号名
运行时基类packages/client/src/agent/agent.tsAbstractAgent
跑一轮(主流水线)packages/client/src/agent/agent.tsrunAgent
打包运行输入packages/client/src/agent/agent.tsprepareRunAgentInput
中断把关packages/client/src/agent/agent.tsonInitialize
立即脱离当前 runpackages/client/src/agent/agent.tsdetachActiveRun
HTTP 实现packages/client/src/agent/http.tsHttpAgent
订阅者接口packages/client/src/agent/subscriber.tsAgentSubscriber
订阅者执行/累积 mutationpackages/client/src/agent/subscriber.tsrunSubscribersWithMutation
中间件链packages/client/src/middleware/middleware.tsMiddlewareFunctionMiddleware