跳到主要内容

第 2 章 · WorkflowSchema 怎样编译成 eino 数据流图

本章讲什么: 这是本库的核心compose.NewWorkflowWorkflowSchema 编译成一张 cloudwego/einocompose.Workflow 数据流图。重点是它怎样把画布上「节点 B 的某字段取自节点 A 的某字段」这种映射,翻译成 eino 的字段级输入边。

2.1 先建直觉:为什么不是简单的 DAG

最朴素的做法是:连线 = 依赖,A→B 的线表示「A 跑完才跑 B,把 A 的整个输出给 B」。但工作流需要更细的粒度

  • B 的 query 字段取自 A 的 user.name,B 的 topK 字段是个写死的常量 5,B 的 history 取自更早的节点 C——一个节点的输入可能来自多个上游的不同字段,还混着静态值
  • 有时「数据来自 A 的字段」但画布上 A 和 B 之间并没有连线(用户只在字段里引用了,没拉线)——这叫间接依赖:要拿 A 的数据,但不要求紧挨在 A 后面执行。

所以编译器要把每个节点的输入拆成几类,分别用 eino 的不同 API 表达。

2.2 一个节点 = 一个 Lambda + 一组输入边

eino 的 compose.Workflow 模型是:每个节点是一个 WorkflowNode,通过 AddInput(fromNode, fieldMappings...) 声明「我的某些字段来自某个上游节点的某些字段」。Coze 的编译就是把 schema 翻译成这些调用。

核心入口 NewWorkflowcompose/workflow.go:83):

// compose/workflow.go:87 —— 基于 eino 的 compose.NewWorkflow 建图,泛型参数是 map[string]any
wf := &Workflow{
workflow: compose.NewWorkflow[map[string]any, map[string]any](compose.WithGenLocalState(GenState())),
...
}

注意泛型:所有节点的输入输出统一是 map[string]any。这是 Coze 的一个关键设计——节点间传的是「字段名→值」的 map,字段映射就是在两个 map 之间搬字段。GenState() 是共享状态(见 04 章)。

建图分两步(workflow.go:117-141):先加所有复合节点(带内层子工作流),再加其余普通节点;遇到 Exit 节点记下它的「终止策略」(返回变量 / 返回文本)。最后 Compile 出一个 Runner

2.3 核心:resolveDependencies——把字段映射分类

每加一个节点,都先调 resolveDependenciesworkflow.go:625)解析它的输入来源 InputSources。每条来源 vo.FieldInfo 要么是静态值Source.Val),要么是引用Source.Ref,指向某节点的某字段路径)。编译器把它们分进这几桶:

含义用什么 eino API
inputs直接依赖:引用某上游字段,且画布上有连线AddInput(from, fieldMappings...)
inputsFull直接依赖且取整个输出(无字段路径)AddInput(from)
inputsNoDirectDependency间接依赖:引用某上游字段,但没连线AddInputWithOptions(..., WithNoDirectDependency())
dependencies只连了线、不取数据(纯排序约束)AddDependency(from)
staticValues写死的常量SetStaticValue(path, val)
variableInfos引用全局/应用/系统变量留到 state 预处理器(见 04 章)

判定「直接 vs 间接」的关键,是看这条引用的源节点在不在连线表里workflow.go:672-692):

// compose/workflow.go:673 —— 引用的源节点若在「指向我的连线」集合里,就是直接依赖;否则是间接依赖
if _, ok := connMap[fromNode]; ok { // direct dependency
inputs[fromNode] = append(inputs[fromNode],
compose.MapFieldPaths(swp.Source.Ref.FromPath, swp.Path))
} else { // indirect dependency
inputsNoDirectDependency[fromNode] = append(inputsNoDirectDependency[fromNode],
compose.MapFieldPaths(swp.Source.Ref.FromPath, swp.Path))
}

为什么要分直接/间接? eino 里「直接依赖」既传数据又定执行先后;「间接依赖」只在该上游恰好跑过时取它的数据、但不强制等它。这样画布上「拉了线」和「只引用了字段」语义不同:拉线 = 我要等你;只引用 = 我用你的数据但不一定排在你后面(典型场景:分支里只有一条会跑的节点,下游用 WithNoDirectDependency 取任意命中那条的数据)。

最后一段(workflow.go:738-767)处理「只连了线、字段映射里却没提到它」的上游:这种纯连线变成 AddDependency,只排序不传数据。

2.4 用一个例子串起来

假设画布是:开始 ──► 大模型 ──► 结束,且「结束」的输出字段 answer 配成「取自 大模型 的 output 字段」。

连线(执行先后) 字段映射(数据流)
开始(START) ───────────────► 大模型 ───────────────► 结束
output ──映射──► answer

编译后,「结束」节点会得到:

  • inputs[大模型] = [ output → answer ](直接依赖:既等大模型跑完,又把它的 output 搬到自己的 answer

对应 addNodeInternalworkflow.go:269)里:

// compose/workflow.go:269 —— 把「直接依赖」桶里的字段映射注册成 eino 输入边
for fromNodeKey, fieldMappings := range deps.inputs {
wNode.AddInput(string(fromNodeKey), fieldMappings...)
}

整图编译完,Compileworkflow.go:318)给入口节点接上 eino 的 START、给 End 接上 Exit 节点,再调底层 compose.Compile 得到可执行 Runner。

2.5 精华一:数组下钻(arrayDrillDown)

字段映射的「源路径」如果穿过了数组会怎样?比如映射「取自 A 的 list.name」,但 list 是个数组——list.name 在类型上讲不通。Coze 的约定是自动取数组第一个元素list[0].name

arrayDrillDownworkflow.go:543)在编译期沿源路径走类型信息,找出哪些段是数组,给这条字段映射装一个自定义提取器:运行时遇到数组就取 arr[0] 再继续往下钻(workflow.go:578-608)。

# 示意,非源码:数组下钻提取器的核心想法
def extract(value, from_path, array_segments):
for i, key in enumerate(from_path):
value = value[key] # 普通字段:直接取
if i in array_segments: # 这一段是数组
value = value[0] # 自动取第一个元素
return value
# 重点看:路径上每遇到一个数组,就默默 [0] 一下

这让用户在画布上把「批处理输出列表」的某字段接到下游时不必手动写 [0],省心但也是个隐藏语义(边界:空数组会报错,workflow.go:598)。

2.6 精华二:复合节点的内层子工作流

循环/批处理是复合节点:一个父节点 + 一个内层子工作流。getInnerWorkflowworkflow.go:331)为内层单独建一张 eino 图,难点是内层节点常要引用父层(甚至父层之外)节点的输出

Coze 的解法是让内层图的 START 节点当代理:内层某节点要父层 X 的字段,就让内层 START 提供一个以父节点 key 打头的字段(workflow.go:704-709),同时把这笔「需求」记进 inputsForParent,由父节点在外层图里替内层把数据取齐、打包,再灌进内层 START。

外层图: ... ──► [Loop 父节点] ──► ...
│ 把内层需要的父层字段打包进 START

内层图: START(代理父层字段) ──► 内层节点A ──► 内层节点B ──► END(回传父节点输出)

字段路径用 # 拼接避免冲突(joinFieldPathworkflow.go:783)。父节点的输出则由 resolveDependenciesAsParentworkflow.go:787)从内层 END 收集。

这套「代理」机制让复合节点对外表现得像一个普通节点(有自己的输入输出字段映射),对内又是一张完整的子图,复用了同一套编译逻辑。

2.7 子工作流节点

「子工作流」节点(引用另一份已发布工作流)走的是另一条路:buildSubWorkflownode_builder.go:102)直接对子工作流的 schema 再调一次 NewWorkflow,把得到的 Runner 包进 subworkflow.SubWorkflow,当成一个普通的可调用节点嵌进父图。递归复用整个编译器。

代码地图

主题文件符号
建图入口internal/compose/workflow.goNewWorkflowWorkflow.Compile
加节点internal/compose/workflow.goaddNodeInternalAddCompositeNode
依赖分类(核心)internal/compose/workflow.goresolveDependenciesdependencyInfo
数组下钻internal/compose/workflow.goarrayDrillDown
内层子工作流internal/compose/workflow.gogetInnerWorkflowresolveDependenciesAsParent
节点实例化internal/compose/node_builder.goNewbuildSubWorkflow
字段路径拼接internal/compose/workflow.gojoinFieldPath(分隔符 #