跳到主要内容

人在环:工作流如何停下来等人、再续跑

「人在环(Human-in-the-loop)」是 BISHENG 的招牌特性:工作流跑到一半可以停下来,让用户填表、选择、补充,然后从断点继续而不是重跑。本章讲它怎么实现。

1. 这一步要解决什么

企业流程里常有「需要人」的环节:让用户确认风险条款、二选一选择走向、上传一份文件。这意味着工作流必须能:

  1. 跑到某个节点之前停住;
  2. 告诉前端「我在等你输入,这是表单结构」;
  3. 收到用户输入后,从刚才停的地方继续,而不是从头来。

2. 思路:三块拼图

┌─ 拼图① interrupt_before ──────────────────────────────┐
│ 编译时把「输入节点」「output_fake 节点」标成中断点, │
│ LangGraph 执行到它们之前会自动暂停。 │
└────────────────────────────────────────────────────────┘
┌─ 拼图② 进程内缓存 ─────────────────────────────────────┐
│ 停下来后,把整个 Workflow 对象留在内存 _global_workflow,│
│ 执行现场由 MemorySaver(checkpointer)保住。 │
└────────────────────────────────────────────────────────┘
┌─ 拼图③ 注入输入 + 续跑 ─────────────────────────────────┐
│ 用户回话 → handle_input 把输入塞进对应节点 → │
│ graph.stream(None) 从 checkpoint 续跑。 │
└────────────────────────────────────────────────────────┘

关键认识:停/续的底层能力来自 LangGraph,BISHENG 加的是「判断该不该停、停了之后怎么衔接」的业务层。

3. 拼图①:哪些节点是断点

编译期,init_nodes 收集需要中断的节点(graph_engine.py:241-250):

# graph_engine.py:241-250(节选)
elif node_instance.type == NodeType.INPUT.value:
interrupt_nodes.append(node_instance.id) # 输入节点
elif node_instance.type == NodeType.OUTPUT.value:
fake_node = OutputFakeNode(id=f'{node_instance.id}_fake', ...)
self.nodes_map[fake_node.id] = fake_node
interrupt_nodes.append(fake_node.id) # output 的 fake 节点

然后传给 compile(graph_engine.py:278):interrupt_before=interrupt_nodes。LangGraph 执行到这些节点之前会暂停。

为什么 output 要个 fake 节点? 因为 output 节点本身要先执行(把消息发给用户),又要在「等用户选择/填写」处停住。一个节点没法既执行又在自己之前中断,所以拆成两个:

output 节点(真执行:发消息、算选项)
│ add_edge

output_fake 节点(什么都不干,只作为 interrupt_before 的断点)
│ add_conditional_edges(按用户选择路由到下游)

下游节点

OutputFakeNode.run 确实啥也不算,只补发一个 on_node_end 日志事件(nodes/output/output_fake.py:15-23)——因为输出节点的结束日志要等用户交互完才能正确显示。

4. 拼图②:停下来后,判断「停得对不对、要等什么」

LangGraph 暂停后,控制权回到 GraphEngine.judge_status(graph_engine.py:358)。它看检查点快照里「下一个要跑的节点」是谁:

# graph_engine.py:358-377(简化)
snapshot = self.graph.get_state(self.graph_config)
next_nodes = snapshot.next
if len(next_nodes) == 0:
self.status = WorkflowStatus.SUCCESS.value # 没有下一个了 → 跑完了
return
for node_id in next_nodes:
node_instance = self.nodes_map[node_id]
if node_instance.type == NodeType.INPUT.value:
input_schema = node_instance.get_input_schema()
if input_schema:
self.status = WorkflowStatus.INPUT.value # 进入等输入态
self.callback.on_user_input(UserInputData(...)) # 把表单结构推给前端
return
elif node_instance.type == NodeType.FAKE_OUTPUT.value:
... # output 要用户输入时同理

这里有个优化:不是所有 input/output 节点都真要停。get_input_schema 返回空就说明这个节点不需要用户交互(比如 output 只是单纯展示消息、不需要选择),judge_status 就不会把状态切到 INPUT,LangGraph 会继续往下跑。OutputNode.get_input_schema 只在 output_typeinput / choose 时才返回表单(nodes/output/output.py:47-52)。

5. 拼图③:门面层的「跑到停为止」循环

Workflow.run(graph/workflow.py:50)是驱动器,它的循环很值得品:

# graph/workflow.py:57-65(节选)
if input_data is not None:
self.graph_engine.continue_run(input_data) # 带用户输入续跑
else:
self.graph_engine.run() # 首次跑
while self.graph_engine.status == WorkflowStatus.RUNNING.value:
self.graph_engine.continue_run() # 没要等人但还在跑 → 继续推
return self.graph_engine.status, self.graph_engine.reason

这个 while 的意思是:一次 stream 可能因为内部中断点停下但其实不需要用户输入(judge_status 没切到 INPUT,状态还是 RUNNING),那就自动再 continue_run() 推一把,直到要么真的要等人(INPUT)、要么跑完(SUCCESS)、要么失败(FAILED)。所以门面对外只暴露三种终态。

6. 续跑:输入从哪来、怎么注入

续跑的真正注入点在 GraphEngine.continue_run(graph_engine.py:324):

# graph_engine.py:333-339(节选)
for node_id, node_params in data.items():
node_instance = self.nodes_map[node_id]
node_instance.handle_input(node_params) # 把用户输入塞进节点
self._run(None) # stream(None) → 从 checkpoint 续跑

handle_input 默认就是把输入并进节点参数(nodes/base.py:125-127);output 节点会重写它,额外把用户的选择存进全局变量并记进聊天历史(nodes/output/output.py:41-45)。

stream(None) 是 LangGraph 的「带 None 续跑」语义:不给新输入,从上次 checkpoint 接着往下走。

7. 跨进程的现场怎么保住(worker 层)

注意一个现实约束:MemorySaver进程内内存检查点。所以「等人」期间,整个 Workflow 对象必须留在同一个进程。worker 层正是这么做的:

# worker/workflow/tasks.py:41-45(节选)
if workflow.status() == WorkflowStatus.INPUT.value:
_global_workflow[redis_callback.unique_id] = workflow # 留在进程全局字典
redis_callback.set_workflow_status(status, reason)
  • 跑到 INPUT → 把 Workflow 对象塞进模块级 _global_workflow 字典(tasks.py:22),状态写回 Redis 通知前端。
  • 用户回话 → continue_workflow(tasks.py:136)从 _global_workflow 取回同一个对象,从 Redis 拿用户输入(tasks.py:119),调 workflow.run(user_input) 续跑。
  • 终态(SUCCESS/FAILED)→ _clear_workflow_obj 把对象从字典里删掉(tasks.py:25)。

重要边界: 因为现场只在进程内存,worker 重启或换进程,等待中的工作流就丢了。代码里 redis_callback.save_workflow_object(workflow) 那行是注释掉的(tasks.py:44),说明「把现场持久化到 Redis」目前没启用——这是已知的局限(超时机制见 Workflow.timeout,graph/workflow.py:27)。

8. 巧妙之处

  • fake 节点把「执行」和「中断点」解耦(graph_engine.py:246)。 用一个空节点专门承载 interrupt_before,绕开了「同一节点既执行又在自身前中断」的矛盾。
  • get_input_schema 返回空 = 不必停(graph_engine.py:371)。 用「有没有表单」来动态决定中断点是否真生效,省掉无谓的暂停往返。
  • 门面 while RUNNING 自动推进(graph/workflow.py:63)。 把「中断但无需用户输入」的内部状态对调用方隐藏,对外只剩干净的三态。

9. 代码地图

主题文件符号
标记中断节点graph/graph_engine.pyGraphEngine.init_nodes(interrupt_nodes)
中断后判状态graph/graph_engine.pyGraphEngine.judge_status
注入输入续跑graph/graph_engine.pyGraphEngine.continue_run, _run
门面驱动循环graph/workflow.pyWorkflow.run
fake 中断节点nodes/output/output_fake.pyOutputFakeNode
output 路由/schemanodes/output/output.pyOutputNode.route_node, get_input_schema
进程内现场缓存worker/workflow/tasks.py_global_workflow, continue_workflow, _clear_workflow_obj