跳到主要内容

Coze Studio 工作流构建器 — 架构与原理

30 秒导读: Coze Studio 的「工作流」让你在画布上拖出一串节点(开始 → 大模型 → 选择器 → 结束……),连上线、配好「这个节点的输入取自那个节点的哪个字段」,点运行就能跑。本文讲清这套东西底层到底怎么实现:前端画布(基于 flowgram 自由布局编辑器)画出来的 JSON,怎样被后端编译成一张 cloudwego/eino 的数据流图,再以字段级依赖驱动并发执行,全程支持流式输出、运行中向用户提问(中断/恢复)、节点出错兜底。

1. 这是什么(零基础也能懂)

一句话定义: 一个可视化、低代码的 AI 编排器——你在画布上把「大模型 / 插件 / 代码 / 条件判断 / 循环」等积木拖出来连成图,平台就能按图执行,产出结果。

解决什么问题 / 给谁用:

假设你要做一个「智能客服」:先用大模型判断用户意图,意图是「退款」就查数据库订单、走退款插件;是「闲聊」就直接让大模型回复。如果纯手写代码,你要自己处理调用顺序、并发、流式输出、出错重试、把上一步的某个字段喂给下一步……很烦。

工作流构建器把这些编排逻辑变成拖拽:节点是积木,连线是「先后/数据流向」,每个节点的输入框里选「取自 XX 节点的 YY 字段」。给不想写后端编排代码的产品/运营/开发者用。

它能做什么:

  • 提供 ~50 种内置节点:大模型、插件、子工作流、代码、知识库检索、数据库增删改查、HTTP 请求、条件选择器、循环/批处理、问答、变量聚合等(见 backend/domain/workflow/entity/node_meta.go:259NodeTypeMetas)。
  • 支持流式输出(大模型边生成边吐字)、运行中向用户提问(问答节点会中断、等用户回答再恢复)、节点出错兜底(重试 / 走异常分支 / 返回默认值)。
  • 一份工作流可以被别的工作流当子节点嵌套调用,也可以被 Agent 当工具调用。

用起来什么样:

最小的一条工作流就是「开始 → 大模型 → 结束」。画布存成 JSON(节点列表 + 连线列表),后端拿到后跑一遍,从「开始」节点喂入用户输入,流到「大模型」,再流到「结束」输出。运行时前端能看到每个节点高亮、实时显示输入输出。

一句话直觉/类比:

把它想成可视化的 Unix 管道:每个节点是一个命令,连线是管道 |,但比管道强——管道只能把上一个的 stdout 整个塞给下一个;这里能精确指定「把节点 A 输出的 user.name 字段,接到节点 B 输入的 query 字段」,而且多个上游可以汇入一个下游。

2. 顶层全景(它大概怎么转)

整套系统横跨前端画布后端执行引擎两大块,中间用一份画布 JSONvo.Canvas)做契约。

┌─────────────────────── 前端(浏览器,React + flowgram)──────────────────────┐
│ │
│ 用户拖拽节点 / 连线 / 配字段映射 │
│ │ │
│ ▼ │
│ flowgram 自由布局画布 ──serialize──► 画布 JSON(节点[] + 连线[]) │
│ (workflow/playground) │
│ │ ▲ │
│ │ 保存 / 点「试运行」 │ 运行时事件(节点高亮、流式字) │
└────────────┼──────────────────────────────────┼──────────────────────────────┘
│ HTTP │ SSE / 轮询
┌────────────▼──────────────────────────────────┼──────────────────────────────┐
│ ▼ │ 后端(Go) │
│ ① CanvasToWorkflowSchema │ │
│ 画布 JSON → WorkflowSchema(规整化) │ │
│ │ │ │
│ ▼ │ │
│ ② compose.NewWorkflow │ │
│ WorkflowSchema → eino 数据流图(编译) │ │
│ │ │ │
│ ▼ │ │
│ ③ wf.SyncRun / AsyncRun │ │
│ eino Runner 执行 ──► 节点回调 ──► 事件 ────┘ │
│ (并发 / 流式 / 中断 / 兜底) │
└──────────────────────────────────────────────────────────────────────────────┘

部件一句话职责:

部件干什么在哪
画布 / 节点表单用户拖拽编辑,序列化成 JSONfrontend/packages/workflow/playground/nodes/
vo.Canvas前后端契约:Nodes[] + Edges[] 的 JSON 结构backend/domain/workflow/entity/vo/canvas.go
CanvasToWorkflowSchema把画布 JSON 规整成后端的 WorkflowSchemabackend/domain/workflow/internal/canvas/adaptor/to_schema.go:66
WorkflowSchema引擎内部的「节点 + 连线 + 层级 + 分支」中间表示backend/domain/workflow/internal/schema/workflow_schema.go:35
compose.NewWorkflowWorkflowSchema 编译成 eino 图并 Compile 成 Runnerbackend/domain/workflow/internal/compose/workflow.go:83
节点实现每种节点的真正逻辑(LLM、选择器、代码……)backend/domain/workflow/internal/nodes/<类型>/
nodeRunner给每个节点套上超时/重试/错误兜底/回调的壳backend/domain/workflow/internal/compose/node_runner.go
执行引擎eino 调度器 + 事件处理 + 中断/恢复backend/domain/workflow/internal/execute/

主线走一遍(高层、不进代码):

  1. 用户在画布上编辑,前端把图序列化成 vo.Canvas JSON,保存到库里(wfEntity.Canvas 是一个 JSON 字符串)。
  2. 点「试运行」或被调用时,后端 SyncExecute 把 JSON 反序列化成 vo.Canvas,调 CanvasToWorkflowSchema 规整成 WorkflowSchemabackend/domain/workflow/service/executable_impl.go:79-87)。
  3. compose.NewWorkflow 遍历 schema 的节点和连线,把每个节点变成一个 eino Lambda,把「字段映射」翻译成 eino 的输入边,编译出一个 Runner
  4. wf.SyncRun / AsyncRun 启动 eino 执行:eino 按依赖关系并发跑节点,每个节点跑完把输出写进共享 State,下游按字段映射取值。
  5. 执行过程中产生回调事件(节点开始/结束/出错/流式 chunk),汇成事件流推给前端,让画布实时高亮、显示流式文字。

3. 阅读地图(建议顺序)

这套东西的「精华」在编译执行两步,建议按下面顺序读:

  1. 01-canvas-to-schema.md — 先看前端画的 JSON 长什么样、怎样被规整成后端 schema(剪枝孤立节点、端口归一化、批处理自动展开成「父+内层」)。这是「翻译」阶段。
  2. 02-compile-to-eino-graph.md本库最核心的一章。看 schema 怎样编译成 eino 图:字段映射如何变成「直接依赖 / 间接依赖 / 静态值」,数组下钻,子工作流的输入怎么「代理」给父节点。
  3. 03-node-execution.md — 单个节点运行时的通用骨架:四种流式范式(Invoke/Stream/Collect/Transform)、超时、重试、错误兜底三策略、回调转换、输入类型转换。
  4. 04-streaming-interrupt-state.md — 三个贯穿全局的机制:流式是否传播怎么判定、问答节点怎样中断并恢复、检查点序列化、共享 State 里都存了什么。

给 AI agent 的提示: 想改「节点种类」看 node_meta.go + to_schema.go:594RegisterAllNodeAdaptors;想改「依赖/数据流」看 compose/workflow.goresolveDependencies;想改「运行时行为(重试/兜底)」看 compose/node_runner.go;想改「中断/恢复」看 compose/state.go + execute/

4. 边界与局限(诚实)

  • 不支持嵌套的复合节点:循环/批处理体内不能再放循环/批处理。to_schema.go:89 明确 nested inner-workflow is not supported
  • 复合节点的内层不能带自己的连线:内层节点的执行顺序靠字段引用推导,不靠显式连线(to_schema.go:92-94,内层节点若带 Edges 直接报错)。
  • 节点不能引用自己resolveDependenciesfromNode == n 直接报错(compose/workflow.go:657)。
  • 代码节点的安全风险:README 自己点名,代码节点的 Python 执行环境、SSRF 等是公网部署的风险点(见仓库 README「Security risks」)。
  • 这是把上游商用「扣子」核心引擎开源的版本,部分高级节点/能力可能与云端不完全一致。

5. 横向对比(同 shelf 兄弟)

维度Coze Studio(本库)典型「代码优先」Agent 框架
编排方式可视化画布,低代码/无代码写 Python/TS 代码、声明 DAG
执行底座复用 cloudwego/eino 的 compose 引擎各家自研调度
数据传递字段级映射(A.x → B.y),非整对象传递多为整对象或全局 state
人在环路一等公民:问答节点中断 + 检查点恢复通常要自己实现

Coze 的差异点是把通用 LLM 编排框架 eino 包了一层「可视化工作流」语义:节点种类、字段映射、批处理展开、问答中断这些都是 Coze 自己加的,底层的图调度/流式则交给 eino。

6. 代码地图(导航索引)

主题文件关键符号
节点类型与元数据backend/domain/workflow/entity/node_meta.goNodeTypeNodeTypeMetasExecutableMeta
画布→schemabackend/domain/workflow/internal/canvas/adaptor/to_schema.goCanvasToWorkflowSchemaNodeToNodeSchemaRegisterAllNodeAdaptorsparseBatchMode
schema 中间表示backend/domain/workflow/internal/schema/workflow_schema.goWorkflowSchemaConnectiondoRequireStreaming
节点 schemabackend/domain/workflow/internal/schema/node_schema.goNodeSchemaExceptionConfigStreamConfig
编译成 eino 图backend/domain/workflow/internal/compose/workflow.goNewWorkflowaddNodeInternalresolveDependenciesarrayDrillDown
节点实例化backend/domain/workflow/internal/compose/node_builder.goNewbuildSubWorkflow
节点运行壳backend/domain/workflow/internal/compose/node_runner.gotoNodenodeRunnerinvokeonError
分支backend/domain/workflow/internal/schema/branch_schema.goBuildBranchesGetFullBranch
共享状态/中断backend/domain/workflow/internal/compose/state.goStateGenStatestatePreHandlerGenStateModifierByEventType
执行入口backend/domain/workflow/service/executable_impl.goSyncExecuteAsyncExecuteStreamExecuteAsyncResume
前端画布frontend/packages/workflow/playground/services / entities(flowgram free-layout)