跳到主要内容

事件投递与处理函数路由

这章讲:后端在「该跑/该续/超时」时回投的 workflow_update 事件长什么样、SDK 如何把它路由到你注册的处理函数、5 种桥接事件类型怎么收敛成 3 种、以及多个处理函数(包括插件带来的)如何串成一条链依次调用。

1. 入口:一个特殊的 event

workflow 的回调不是独立通道,而是复用 bot 的事件入口onEventReceived 看到 ctx.type === 'workflow_update' 就分流出去:

// packages/sdk/src/bot/server/index.ts:162-163(真实源码)
if (ctx.type === 'workflow_update') {
return await handleWorkflowUpdateEvent(serverProps, body.event as types.WorkflowUpdateEvent)
}

事件本身是「普通 client.Event + 一个 workflow_update payload」的合并类型(packages/sdk/src/bot/server/types.ts:211-217):payload 里带 type(桥接类型)、workflow(完整实例)、可选的 conversation / user(server/types.ts:204-210)。

2. 五种桥接类型 → 三种内部类型

后端发来的 payload.type 有 5 种(BridgeWorkflowUpdateType,server/types.ts:198-203),但 SDK 真正处理的只有 3 种。handleWorkflowUpdateEvent 做了一次清晰的分流:

// packages/sdk/src/bot/server/workflows/update-handler.ts:32-45(真实源码,节选)
switch (event.payload.type) {
case 'child_workflow_deleted':
case 'child_workflow_finished':
props.logger.info(`Received child workflow event "${event.payload.type}", but child workflows are not yet supported`)
break
case 'workflow_timedout':
case 'workflow_started':
case 'workflow_continued':
return await _handleWorkflowUpdate(props, event)
default:
event.payload.type satisfies never // 穷尽性检查
}
后端桥接类型内部类型处理
workflow_startedstarted路由到 workflowStart 处理函数
workflow_continuedcontinued路由到 workflowContinue 处理函数
workflow_timedouttimed_out路由到 workflowTimeout 处理函数
child_workflow_deleted——仅记 info 日志,丢弃(子 workflow 未支持)
child_workflow_finished——仅记 info 日志,丢弃

名字的「桥接 → 内部」转换是一个独立小函数:

// packages/sdk/src/bot/server/workflows/update-type-conv.ts:3-14(真实源码,节选)
export const bridgeUpdateTypeToSnakeCase = (updateType) => {
switch (updateType) {
case 'workflow_continued': return 'continued'
case 'workflow_started': return 'started'
case 'workflow_timedout': return 'timed_out'
default: throw new Error(`Unsupported workflow update type: ${updateType}`)
}
}

为什么要这层转换?后端的事件名是 workflow_xxx(对外协议),SDK 内部用更短的 started/continued/timed_out 当 map 的 key(见下)。这层桥接把「协议命名」和「内部命名」解耦。

3. 路由:按 (类型, 名字) 取处理函数链

找到处理函数靠二级查表:先按内部类型,再按 workflow 名。

// packages/sdk/src/bot/server/workflows/update-handler.ts:61-68(真实源码,节选)
const updateType = bridgeUpdateTypeToSnakeCase(event.payload.type)
const handlers = props.self.workflowHandlers[updateType]?.[event.payload.workflow.name]
if (!handlers || handlers.length === 0) {
props.logger.warn(`No ${updateType} handler found for workflow "${event.payload.workflow.name}"`)
return SUCCESS_RESPONSE
}

注意两个早退:

  • 没名字:event.payload.workflow.name 为空 → 警告「假定这个 workflow 不是本 bot 定义的」并直接成功返回(update-handler.ts:24-30)。
  • 没处理函数:警告 No <type> handler found 后成功返回——不报错,因为别的 bot/插件可能注册了它。

4. 处理函数链:依次调用,串联 workflow 状态

一个 (类型, 名字) 下可以有多个处理函数(你自己注册的 + 插件注册的)。它们不是并行,而是串行接力——前一个返回的 workflow 状态喂给后一个:

// packages/sdk/src/bot/server/workflows/update-handler.ts:91-101(真实源码,节选)
let currentWorkflowState = structuredClone(event.payload.workflow)
for (const handler of handlers ?? []) {
currentWorkflowState = await handler({
...props,
event,
conversation: event.payload.conversation,
user: event.payload.user,
workflow: currentWorkflowState, // ← 上一个的结果传给下一个
})
}

怎么读:structuredClone 先拷一份初始状态(避免改到事件原对象),然后像中间件链一样逐个调用,每个处理函数返回它操作后的最新 workflow 状态,传给下一个。这让「插件处理函数 + bot 处理函数」能在同一实例上叠加生效。

这些「返回最新状态」的处理函数从哪来?是 workflowHandlers getter 包出来的(packages/sdk/src/bot/implementation.ts:244-272):它把你注册的原始处理函数包一层,注入 wrapWorkflowInstance(操作面)+ proxyWorkflows(启动别的 workflow),并用一个闭包 currentWorkflowState 捕获每次 onWorkflowUpdate 的新状态后返回。自己的处理函数排在插件处理函数前面(implementation.ts:268-272,[...selfHandlers, ...pluginHandlers])。

5. 收尾检查:还停在 pending 就警告

链跑完后,dispatcher 检查终态——如果实例还是 pending,说明处理函数既没 ack 也没收尾,这通常是 bug:

// packages/sdk/src/bot/server/workflows/update-handler.ts:72-77(真实源码,节选)
if (updatedWorkflow.status === 'pending') {
props.logger.warn(
`Workflow "${event.payload.workflow.name}" is still in pending status after processing "${updateType}" event. ` +
'This may indicate that the workflow was not properly acknowledged or terminated by the handler. '
)
}

这正是 §01「return 不等于 completed」那条坑的运行时体现。

6. 关键细节 / 坑

  • workflow 回调走的是 event 通道,不是单独的 webhook;入口判别靠 ctx.type === 'workflow_update'
  • 未知/子 workflow 事件被静默吞掉(只记 info),所以「我发了子 workflow 事件却没反应」是预期行为——尚未支持。
  • 处理函数链是串行接力,顺序是「自己的在前、插件的在后」,且通过返回值传递最新状态。
  • 找不到处理函数不报错,只 warn,以兼容多 bot/插件共存。

代码地图

主题文件路径符号名
事件入口分流packages/sdk/src/bot/server/index.tsonEventReceived
事件 payload / event 类型packages/sdk/src/bot/server/types.tsWorkflowUpdateEventPayload / WorkflowUpdateEvent / BridgeWorkflowUpdateType
分发主逻辑 + 早退packages/sdk/src/bot/server/workflows/update-handler.tshandleWorkflowUpdateEvent / _handleWorkflowUpdate
处理函数链接力packages/sdk/src/bot/server/workflows/update-handler.ts_dispatchToHandlers
桥接类型转换packages/sdk/src/bot/server/workflows/update-type-conv.tsbridgeUpdateTypeToSnakeCase
处理函数包装(注入操作面)packages/sdk/src/bot/implementation.tsworkflowHandlers (getter)
注册三种处理函数packages/sdk/src/bot/implementation.tsworkflowStart / workflowContinue / workflowTimeout