跳到主要内容

第 1 章 · 流式 agent 循环

这一章讲:一条 prompt 进来后,核心循环到底怎么转。看懂这个,后面的并发/摘要/工具都是在它周围加防护。

1.1 先建直觉:会话就是一条消息流

Crush 里没有「对话上下文」这种抽象的内存对象——一个会话 = SQLite 里一串按时间排序的消息。每条消息有个 Role:

Role谁产生装什么
User用户文本 prompt + 附件
Assistant模型文本、推理(thinking)、工具调用(tool_use)
Tool工具执行后工具结果(tool_result)

跑一轮就是:把这串消息喂给模型 → 模型续写新的 assistant 消息 → 如果它要调工具,执行后追加一条 tool 消息 → 再喂回去。直到模型说「我说完了」。

这条流是唯一事实来源:UI 显示的是它,下一轮发给模型的也是它,崩溃后恢复靠的还是它。所以本章的核心,就是「模型每吐一片东西,如何正确地追加进这条流」。

1.2 谁来转这个循环:fantasy.Agent.Stream

Crush 不自己写「多步工具循环」——那部分(调 provider、解析流式 SSE、判断该不该再调一轮)交给了 charm.land/fantasy 这个库。Crush 的做法是:构造一个 fantasy.NewAgent,然后调 agent.Stream(...),通过一大堆回调函数挂钩到循环的每个事件上

fantasy.Agent.Stream 内部循环 Crush 挂的回调(都在 agent.go 的 Run 里)
─────────────────────────── ──────────────────────────────────
┌─ 准备这一步的消息/工具 ──────────────▶ PrepareStep: 注入 todo 提醒、加缓存标记、
│ 创建一条空 assistant 消息
│ 调 provider,开始流式吐 token
│ ├─ 吐推理(thinking)增量 ─────────▶ OnReasoningDelta / OnReasoningEnd
│ ├─ 吐正文增量 ────────────────────▶ OnTextDelta: 追加到 assistant.Content
│ ├─ 决定调某工具 ──────────────────▶ OnToolCall: 把 tool_use 记进 assistant
│ ├─ 执行该工具,拿到结果 ──────────▶ OnToolResult: 新建一条 Tool 消息
│ └─ 这一步结束 ────────────────────▶ OnStepFinish: 记 usage/cost、存会话

└─ 该不该再来一步? ──────────────────▶ StopWhen: 超长?死循环?→ 停

怎么读这张图: 左边是 fantasy 跑的循环骨架,右边是 Crush 注入的「副作用」——几乎每个回调干的事都是「把流式增量写进消息流」。挂钩点见 internal/agent/agent.go:790 起的 agent.Stream(genCtx, fantasy.AgentStreamCall{...})

1.3 PrepareStep:每一步开始前做什么

PrepareStep 在每个流式步骤前被调用(internal/agent/agent.go:801PrepareStep 字段)。它干三件正经事:

  1. 清掉历史消息的 provider options,然后用最新的工具列表(a.tools.Copy())——因为 MCP 工具可能在会话中途变化。
  2. 加 Anthropic 缓存控制:给系统消息和最后两条消息打上 ephemeral 缓存标记(见 getCacheControlOptions,第 4 章细讲)。
  3. 创建一条空的 assistant 消息并把它的 ID 塞进 context,后续所有增量回调都往这条消息上追加:
// internal/agent/agent.go:857 —— PrepareStep 内
assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
Role: message.Assistant,
Parts: []message.ContentPart{},
Model: largeModel.ModelCfg.Model,
Provider: largeModel.ModelCfg.Provider,
})
// ...
currentAssistant = &assistantMsg // 后续 On* 回调都改这个指针指向的消息

注意 currentAssistant 是个在每一步被重新赋值的指针——多步循环里,每一步都是一条新的 assistant 消息。

1.4 增量落库:OnTextDelta / OnToolCall / OnToolResult

这三个回调是「把模型吐的东西变成消息流」的主力。它们都很短,核心都是「改 currentAssistant 然后 Update」。

正文增量 —— 边吐边追加,UI 实时刷新:

// internal/agent/agent.go:901 —— OnTextDelta
if len(currentAssistant.Parts) == 0 {
text = strings.TrimPrefix(text, "\n") // 去掉开头多余换行(非交互模式很显眼)
}
currentAssistant.AppendContent(text)
return a.messages.Update(genCtx, *currentAssistant)

工具调用 —— 模型决定调某工具,把它记进 assistant 消息(OnToolCall,internal/agent/agent.go:927)。这里有个细节:输入 JSON 会先过 sanitizeToolInput 校验,若不是合法 JSON 就替换成 {} 并标记,避免「截断的工具参数」把会话卡死(sanitizeToolInput,internal/agent/agent.go:2198)。

工具结果 —— 工具执行完,新建一条 Tool 消息(OnToolResult,internal/agent/agent.go:944)。注意它故意用父 ctx 而非 genCtx:

// internal/agent/agent.go:952 —— OnToolResult,注释原文
// Use parent ctx instead of genCtx to ensure the message is created
// even if the request is canceled mid-stream
_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, ...)

这是个反复出现的模式:流式生成用可取消的 genCtx,但「把已经发生的事实落库」用不可取消的父 ctx——即使用户中途按了取消,已经跑完的工具结果也不能丢,否则消息流会出现「有工具调用但没有结果」的破损状态(第 4 章会讲这种破损如何被修复)。

1.5 一步结束:OnStepFinish 记账

每个流式步骤结束时,OnStepFinish(internal/agent/agent.go:960)把 fantasy 的完成原因翻译成 Crush 的枚举,并记账:

// internal/agent/agent.go:993 —— 估算 usage(provider 没给时回退到自己数 token)
usage, estimated := fallbackStepUsage(stepMessages, stepResult)
a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)

一个值得记的细节:「工具结果可以主动结束整轮」。正常情况下「这一步以工具调用结束」意味着还要再跑一轮(把结果喂回模型);但如果某个工具结果带了 StopTurn(比如 hook 拦截、权限被拒),就把完成原因改成「结束」,让 UI 停下来:

// internal/agent/agent.go:977
if finishReason == message.FinishReasonToolUse {
for _, tr := range stepResult.Content.ToolResults() {
if tr.StopTurn {
finishReason = message.FinishReasonEndTurn
break
}
}
}

1.6 什么时候停:StopWhen 两个条件

fantasy 默认在「模型不再调工具」时停。Crush 额外塞了两个 StopWhen 条件(internal/agent/agent.go:1003),都是「主动喊停」:

条件① 上下文快满了 ── 触发自动摘要
remaining = 上下文窗口 - 已用 token
若 remaining ≤ 阈值 → shouldSummarize=true, 停
(大窗口阈值 = 固定 20k buffer;小窗口阈值 = 窗口的 20%)

条件② 检测到死循环 ── hasRepeatedToolCalls
最近 10 步里,同一个「工具调用+结果」签名出现 > 5 次 → 停

条件①的阈值逻辑在 internal/agent/agent.go:1003-1024,用到顶部三个常量(largeContextWindowThreshold = 200k、largeContextWindowBuffer = 20k、smallContextWindowRatio = 0.2)。条件②细看下一节。

1.7 死循环检测:用「调用+结果」的哈希签名

LLM 偶尔会卡进「反复跑同一条 grep、拿同样的空结果、又跑同一条 grep」的死循环。Crush 的检测很朴素但有效:给每一步的工具交互算一个 SHA-256 签名(把工具名、输入、输出拼起来哈希),在最近 10 步的窗口里数同一签名的出现次数。

// internal/agent/loop_detection.go:45 —— getToolInteractionSignature
h := sha256.New()
for _, tc := range toolCalls {
output := ""
if tr, ok := resultsByID[tc.ToolCallID]; ok {
output = toolResultOutputString(tr.Result)
}
io.WriteString(h, tc.ToolName); io.WriteString(h, "\x00")
io.WriteString(h, tc.Input); io.WriteString(h, "\x00")
io.WriteString(h, output); io.WriteString(h, "\x00")
}
return hex.EncodeToString(h.Sum(nil))

关键设计:签名同时包含输入和输出。如果模型反复调同一个工具但每次输入不同(在真正探索),签名各异、不会误判;只有「输入相同、输出也相同」地空转,才算死循环(hasRepeatedToolCalls,internal/agent/loop_detection.go:19,窗口 10、阈值 5,见常量 loopDetectionWindowSize / loopDetectionMaxRepeats)。

1.8 退出前:flush + 发出唯一的 RunComplete

Run 用一个 defer(internal/agent/agent.go:743)保证无论成功/失败/panic,退出前都:

  1. 脱离 run context 的短超时 context flush 所有缓冲的消息更新(a.messages.FlushAll)——因为 workspace 关闭会取消 run context,但缓冲里的最后几片增量必须落盘。
  2. 发出这一轮的权威 RunComplete 事件,带上最终 assistant 消息的 ID 和文本。

这个「flush 再发 RunComplete」的顺序很讲究:它让规矩的客户端有最大机会先看到最终消息事件、再看到 RunComplete;而 RunComplete内嵌了 Text 字段,给那些乱序收到事件的客户端兜底(internal/agent/agent.go:756-774)。RunComplete 的唯一性是下一章的主题。


下一章:02-concurrency.md —— 当同一个会话被并发提交和取消时,上面这个干净的循环要套上多么繁复的并发外壳。