跳到主要内容

第 4 章 · 贯穿全局的三个机制:流式、中断/恢复、共享 State

本章讲什么: 前三章是「编译 + 单节点执行」。这一章讲三个跨节点、跨整图的机制:① 流式要不要传播怎么判定;② 问答节点怎样中断、等用户回答再恢复;③ 一份共享 State 里都存了什么、起什么作用。

4.1 共享 State:节点之间的「公告板」

所有节点共享一个 Statecompose/state.go:39),由 eino 的 WithGenLocalState 注入(GenStatestate.go:251)。它是节点间协作的公告板,存了:

字段存什么给谁用
ExecutedNodes哪些节点已经跑过间接依赖判定、恢复
Inputs节点输入(中断时持久化)恢复后还原输入
ResumeData用户对「问答」的回答节点恢复时读取
SourceInfos输入字段的流式来源信息流式判定
NestedWorkflowStates循环/批处理的中间状态复合节点恢复
IntermediateResult中间结果(如分支选了哪条)动态流式判定

节点跑完,statePostHandlerstate.go:509)会把该节点标记进 ExecutedNodes。这些处理器(pre/post handler)是 eino 提供的钩子,在节点执行前后改写输入输出 + 读写 State。

4.2 变量解析:在 pre-handler 里偷偷注入

02 章 提过:引用「全局变量 / 应用变量 / 系统变量」的字段,编译时不当成依赖,而是攒进 variableInfos。它们在哪兑现?

答案是 statePreHandlerForVarsstate.go:357)——节点执行,这个 pre-handler 去变量存储里把变量值取出来,塞进节点输入 map 的对应字段:

// compose/state.go:399 —— 按变量类型(系统/用户/应用)从对应存储取值,写进输入
case vo.GlobalSystem, vo.GlobalUser:
v, err = varStoreHandler.Get(ctx, *input.Source.Ref.VariableType, input.Source.Ref.FromPath, opts...)
...
nodes.SetMapValue(out, input.Path, v)

巧妙之处:变量不进数据流图。 把「读全局变量」做成执行前的旁路注入,而不是图里的一条边——这样变量来源(数据库、应用配置)变化不影响图结构,节点也无需知道值从哪来。

4.3 流式传播:一次图遍历定生死

并非所有工作流都要流式。判定逻辑 doRequireStreamingworkflow_schema.go:274)很直接:

  1. 标出所有生产者CanGeneratesStream,能真正产流,如 LLM)和消费者RequireStreamingInput,偏好流式输入的节点)。
  2. 若任一方为空 → 不需要流式。
  3. 否则按字段映射建数据流邻接图,从每个生产者 BFS,只要能到达某个消费者,就需要流式
# 示意,非源码:流式判定的核心
def require_streaming(producers, consumers, adjacency):
for p in producers: # 从每个「能产流」的节点出发
for node in bfs(p, adjacency): # 沿数据流向下走
if node in consumers: # 走到了「要吃流」的节点
return True # 全图启用流式
return False

判定结果存进 requireStreaming,决定 AsyncRunStream 还是 Invokeworkflow.go:160-171)。这一步在编译期算好,避免运行时反复判断。

哪些节点标了流式特性,看 node_meta.goExecutableMeta.IncrementalOutput(如 ExitOutputEmitter 把流真正吐给用户)。

4.4 中断与恢复:让工作流「停下来等人」

这是工作流构建器的杀手锏:运行到「问答节点」时,工作流要停下、把问题推给用户、等用户回答、再从断点继续。靠 eino 的中断 + 检查点实现。

中断

PersistInputOnInterrupt 的节点(问答、循环、批处理,见 node_meta.go)在中断时要保住自己的输入。statePreHandlerstate.go:271)专门处理:第一次进来把输入存进 State.Inputs,恢复时再读回来。

问答节点执行时若需要提问,会抛出 eino 的 InterruptRerunError03 章 讲过,nodeRunner.invoke 识别这个错误后设 interrupted=true 并原样上抛(node_runner.go:630),不当失败、不重试。eino 收到后把整张图的状态序列化进检查点存起来(编译时若 requireCheckpoint 为真就挂了 WithCheckPointStoreworkflow.go:144)。

能序列化整图状态的前提是 State 里所有类型都注册了序列化器——这就是 state.go:51 那一长串 RegisterSerializableType 的用途。

恢复

用户回答后走 AsyncResumeexecutable_impl.go:789)。关键是把用户的回答喂回对应节点:GenStateModifierByEventTypestate.go:719)造一个状态修改器,在恢复执行前把回答写进 State.ResumeData[nodeKey]

// compose/state.go:723 —— 恢复时把用户回答塞进 State,让目标节点能读到
stateModifier = func(ctx context.Context, path compose.NodePath, state any) (err error) {
state.(*State).ResumeData[nodeKey] = resumeData
return nil
}

节点恢复执行时通过 GetAndClearResumeDatastate.go:233,实现了 InterruptEventStore 接口)取走回答并清空。整图从检查点反序列化回来,从断点继续跑。

跑到问答节点


抛 InterruptRerunError ──► eino 把整图 State 序列化进检查点 ──► 推「提问」事件给用户
⋮(工作流挂起,等待)
用户回答 ──► AsyncResume ──► StateModifier 写 ResumeData ──► 从检查点恢复 ──► 节点取回答继续

4.5 执行入口与事件

三个对外入口在 executable_impl.goSyncExecute(同步,等结果)、AsyncExecute(异步,后台跑)、StreamExecute(流式,返回事件流)。它们的骨架一致(executable_impl.go:52 起):取工作流实体 → 反序列化画布 → CanvasToWorkflowSchemaNewWorkflow → 转换输入 → NewWorkflowRunner(...).Prepare 准备执行上下文和事件通道 → SyncRun/AsyncRun

执行期间节点回调汇成事件,WorkflowRunner.Prepareworkflow_run.go:107)起一个 goroutine 用 HandleExecuteEvent 消费事件通道,处理中断事件的存取(event_handle.goSaveInterruptEvents / PopFirstInterruptEvent),并把最后一个事件回传(workflow_run.go:293-312)。前端据此实时更新画布。

4.6 边界

  • 中断/恢复依赖检查点存储,只有 schema 里确实含会中断的节点(requireCheckPoint=true)才挂检查点,避免无谓开销。
  • 复合节点(循环/批处理)的中断恢复更复杂,靠 NestedWorkflowStates 单独存内层进度。
  • 流式判定是「全图开关」:一旦判定需要流式,整图走 Stream,非流式节点由 eino 自动适配。

代码地图

主题文件符号
共享状态结构internal/compose/state.goStateGenState
序列化注册internal/compose/state.goRegisterSerializableType(init 块)
执行前后钩子internal/compose/state.gostatePreHandlerstatePostHandler
变量旁路注入internal/compose/state.gostatePreHandlerForVars
恢复数据写入internal/compose/state.goGenStateModifierByEventTypeGetAndClearResumeData
流式判定internal/schema/workflow_schema.godoRequireStreamingRequireStreaming
流式特性声明entity/node_meta.goExecutableMeta.IncrementalOutputStreamConfig
执行入口backend/domain/workflow/service/executable_impl.goSyncExecuteAsyncExecuteStreamExecuteAsyncResume
事件与中断处理internal/compose/workflow_run.gointernal/execute/event_handle.goWorkflowRunner.PrepareHandleExecuteEvent
中断存储接口internal/nodes/interrupt.goInterruptEventStore