跳到主要内容

第 2 章 · 会话并发与终结事件唯一性

这一章讲 Crush 代码里注释最长、最反直觉的部分。如果你只想用 Crush,可以跳过;如果你想写一个生产级的 agent 服务,这章是精华——它回答「同一个会话被并发踩踏时,如何不漏发、不重发、不挂死终结事件」。

2.1 先讲清楚:难在哪

一个会话不是「一次提交跑完再提交下一次」那么干净。现实里:

  • 用户连按两次回车,两条 prompt 几乎同时提交给同一个会话。
  • crush run(非交互)对一个可能正忙的会话提交,它不看实时消息、只死等一个 RunComplete 才退出。
  • 用户随时按 Esc 取消——而取消可能落在:运行中、刚提交还没开始、已排队、已经结束正在交接给下一个排队项……任何一个微妙的时间窗里。

核心契约:每个带 RunID 的提交,必须恰好收到一个 RunComplete(成功/失败/取消都算)。漏发 → 客户端永久挂死;重发或发错轮的 → 客户端读到错误的结果或提前退出。

Crush 的解法围绕三个会话级状态:

状态数据结构含义
激活中activeRequests: Map[sessionID]CancelFunc这个会话正有一轮在跑;Cancel 调它的 cancel func
已接受未激活acceptedRuns: Map[sessionID]int + 单调 acceptSeqGen已派发到 goroutine、但还没完成「派发握手」的 prompt 计数
待取消cancelMark: Map[sessionID]uint64一个 accept 序号「高水位」:序号 ≤ 它的已接受 prompt 都算被取消

以上字段都在 sessionAgent 结构体上(internal/agent/agent.go:153-207),每个字段的注释都长达一段——这本身说明了问题之微妙。

2.2 三态握手:accepted → (取消 / 排队 / 激活)

核心是 SessionAgent.Run 开头的一段(internal/agent/agent.go:550 起)。当一个调用带着 Accepted 句柄(fire-and-forget 的派发路径)进来时,它在 per-session 调度锁(dispatchMu)下做一次三选一的转移:

带 Accepted 句柄的 Run 进来

┌─────────┴─────────┐ 持有 per-session dispatchMu
▼ ▼
canceledBySeq(seq)? (否)IsSessionBusy?
序号 ≤ cancelMark 会话已有别的轮在跑?
│是 │是 │否
▼ ▼ ▼
① 取消-on-entry ② 排队 ③ 成为激活轮
关掉 accept、解锁、 enqueueCall、 注册 activeRequests
persistCanceledTurn、 关 accept、 的 cancel func、关
发 Cancelled RunComplete 解锁、return accept、解锁、开跑

为什么必须在锁里做这个三选一? 因为 Cancel 也抢同一把锁。这样保证:任何一次 Cancel 都至少能观察到三者之一——一个 cancel 标记、一个 activeRequests 条目、或一个它能清掉的队列条目。绝不会出现「cancel 来的时候,这个 prompt 恰好在三个状态的缝隙里、谁都没接住它」。这段逻辑见 internal/agent/agent.go:568-640,Cancel 端见 internal/agent/agent.go:1894

2.3 取消的精确性:用「序号 ≤ 高水位」而非布尔标记

最容易写错的地方:一个会话同时有「正在跑的 A」和「刚提交还没开始的 B」,用户按一次 Esc——应该取消谁?

直觉做法「设一个 canceled=true 布尔」会出错:它会误伤那个在 cancel 之后才提交的 prompt C。Crush 的解法是单调递增的 accept 序号 + 高水位标记:

每次 BeginAccepted 给 prompt 盖一个递增序号: A=1 B=2 C=3
Cancel 发生时,把 cancelMark 抬到「当前已发出的最大序号」: mark=2

之后判定某 prompt 是否被这次 cancel 覆盖:
seq ≤ mark → 被取消 (A=1✓ B=2✓)
seq > mark → 不被取消 (C=3✗,它在 cancel 之后才来,序号更大)

判定函数极简(canceledBySeq,internal/agent/agent.go:477):

func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
mark, ok := a.cancelMark.Get(sessionID)
if !ok || mark == 0 {
return false
}
return seq == 0 || seq <= mark // seq==0 = 进程内调用,无 accept 追踪,任何标记都覆盖它
}

Cancel 抬高水位用的是 max(internal/agent/agent.go:1941),让重复 cancel 幂等、又让后一次 cancel 能把覆盖范围扩展到「这期间新接受的 prompt」。mark 不被消费(不清零),所以被它覆盖的每个兄弟 prompt 都能读到同一个 cancel,而更晚的 prompt 一律忽略它。

2.4 排队与续跑:一个 prompt 一条命的生命周期

会话忙时,新提交进队列(enqueueCall,internal/agent/agent.go:347)。队列在两个地方被「抽干」:

  1. 每个流式步骤前(drainQueueForStep,internal/agent/agent.go:383):被 cancel 覆盖的丢弃;没带 RunID 的「折叠」进当前轮(作为追加用户消息);带 RunID 的留在队列里,各自作为独立一轮跑。
  2. 当前轮结束、交接下一个排队项时(internal/agent/agent.go:1200 起):同样按序号过滤,然后递归调 a.Run 跑下一个。

这里的设计哲学是 internal/agent/agent.go:1249 注释点破的:「每个带 RunID 的提交都有自己独立的生命周期」。所以一轮结束、要交接给一个不同 RunID 的排队项时,这一轮必须当场发出自己的 RunComplete——不能指望递归那一轮(它带的是别的 RunID)替它发:

// internal/agent/agent.go:1258 —— outerOwesRunComplete 判定(简化引用)
outerOwesRunComplete := call.RunID != ""
if outerOwesRunComplete {
for _, q := range queuedMessages {
if q.RunID == call.RunID { // 例外:摘要续跑会把同一个 call(同 RunID)重新入队
outerOwesRunComplete = false
break
}
}
}

那个例外(同 RunID 重新入队)正是「上下文超长 → 摘要 → 续跑原 prompt」的路径,第 4 章会讲。

2.5 「必须送达」的取消通知:不让等待者挂死

一个排队的 prompt 如果被丢弃(被 cancel 覆盖、或被 ClearQueue 清掉),而它带着 RunID,就有人在死等它的 RunComplete。Crush 专门为这种「从未运行就被丢弃」的 prompt 补发一个 Cancelled: true 的终结事件:

// internal/agent/agent.go:419 —— publishCanceledQueueDrops(节选)
// 用脱离 run context 的有界 context,保证 must-deliver 发布即使触发丢弃的
// run context 已被取消也能送达
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, d := range drops {
if d.RunID == "" { continue } // 没 RunID 的本来就没人等,静默丢弃
a.publishRunComplete(ctx, d, notify.RunComplete{
SessionID: d.SessionID, RunID: d.RunID, Cancelled: true,
})
}

注意 publishRunComplete(internal/agent/agent.go:523)用的是 PublishMustDeliver——有界阻塞语义:即使订阅者 channel 暂时满了也不会静默丢这个权威终结事件。这是「不挂死非交互客户端」的最后保险。

2.6 合并重试:Coordinator 把「未授权→重认证→重试」缩成一个事件

上面都是 SessionAgent 层。再往上,Coordinator 还要处理 provider 报 401(未授权)需要刷新 token 再重试的情况。问题:第一次尝试已经发了一个「失败」的 RunComplete,重试又会发一个「成功」的——crush run 看到第一个就退出了。

解法:Coordinator.run 给每次尝试装一个 OnComplete 钩子,把每次尝试的终结事件吞掉、只留最新的,等重试链全部结束后再发一次:

// internal/agent/coordinator.go:259 —— 合并闭包
var latest notify.RunComplete; var hasLatest bool
onComplete := func(rc notify.RunComplete) { latest = rc; hasLatest = true }
// ... 跑重试链 ...
if hasLatest && c.runComplete != nil {
c.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, latest)
MarkRunCompletePublished(ctx) // 告诉派发层别再补发重复的
}

这就是 SessionAgentCall.OnComplete 钩子存在的理由(internal/agent/agent.go:96-108 的长注释)——它让 Coordinator接管终结事件的发布路径,把多次尝试合并成一个用户可见的结果。

2.7 一张图收束:RunComplete 的所有出口

一个带 RunID 的提交,可能从这些出口拿到它(唯一的)RunComplete:

┌─ 取消-on-entry → agent.go:604 Cancelled
├─ 排队后被 cancel 丢弃 → publishCanceledQueueDrops Cancelled
├─ 正常跑完(defer) → agent.go:774 成功/失败
├─ 交接给不同 RunID 时 → agent.go:1286 当场补发本轮的
└─ Coordinator 合并重试 → coordinator.go:306 最终结果

所有出口共用 publishRunComplete(agent.go:523):
有 OnComplete 钩子 → 交给钩子(Coordinator 合并用)
否则 → PublishMustDeliver 到 RunComplete broker

怎么读: 不管这一轮从哪个分支结束,都收敛到同一个 publishRunComplete,而它要么把事件交给上层合并钩子、要么 must-deliver 地发到 broker。「恰好一个」就是靠「每个出口都走它、且互斥地只走一个」来保证的。


下一章:03-tools-and-safety.md —— agent 的手脚:工具怎么执行、权限闸门怎么挡危险操作。