跳到主要内容

Botpress Workflows (SDK) — 架构与原理

30 秒导读: 在 Botpress 里,「workflow」不是你在画布上连线的流程图,而是 SDK 层的一种持久化后台任务。你给它起个名字、定义输入/输出 schema,后端负责持久化它的状态、在它该跑/该续/该超时的时候,把事件推回你的 bot,你写的处理函数则用 acknowledge / setCompleted / setFailed 这几个动作显式地推动它的状态机。这一篇覆盖的是 packages/sdk 里的 workflow 子系统(area: workflow-builder)。


1. 这是什么(零基础也能懂)

  • 一句话定义: workflow 是一个有名字、有输入输出 schema、由后端持久化状态的长时间后台任务;它通过「生命周期事件回调」的方式运行,而不是一个同步函数。

  • 解决什么问题 / 给谁用: 假设你的 bot 要做一件跑很久、可能跨多次唤醒、还可能超时的活——比如「把一个网盘里上千个文件枚举并同步进知识库」「每周一遍历所有 Linear issue 做 lint」。你不能把它塞进一次 HTTP 请求里跑(会超时),也不想自己手搓「跑到一半存进度、下次接着跑」的样板。workflow 就是 Botpress 给这类活提供的托管编排原语

  • 它能做什么:

    • 启动一个命名实例(startNewInstance / createWorkflow),带类型化的 input
    • 在后端持久化它的 status(pending → in_progress → completed/failed/...)和 output / tags
    • 在「该开始」「该继续」「超时了」时,回调你注册的处理函数
    • 让任务自己续命(改 timeoutAt)、自己收尾(setCompleted / setFailed),甚至链式启动下一个 workflow
  • 用起来什么样: 下面是仓库里真实的最小用法(bots/bugbuster),一个 lintAll workflow——注册三种生命周期处理函数:

    // bots/bugbuster/src/index.ts:13-15(真实源码)
    bot.on.workflowStart('lintAll', handlers.handleLintAll)
    bot.on.workflowContinue('lintAll', handlers.handleLintAll)
    bot.on.workflowTimeout('lintAll', handlers.handleLintAllTimeout)

    处理函数里推动状态机:

    // 简化自 bots/bugbuster/src/handlers/lint-all.ts # 示意,非源码
    export const handleLintAll = async ({ workflow, conversation }) => {
    // 先告诉后端「我开始干活了」,否则会被当成卡死而重试
    await workflow.acknowledgeStartOfProcessing()
    // ...做实际的活...
    await workflow.setCompleted() // 干完,收尾
    }
  • 一句话直觉/类比: 把 workflow 当成一个交给后端托管的「带状态的待办任务」:你不持有它的执行线程,后端在合适的时机敲你的门(投递事件),你开门干一段活、汇报进度(acknowledge)、最后说「干完了」(setCompleted)。它更像消息驱动的 job 调度,而不是一段顺序代码。

本节不出现底层代码细节。要点:workflow = 命名 + 输入输出 schema + 由后端持久化的状态 + 你写的生命周期处理函数。


2. 顶层全景(它大概怎么转)

这套子系统横跨三段:定义(声明有哪些 workflow)、触发/操作(创建实例、改状态)、回调(后端事件 → 你的处理函数)。先看整体数据流。

2.1 一张图:一次 workflow 运行的来回

怎么读这张图:左边是「你的代码 / SDK」,右边是「Botpress 后端」;箭头是一次往返——你创建实例,后端持久化并在合适时机回投事件,SDK 把事件路由到你的处理函数,处理函数再通过 client 改回后端的状态。

你的 Bot 代码 / SDK Botpress 后端(持久化 + 调度)
┌─────────────────────────┐ ┌──────────────────────────────┐
│ workflows.X │ createWorkflow│ │
│ .startNewInstance() ──┼───────────────▶│ 新建实例 status=pending │
│ │ │ 持久化 input / tags │
│ │ │ │
│ │ workflow_update│ 到点了:投递生命周期事件 │
│ handler dispatch ◀─────┼────event───────┤ (started/continued/timedout)│
│ (update-handler.ts) │ │ │
│ │ │ │ │
│ ▼ │ updateWorkflow│ │
│ 你的处理函数 │ status=... │ │
│ workflow.ack() ───────┼───────────────▶│ status=in_progress │
│ workflow.setCompleted ┼───────────────▶│ status=completed + output │
└─────────────────────────┘ └──────────────────────────────┘

关键认知:执行不是连续的。后端可能就「同一个实例」多次投递事件(start 一次,后续 continued 多次),每次都是一次独立的 handler 调用——所以中间进度要靠 workflow-scoped state(见 §2.3)存。

2.2 部件一句话职责

部件干什么在哪个文件
WorkflowDefinition声明一个 workflow 的 input/output schema 与 tagspackages/sdk/src/bot/definition.ts:79-85
workflows 代理 (proxyWorkflows)暴露 startNewInstance / listInstances,把名字映射到后端 APIpackages/sdk/src/bot/workflow-proxy/proxy.ts:34-63
ActionableWorkflow单个实例的操作面:update / acknowledgeStartOfProcessing / setCompleted / setFailed / cancelpackages/sdk/src/bot/workflow-proxy/proxy.ts:65-138
bot.on.workflow{Start,Continue,Timeout}注册生命周期处理函数packages/sdk/src/bot/implementation.ts:414-450
handleWorkflowUpdateEvent收到 workflow_update 事件后,按类型+名字找到处理函数并依次调用packages/sdk/src/bot/server/workflows/update-handler.ts:14-104
workflowHandlers 代理把注册的处理函数包成「注入 workflow 操作面」的可调用形态packages/sdk/src/bot/implementation.ts:231-279

2.3 主线走一遍(高层,不进代码)

  1. 定义:在 BotDefinition / PluginDefinitionworkflows 字段声明 lintAll(input/output/tags schema)。
  2. 触发:某个事件处理函数里调用 client.getOrCreateWorkflow({ name: 'lintAll', ... })(bots/bugbuster/src/handlers/time-to-lint-all.ts:13-19),后端建一个 status=pending 的实例。
  3. 回调:后端把一个 type: 'workflow_update' 的事件投回 bot 的 HTTP handler;onEventReceived 看到 ctx.type === 'workflow_update' 就转交 handleWorkflowUpdateEvent(packages/sdk/src/bot/server/index.ts:162-163)。
  4. 路由:dispatcher 根据 started/continued/timedout 和 workflow 名,找到你注册的处理函数链,依次调用。
  5. 干活 + 推状态:处理函数 acknowledgeStartOfProcessing()(置 in_progress),做活,期间用 workflow-scoped state(type: 'workflow',以 workflow.id 为 key)存进度;最后 setCompleted() / setFailed()
  6. 续命或链式:活没干完就 workflow.update({ timeoutAt }) 让后端晚点再 continued 你;或 setCompleted() 后再 workflows.next.startNewInstance(...) 启动下一个 workflow(plugins/file-synchronizer 的 buildQueue→processQueue 就是这么干)。

目标:看懂「大盘」——workflow 是事件驱动的、跨多次唤醒的、靠显式状态动作推进的后台任务。下面三章逐层深入。


阅读地图

按这个顺序读,由浅入深:

  1. 01-lifecycle.md — 状态生命周期:一个实例的 8 种 status、为什么必须 acknowledgeStartOfProcessing(以及不 ack 会被重试 3 次的细节)、收尾动作的语义。先读这章建立心智模型。
  2. 02-event-dispatch.md — 事件投递与路由:后端的 workflow_update 事件长什么样、handleWorkflowUpdateEvent 怎么把 5 种桥接事件类型收敛成 3 种、多个处理函数(含插件)如何按顺序串联并传递 workflow 状态。
  3. 03-proxy-and-actions.md — 代理与操作面:Proxy 怎么把 workflows.X.startNewInstance 变成 createWorkflow 调用、ActionableWorkflow 每个动作的真实实现、类型系统如何从定义推断 input/output/tags。

巧妙之处(先记住,细节在各章)

  • 「acknowledge 才算活着」的看门狗模型:workflow 不靠「函数有没有抛异常」判断成功,而靠你有没有主动把 status 从 pending 推走。没及时 ack 会被重投 3 次再判失败(workflow-proxy/types.ts:69-81 的 JSDoc)。这把「卡死的后台任务」变成可观测、可重试的。

  • 状态机用「事件 + 显式动作」而非「连续执行」:同一实例可被 continued 多次唤醒,每次都是独立 handler 调用,进度必须外存。这让长任务天然可中断、可恢复、可超时续命。

  • 链式 workflow 取代「巨型单任务」:file-synchronizer 把「枚举文件」和「处理队列」拆成 buildQueue / processQueue 两个 workflow,前者干完 setCompleted()startNewInstance 后者(plugins/file-synchronizer/src/hooks/workflow-continued/build-queue.ts)——每段都能独立超时续命。


边界与局限(诚实)

  • 「子 workflow」尚未支持:dispatcher 显式把 child_workflow_deleted / child_workflow_finished 两类事件只记一条 info 日志后丢弃(update-handler.ts:33-38:"child workflows are not yet supported")。所谓「链式」是手动 startNewInstance,不是父子树。

  • workflow 定义其实是「运行时临时造」的:源码顶部一大段 FIXME 坦白:当前 workflow 定义不在 deploy 时发给后端,而是每次 run 时即时构造,因此 tag 没有按插件 alias 前缀(后端校验不允许 #)。这是已知技术债,追踪在 Linear KKN-292(workflow-proxy/proxy.ts:10-32)。

  • getWorkflow / updateWorkflow 无法类型推断:只有 id,没有名字,所以拿不到具体 workflow 的 input/output 类型(client/types.ts:211-218 的 FIXME)。要类型安全得走 workflows.X 代理或处理函数注入的 workflow

  • 整套 API 标注为 EXPERIMENTAL:workflowStart/Continue/Timeout 都带「This API is experimental and may change」(implementation.ts:410-413)。


横向对比

同属 ai-agent-reference 货架的「agent 编排 / 后台任务」一类里,Botpress workflow 的取舍是**「薄 SDK + 后端托管状态」:它自己几乎不存执行状态,全部 status/output 落在 Botpress 后端,SDK 只是「定义 + 代理 + 事件路由」。这和那些「在进程内用内存图/状态机跑编排」的框架(把整张流程图和当前节点都放本地)是相反方向——Botpress 押注持久化与可恢复**,代价是每一步状态变更都是一次后端往返。


代码地图(导航索引)

主题文件路径符号名
workflow 定义结构packages/sdk/src/bot/definition.tsWorkflowDefinition
注册生命周期处理函数packages/sdk/src/bot/implementation.tsworkflowStart / workflowContinue / workflowTimeout
处理函数包装(注入操作面)packages/sdk/src/bot/implementation.tsworkflowHandlers (getter)
事件入口路由packages/sdk/src/bot/server/index.tsonEventReceived
事件分发主逻辑packages/sdk/src/bot/server/workflows/update-handler.tshandleWorkflowUpdateEvent / _dispatchToHandlers
桥接事件类型收敛packages/sdk/src/bot/server/workflows/update-type-conv.tsbridgeUpdateTypeToSnakeCase
workflows 代理packages/sdk/src/bot/workflow-proxy/proxy.tsproxyWorkflows
单实例操作面packages/sdk/src/bot/workflow-proxy/proxy.tswrapWorkflowInstance
代理 / 操作面类型packages/sdk/src/bot/workflow-proxy/types.tsWorkflowProxy / ActionableWorkflow
client 端 workflow APIpackages/sdk/src/bot/client/index.tscreateWorkflow / getOrCreateWorkflow / updateWorkflow / listWorkflows
真实示例(单 workflow,三回调)bots/bugbuster/src/handlers/lint-all.tshandleLintAll / handleLintAllTimeout
真实示例(链式 workflow)plugins/file-synchronizer/src/hooks/workflow-continued/build-queue.tshandleEvent