跳到主要内容

第 3 章 · 单个节点运行时的通用骨架

本章讲什么: 节点的具体逻辑各不相同(LLM 调模型、选择器算条件、代码跑脚本),但它们外面都套着同一个壳 nodeRunner,统一处理输入转换、超时、重试、错误兜底、回调。这一章讲这个壳。

3.1 节点只需实现一个接口

一个节点的逻辑作者只要实现 8 个接口之一(internal/nodes/node.go:31 起),按「输入/输出是不是流式」分四组、每组有「带 NodeOption / 不带」两版:

接口输入输出谁实现
InvokableNode非流非流大多数节点(插件、选择器…)
StreamableNode(WOpt)非流LLM(边生成边吐)
CollectableNode非流目前无(设计预留)
TransformableNode变量聚合

eino 会根据上下游是否流式,自动选用节点的哪种方法(这套「自动适配流式范式」的能力来自 eino 本身)。比如选择器只实现了 Invokeselector/selector.go:120)。

3.2 套壳:toNode

编译时,toNodenode_runner.go:205)拿到节点实例,反射它实现了哪几个接口,再用 compose.AnyLambda 把它包成一个 eino Lambda(node_runner.go:528)。包之前先收集这个节点的「附加能力」:

  • 实现了 CallbackInputConverted / CallbackOutputConverted → 注册输入/输出回调转换器(把内部 map 转成适合 UI 显示的结构)。
  • 实现了 Initializer → 注册初始化钩子。

3.3 运行壳干的事(以 Invoke 为例)

nodeRunConfig.invoke()node_runner.go:301)返回的闭包,就是 eino 实际调用的函数。它把一次节点执行编排成固定流水线:

init 钩子 ──► preProcess(类型转换/填零)──► onStart 回调 ──► 真正的 Invoke ──► postProcess ──► onEnd 回调
│ 出错

onError(抛错/异常分支/默认值)

各步对应:

  • preProcess:先做输入类型转换 preTypeConverternode_runner.go:911,把 JSON 来的字符串/数字按声明类型规整),再剔除流式完成标记,若节点 PreFillZero 还给缺失字段填零值(node_runner.go:94-100)。
  • onStart / onEnd:触发 eino 回调,若有转换器就先转成 UI 友好结构再发(node_runner.go:554:777)。这些回调最终汇成推给前端的事件。
  • postProcess:若节点 PostFillNil,给输出缺失字段补 nil(node_runner.go:102-105)。

哪些节点要填零/补 nil,由 node_meta.go 里每个节点的 ExecutableMetaPreFillZero / PostFillNil)声明。

3.4 超时与重试

壳在真正调 Invoke 时包了超时和重试。超时由 newNodeRunnernode_runner.go:542)用 context.WithTimeout 实现,时长来自节点的 ExceptionConfig.TimeoutMS。重试在 invokenode_runner.go:616)里循环:

// node_runner.go:629 —— 出错时:若是「中断重跑」错误直接返回;否则在重试上限内自增计数重试
if err != nil {
if _, ok := compose.IsInterruptRerunError(err); ok {
r.interrupted = true
return nil, err
}
if r.maxRetry > n {
n++
...
continue
}
return nil, err
}

注意它区分「中断」和「真失败」:问答节点等用户输入时抛的是 InterruptRerunError,这不是失败,不能重试,要原样上抛让引擎挂起(见 04 章)。

3.5 精华:错误三策略

用户能在节点上配「出错了怎么办」,对应 onErrornode_runner.go:820)的三种 errProcessType

策略行为给下游什么
Throw(默认)直接抛错,整个工作流失败
ReturnDefaultData吞掉错误,返回用户预设的默认输出默认数据 + errorBody + isSuccess=false
ExceptionBranch吞掉错误,但走「异常分支」空输出 + errorBody,且让分支判定走 branch_error

后两种策略下,节点输出里会被塞一个 isSuccess 标志(成功时 onEndtruenode_runner.go:778)。这个标志正是异常分支的判定依据——分支条件 GetExceptionBranchbranch_schema.go:103)就是看 isSuccess 是不是 false 来决定走异常口还是正常口。

# 示意,非源码:异常分支怎么判
def pick_branch(node_output, exception_targets, default_targets):
if node_output.get("isSuccess") is False: # 节点出错且配了异常分支
return exception_targets # 走 branch_error 口
return default_targets # 正常往下

巧妙之处:用数据通道传错误信号。 出错信息不是靠抛异常中断,而是被转成普通的节点输出(带 isSuccess / errorBody),顺着正常数据流走给下游。这样「节点失败」就能像「条件分支」一样被画布编排,用户能画一条「失败时走这边」的线。

3.6 流式版本

stream / collect / transformnode_runner.go:363/418/472)是同一套流水线的流式变体,差别在:错误兜底要把默认输出包成单元素流(node_runner.go:383)、isSuccess 标志要 merge 进输出流(node_runner.go:797)、回调用流式版(OnEndWithStreamOutput)。重试时输入流要先 Copy(流只能读一次,node_runner.go:692)。

代码地图

主题文件符号
节点接口(8 种)internal/nodes/node.goInvokableNodeStreamableNodeWOptTransformableNode
套壳internal/compose/node_runner.gotoNodenodeRunConfig.toNode
Invoke 流水线internal/compose/node_runner.gonodeRunConfig.invokenodeRunner
输入类型转换internal/compose/node_runner.gopreTypeConverterkeyFinishedMarkerTrimmer
超时/重试internal/compose/node_runner.gonewNodeRunnernodeRunner.invoke
错误三策略internal/compose/node_runner.goonError
异常分支判定internal/schema/branch_schema.goGetExceptionBranch
节点行为元数据entity/node_meta.goExecutableMetaPreFillZero/PostFillNil/MayUseChatModel
示例节点internal/nodes/selector/selector.goSelector.InvokeConfig.BuildBranch