跳到主要内容

第 01 章 · 流式 AI 管线

本章讲:一句话(聊天或 agent)进了主进程之后,这条「流」是怎么被登记、执行、广播、落库的,以及三种「往一条已经在跑的流里再塞一句话」的语义。读完你能讲清 Cherry Studio 最核心的运行时骨架。

1. 先建直觉:为什么要「一个 topicId 一条流」

它要解决的小问题。 一个聊天会话(topic)可能被多个窗口同时打开;模型在流式吐字;用户随时可能关窗、重连、再发一句、或点停止。如果让 renderer 持有这条流,关窗就断流、数据就丢。

思路。流的所有权放在主进程。主进程维护一张表:topicId → 一条活动流。窗口只是订阅者(listener),平权,谁都可以 attach/detach,但谁都不「拥有」这条流。落库也在主进程做——所以关窗不断流、不丢数据

这条不变量(invariant)在仓库内部文档里被反复强调:

  • Topic 级寻址: 每个 IPC、广播、共享缓存项都按 topicId 编址;一个 topic 至多一条活动流,订阅者平权,没有「owner 窗口」。
  • Main 拥有持久化: renderer 关闭或崩溃既不会中止流,也不丢数据。

(见 docs/references/ai/core-architecture.mdInvariants 节,和 AiStreamManager 的实现。)

2. 顶层流程图

renderer: useChat.sendMessages
│ IPC Ai_Stream_Open { topicId, trigger, userMessageParts, ... }

AiStreamManager.onInit 注册的 handler
│ 把发送方包成 WebContentsListener(订阅者)

dispatchStreamRequest(manager, subscriber, req)
│ ① 挑第一个 canHandle(topicId) 命中的 ChatContextProvider
│ ② provider.prepareDispatch:解析模型 / 落库用户消息 / 组 listeners
│ ③ 决定 start 还是 inject(见 §5)

AiStreamManager.send(input)
│ start:新建 ActiveStream,每个模型一个 StreamExecution

runExecutionLoop(每个 execution)
│ → AiService.streamText(request, signal)
│ ├─ 普通聊天: new Agent(...).stream() → @ai-sdk/* → 模型
│ └─ runtime.kind==='agent-session':
│ → AgentSessionRuntimeService.openTurnStream() (见第 02 章)

pipeStreamLoop:把 chunk 流「一读两用」
├─ 广播 → WebContents / SSE / ChannelAdapter / Persistence
└─ readUIMessageStream → 累积成 CherryUIMessage 快照

终态(done/error/aborted/awaiting-approval)
└─ PersistenceListener 落库 → renderer 重新 useQuery 拿最终行

这张图与仓库 docs/references/ai/core-architecture.mdSequence: a fresh chat turn 一一对应,可对照核对。

3. 三种 ChatContextProvider:差异收在一处

它要解决的小问题。 「普通聊天」「临时对话(不落库)」「agent 会话」三者入口完全一样(都是 Ai_Stream_Open),但在「要不要落库、用哪个模型、要不要交给 driver」上不同。把这些差异散在 AiStreamManager 里会很脏。

思路。provider 模式:每个 provider 实现 canHandle(topicId) + prepareDispatch(...),分发器挑第一个命中的。

dispatchStreamRequest
├─ PersistentChatContextProvider 普通聊天:落库用户行,普通 Agent 循环
├─ TemporaryChatContextProvider 临时对话:不落库
└─ AgentChatContextProvider agent 会话:校验工作区,交给 runtime host

实现都在 src/main/ai/streamManager/context/(PersistentChatContextProvider.ts / TemporaryChatContextProvider.ts / AgentChatContextProvider.ts,基类 ChatContextProvider.ts,调度 dispatch.ts)。

AgentChatContextProvider 的关键差异:它在 prepareDispatch校验 agent 会话(必须有 agent、工作区、模型、已注册 driver),原子地落库「一条 user 行 + 一条 pending assistant 行」,然后调 AgentSessionRuntimeService.beginTurn(...)——而不是去构造一个普通 Agent。这一步是第 01 章和第 02 章的接缝。

4. pipeStreamLoop:一条 chunk 流「一读两用」

它要解决的小问题。 模型吐出的 UIMessageChunk 流只能读一遍,但我们要同时做两件事:广播给所有订阅者、累积成一条完整消息用于落库。

思路(原理演示,示意非源码):

// 示意,非源码:把一条只能读一次的流「读一次,喂两边」
async function pipeStreamLoop(chunkStream, listeners, accumulate) {
for await (const chunk of chunkStream) { // 只读一遍
for (const l of listeners) l.onChunk(chunk) // ① 广播给窗口/SSE/IM/落库累积器
accumulate.push(chunk) // ② 喂给 readUIMessageStream 累积
}
// 终态:把 done/error/aborted 这一个事件分发给每个 listener 的对应回调
}

真实实现是 src/main/ai/streamManager/pipeStreamLoop.ts,并配合 readUIMessageStream(AI SDK 提供)把 chunk 累积成 CherryUIMessage 快照。值得注意的是:Main 和 renderer 用的是同一套累积函数——renderer 侧的 useExecutionOverlay 也跑一遍 readUIMessageStream 做乐观渲染,Main 侧跑一遍做落库,两边对齐(见 docs/references/ai/execution-overlay.md)。

listener 是终态的统一抽象。 每个 listener 实现 onChunk / onDone / onError / onPaused / isAlive(类型见 src/main/ai/streamManager/types.tsStreamListener)。不同消费者只是不同 listener:

listener干什么文件
WebContentsListener把 chunk 转发给某个窗口的 webContentslisteners/WebContentsListener.ts
PersistenceListener终态把最终消息写库listeners/PersistenceListener.ts
SseListener给 HTTP/SSE 客户端(API 网关)推listeners/SseListener.ts
ChannelAdapterListener推到 IM 频道(Slack/Telegram/…)listeners/ChannelAdapterListener.ts
TraceFlushListener终态刷 OTel tracelisteners/TraceFlushListener.ts

这套抽象的妙处:后台任务 agent(第 02 章的 runAgentTask)就是塞了一个自定义 sentinel listener 去累积文本、再塞 ChannelAdapterListener 把结果推到 IM——完全复用同一条管线,不需要另开一套。

5. steering:往一条在跑的流里再塞一句话

这是本章最容易踩错的概念。用户在 AI 还在回答时又发了一句话,该怎么办?Cherry Studio 对聊天agent 会话给了两套不同语义,且都不是「打断重来」

5.1 聊天:排队 + 让步 + 续接(不是打断)

直觉。 不粗暴 abort 当前回答(那会丢半截输出、还要重发),而是让当前这一步干净地收尾,再接着回答新消息。

用户在流中再发一句


PersistentChatContextProvider.hasLiveStream 分支
落库为普通 user 行 → 返回「只入队」的 PreparedDispatch(models 空)


manager.enqueuePendingSteer(topicId, id) 把这行压入 pendingSteers FIFO
send() 看到有活动流 → 只 upsert 订阅者(inject)


运行中的 turn:steerYield 停止条件看到 hasPendingSteer
在下一个 step 边界「干净停」→ 持久化为 success(不是 paused)


onExecutionDone 看到队列里有 steer
不收尾 topic,而是 chain 一个 steer-continuation
(startNextChatTurn,带着上一轮的 renderer listeners)
FIFO 每完成一轮 drain 一条

关键不变量(都能在源码核对):

  • steer 续接只在干净 done 之后才发生。若 turn 被 Stop 中止或报错,队列被丢弃,已落库的 user 行作为「悬挂消息」留在历史里让你重发(onExecutionPaused / onExecutionErrorpendingSteers)。
  • turn 因 steer 让步而停时,持久化状态是 success 而非 paused——因为它是被设计地、在 step 边界主动停的。

(机制见 docs/references/ai/stream-manager.mdSteering 节;停止条件 steerYieldsrc/main/ai/runtime/aiSdk/params/features/steerYield.ts。)

注意一个历史变化。 仓库里 agent-loop.md 提到聊天 steering 曾是「abort-and-restart」,而 stream-manager.md 描述的现行实现是「enqueue + yield + chain」。as-of 本 commit,现行聊天 steering 是 enqueue+yield+chain(stream-manager.md 更新),agent-loop.md 的措辞是旧的。两份内部文档此处略有不一致,以 stream-manager.md + steerYield 源码为准。

5.2 agent 会话:redirect 注入 / 否则排队(永不打断)

agent 会话从不为了应用一句 steer 去 abort 当前 turn(用户 Stop 是现在唯一的 abort 源)。

enqueueUserMessage(sessionId, message)
├─ 有活动 turn 且 driver 能 steer
│ → connection.redirect({ message, systemReminder:true })
│ (Claude Code 用 PreToolUse hook,在下次工具调用前注入)
│ 折进当前 turn:不开新 turn、不入队
│ 若 turn 在注入前就结束 → 连接发 steer-undelivered,host 改为入队
└─ 没有活动 turn,或 driver 不支持 steer
→ 压入 entry.pendingTurns(并记到 steerMessageIds 让下一轮包 system-reminder)
→ scheduleNextTurn

实现见 AgentSessionRuntimeService.enqueueUserMessage(src/main/ai/agentSession/AgentSessionRuntimeService.ts)。这套机制的细节(尤其是 steer-boundary 怎么把一条回答「滚」成两行)留到第 02 章和第 04 章讲。

两套 steering 的对照:

聊天agent 会话
是否打断当前 turn否(step 边界让步)否(redirect 注入 / 排队)
触发后做什么入队 + chain 续接 turn折进当前 turn 或排进 pendingTurns
唯一的 abort 源用户 Stop / 报错用户 Stop
注入载体下一轮 LLM 调用driver 的 redirect()(Claude 的 PreToolUse hook)

6. 巧妙之处

  • 窗口平权 + 主进程独占流。 没有「owner 窗口」的设计,使「关一个窗口、另一个窗口接着看、后台跑完照样落库」全部自然成立——复杂度集中在一处(AiStreamManager),而不是散在每个窗口。
  • 同一条管线服务一切。 普通聊天、临时对话、agent 会话、API 网关、IM 推送、后台定时任务,统统是「不同的 ChatContextProvider + 不同的 listener 组合」。新增一种消费方式 = 写一个 listener,不动管线。
  • steering 不打断。 把「用户中途补话」实现成排队+让步而非 abort,既不丢输出、又有干净的 turn 边界——这是把「对话连续性」当成一等公民的设计。

7. 边界与局限

  • 一个 topic 至多一条活动流:同一会话不能并行两路回答(多模型 fan-out 是一条流里的 N 个 execution,不是 N 条流)。
  • steer 续接依赖干净 done:中止/报错会丢弃队列里未答的 steer(它们以悬挂 user 行留在历史)。
  • 聊天 steering 的 steerYieldstep 边界才停,所以补话不是「立刻」生效,而是「这一步算完」生效。

8. 代码地图

主题文件符号
活动流登记 + stream IPCsrc/main/ai/streamManager/AiStreamManager.tsAiStreamManager
分发选型 + start/injectsrc/main/ai/streamManager/context/dispatch.tsdispatchStreamRequest
三种 providersrc/main/ai/streamManager/context/PersistentChatContextProvider, TemporaryChatContextProvider, AgentChatContextProvider
chunk 一读两用src/main/ai/streamManager/pipeStreamLoop.tspipeStreamLoop
listener 抽象src/main/ai/streamManager/types.tsStreamListener
落库 listenersrc/main/ai/streamManager/listeners/PersistenceListener.tsPersistenceListener
聊天 steer 让步条件src/main/ai/runtime/aiSdk/params/features/steerYield.tssteerYield
agent steer 入口src/main/ai/agentSession/AgentSessionRuntimeService.tsAgentSessionRuntimeService.enqueueUserMessage

下一章:这条管线在 agent 会话上把控制权交给了一个 driver——第 02 章讲 host/driver 怎么分层、Claude Code 怎么接进来。