跳到主要内容

Mastra — 图式 Workflow 引擎

本章讲 agent 之外的另一条主线:workflow。如果说 agent 是"让模型自己决定下一步",workflow 就是"你画好步骤图,框架按图执行"。读完你能讲清两者何时各用、以及 suspend/resume 怎么实现人在环。

1. 这一章解决的问题

Agent 的自主循环很强,但有时你不想让模型决定流程。比如一个发薪流程:校验 → 计算 → 经理审批 → 打款。这里的步骤是确定的、顺序是确定的,你要的是可控、可审计、可暂停,不是"让模型即兴发挥"。

Workflow 就是为此:用代码把步骤连成一张图,框架负责按图执行、传数据、必要时暂停等人工。

Agent vs Workflow 一句话:

AgentWorkflow
谁决定下一步模型你的代码(图结构)
适合开放式任务、对话确定性多步流程
控制流隐式(模型推理)显式(then/branch/parallel)

有意思的是,Mastra 把这两者统一在同一套执行引擎上——agentic loop 本身就是用 workflow 实现的(见 01-agent-loop.md §7)。

2. 两个原语:Step 和 Workflow

  • Step = 一个步骤。用 createStep(...)(packages/core/src/workflows/workflow.ts:206 起的多个重载)定义,带 inputSchema/outputSchema 和一个 execute 函数。
  • Workflow = 把 step 连起来的图。

createStep 的强大之处:它能把多种东西包成 step(看 workflow.ts:337createStep 分发):

  • 普通参数 → createStepFromParams(workflow.ts:369)
  • 一个 AgentcreateStepFromAgent(workflow.ts:437)——可以把整个 agent 当一步塞进 workflow!
  • 一个 ToolcreateStepFromTool(workflow.ts:607)
  • 一个 Processor → createStepFromProcessor(workflow.ts:660)

直觉: step 是乐高积木,agent / tool / 函数都能做成同一种积木,于是能自由拼装。

教学示例,体会拼装(简化):

import { createWorkflow, createStep } from '@mastra/core/workflows';

// 示意,非源码:展示 workflow 的控制流拼装
const validate = createStep({ id: 'validate', /* schema + execute */ });
const compute = createStep({ id: 'compute', /* ... */ });
const payAgent = createStep(myPayoutAgent); // 把一个 agent 当一步

const payroll = createWorkflow({ id: 'payroll' })
.then(validate) // 顺序
.then(compute)
.then(payAgent)
.commit(); // 收尾,定型这张图
// 重点看:then 串顺序,commit() 把图定型;每步的 output 流向下一步的 input

3. 控制流:六个方法画出任意图

Workflow 构建器(packages/core/src/workflows/workflow.ts)提供这些链式方法画图:

方法作用源码行
then(step)顺序:接下一步workflow.ts:1688
branch([...])条件分支:按条件走不同步骤workflow.ts:2055
parallel([...])并行:同时跑多步,都完成再继续workflow.ts:1992
dowhile / dountil循环:满足/直到条件workflow.ts:2111 / 2159
foreach遍历:对数组每项跑同一步workflow.ts:2207
map(...)数据映射:在步骤间改造数据形状workflow.ts:1823
sleep(duration)定时等待workflow.ts:1733
waitForEvent(...)等外部事件workflow.ts:1808
commit()定型这张图(收尾必调)workflow.ts:2276

这套方法对应 README 里说的".then() / .branch() / .parallel() 直观控制流语法"。

一张控制流的图:

start
│ then

validate
│ branch
┌──┴───────────┐
▼ 金额>1万 ▼ 否则
经理审批 直接计算
(suspend) │
│ resume │
└──────┬────────┘
│ then

打款 ──→ 完成

怎么读: then 是直线,branch 是岔路,审批步骤用 suspend 把流程冻在那里等人,人批了用 resume 解冻继续。

4. suspend / resume:人在环的核心

Workflow 最有价值的特性是可暂停、可恢复。一个 step 的 execute 可以调 suspend,把流程停住;之后你用 resume 带着补充数据(比如"经理批准了")让它从断点继续。

看 step 创建时怎么接 suspend/resume(createStepFromTool,workflow.ts:607 起):execute 的上下文里带 suspendresumeData(workflow.ts:629-647),step 还能声明 suspendSchema/resumeSchema(workflow.ts:620-621)——即"暂停时存什么形状的数据、恢复时收什么形状的数据"。

它为什么能停下来再恢复? 因为执行状态被序列化进 storage。这正是 README 说的"Mastra 用 storage 记住执行状态,所以你能无限期暂停、再从原处恢复"。这也解释了为什么 §01 里 agentic loop(本身是 workflow)能支持"工具调用等审批"——requireToolApproval 就是 suspend 在工具层面的应用。

教学示例,体会暂停(简化):

// 示意,非源码:一个需要人工审批的 step
const approval = createStep({
id: 'manager-approval',
execute: async ({ inputData, suspend, resumeData }) => {
if (!resumeData) {
// 第一次进来:没有恢复数据 → 暂停,把待审信息存起来
return await suspend({ amount: inputData.amount }); // 流程冻结,存进 storage
}
// resume 时带着 { approved: true } 再次进来,从这里继续
if (!resumeData.approved) throw new Error('被拒绝');
return { ok: true };
},
});
// 重点看:同一个 execute 跑两次——第一次 suspend 冻结,resume 后带 resumeData 再跑

5. 巧妙之处

  • 万物皆 step。 agent / tool / processor / 普通函数都能 createStep(workflow.ts:337 的分发),于是 agent 能嵌进 workflow、workflow 能调 agent,组合自由。
  • 一套引擎跑两种范式。 agentic loop 复用 workflow 引擎(见 01 章 §7),意味着 suspend/resume、状态序列化只实现一次,两边共享。
  • 状态存 storage = 无限期暂停。 把执行状态持久化,而非留在内存,所以"等人批三天"也没问题——进程重启都不丢。

6. 边界与局限

  • suspend/resume 依赖 storage。 没接 store 就没法持久化暂停状态。
  • 确定性是双刃。 workflow 的好处是可控可审计,代价是"不会随机应变"——开放式任务还是该用 agent。
  • 本文未深入的部分(诚实): 本章讲到构建器 API 层面(then/branch/… 的签名与语义)。真正的步骤调度执行DefaultExecutionEngine(packages/core/src/workflows/default.ts:54)及 evented 引擎里,本章未逐行追其调度循环与快照序列化实现。

7. 横向对比

Workflow 这类"图式 + 可暂停"编排在 agent 框架里是常见取舍:有的框架(如基于图的编排库)把一切都建模成图、agent 只是图里的节点;Mastra 反过来——它两条腿走路(agent 自主 + workflow 确定),但底层把两者落到同一套可 suspend/resume 的引擎上,让"工具调用等审批"这种跨范式特性自然成立。

8. 代码地图

主题文件关键符号
Step / Workflow 构建器packages/core/src/workflows/workflow.tscreateStepcreateStepFromAgentcreateStepFromToolthenbranchparalleldowhiledountilforeachcommit
工厂入口packages/core/src/workflows/create.tscreateWorkflow
执行引擎packages/core/src/workflows/default.tsDefaultExecutionEngineExecutionEngine
suspend/resume 数据packages/core/src/workflows/workflow.tssuspendSchemaresumeSchemaresumeData
工具定义packages/core/src/tools/tool.tsToolcreateTool