跳到主要内容

BISHENG Workflow — 架构与原理

30 秒导读: BISHENG Workflow 是一个企业级 LLM 应用编排引擎。用户在前端拖拽出一张流程图(开始 → LLM → 条件 → 输出…),后端把这张图编译成一张 LangGraph 状态图并执行。它最特别的地方是能在执行中途停下来等人——比如让用户补一句话、做个选择、上传文件——然后从断点续跑,而不是从头重来。

1. 这是什么(零基础也能懂)

一句话定义: 它是一个「流程图 → 可执行 LLM 应用」的运行时:你画的图,它来跑。

解决什么问题 / 给谁用。 假设你是企业里搭 AI 应用的人,要做一个「合同审查助手」:先让用户上传合同 → LLM 抽取条款 → 如果有风险条款就让人工确认 → 生成报告。这套逻辑有循环、分支、要等人确认,用纯代码写既繁琐又难维护。BISHENG Workflow 让你把它画成流程图,引擎负责把图变成能跑、能停、能续的程序。

它能做什么(功能):

  • 把前端的节点 + 连线(JSON)编译成可执行图。
  • 支持 13 种节点:开始 / 结束 / 输入 / 输出 / LLM / Agent / 条件 / 代码 / RAG / 工具 / 报告 等。
  • 人在环(Human-in-the-loop): 在「输入」「输出(需用户选择/填写)」节点暂停,等用户交互后续跑。
  • 支持分支(条件路由)、并行汇聚、循环、批处理。
  • 节点之间用全局变量池传数据,用 {{#node_id.key#}} 语法在 prompt 里引用上游输出。
  • 通过回调把每个节点的开始/结束/流式输出/等待输入等事件实时推给前端。

用起来什么样(后端视角的最小调用):

# 示意,非源码:引擎对外的最小用法
workflow = Workflow(
workflow_id="wf_123",
user_id=1,
workflow_data=graph_json, # 前端画的图:{"nodes": [...], "edges": [...]}
callback=redis_callback, # 事件回调(推给前端)
)
status, reason = workflow.run() # 跑到第一个「要等人」的地方就停
# ... 前端收集到用户输入后 ...
status, reason = workflow.run(user_input) # 带着用户输入续跑

真实入口见 bisheng/workflow/graph/workflow.py:50(Workflow.run)。

一句话直觉/类比。 把它想成「带断点的流程图解释器」:LangGraph 是底层 CPU(负责按图调度节点),BISHENG 在上面加了一层——它知道哪些节点是「该停下来问人」的断点,停下来后把整个执行现场缓存在内存里,等用户回话再恢复。

2. 顶层全景(它大概怎么转)

2.1 三层结构

这套引擎从外到内分三层,职责清晰:

┌──────────────────────────────────────────────────────────┐
│ worker 层(Celery 任务 + Redis) │
│ tasks.py: execute_workflow / continue_workflow │
│ redis_callback.py: 事件→Redis→SSE 推前端;缓存等输入的现场 │
└───────────────┬──────────────────────────────────────────┘
│ 调用
┌───────────────▼──────────────────────────────────────────┐
│ Workflow 门面(workflow.py) │
│ run() / arun():反复 continue_run 直到不再是 RUNNING │
└───────────────┬──────────────────────────────────────────┘
│ 持有
┌───────────────▼──────────────────────────────────────────┐
│ GraphEngine(graph_engine.py)—— 真正的引擎 │
│ · 把 JSON 编译成 LangGraph StateGraph │
│ · interrupt_before 标记「该停的节点」 │
│ · run / continue_run / judge_status │
│ 内含:EdgeManage(边) · NodeFactory(节点) · GraphState(全局状态)│
└──────────────────────────────────────────────────────────┘

2.2 部件一句话职责

部件干什么在哪个文件
Workflow对外门面;循环驱动直到工作流结束或要等人graph/workflow.py
GraphEngine核心:编译图、跑图、判定状态、处理中断graph/graph_engine.py
EdgeManage管理所有边,提供「谁连谁 / 谁的上游下游」查询edges/edges.py
NodeFactory按节点 type 实例化对应的节点类nodes/node_manage.py
BaseNode所有节点的基类:统一 run 流程、变量替换、回调nodes/base.py
GraphState全局状态:变量池 + 聊天历史graph/graph_state.py
BaseCallback事件回调接口(节点开始/结束/流式/等输入)callback/base_callback.py
RedisCallback回调的生产实现:落 Redis、推 SSE、缓存现场worker/workflow/redis_callback.py

2.3 主线走一遍(高层,不进代码)

一次「跑到要等人 → 续跑 → 结束」的完整生命周期:

前端画图(JSON)


Workflow(...) → GraphEngine.__init__
│ build_edges() 把 edges 装进 EdgeManage
│ build_nodes() 实例化每个节点 + 翻译成 LangGraph 的 add_node/add_edge
│ 并用 interrupt_before 标出「输入/输出_fake」节点

workflow.run()
│ graph.stream(...) 沿图执行节点
│ 碰到 interrupt_before 标记的节点 → LangGraph 在它之前暂停

judge_status():snapshot.next 里有等输入的节点?
│ 是 → status=INPUT,回调 on_user_input,把现场对象留在内存

(用户在前端填了内容)


workflow.run(user_input)
│ 把输入塞进对应节点(handle_input)→ graph.stream(None) 从断点续跑

直到 snapshot.next 为空 → status=SUCCESS

关键直觉:「停」和「续」不是 BISHENG 自己实现的,而是借了 LangGraph 的 interrupt_before + checkpointer。 BISHENG 的活儿是:① 把图翻译对(尤其是分支/汇聚),② 判断停下来后到底要不要问人、问什么,③ 把「正在等人」的工作流对象缓存在进程内存里等续跑。详见 03-human-in-the-loop.md

3. 阅读地图

建议按顺序读,由浅入深:

  1. 01-graph-engine.md —— 先搞懂「JSON 怎么变成一张可执行的 LangGraph 图」:节点实例化、边翻译、节点层级(level)、最后 compile。这是地基。
  2. 02-fan-in-concurrency.md —— 整个引擎最烧脑的一段:一个节点有多条入边时,到底要「等所有上游都跑完」还是「谁先到就跑」?这决定了并行和分支汇聚的正确性。
  3. 03-human-in-the-loop.md —— 「人在环」是 BISHENG 的招牌特性。看 interrupt_beforeOutputFakeNode、进程内缓存、Redis 续跑怎么配合。
  4. 04-nodes-and-state.md —— 节点基类的统一执行骨架、13 种节点速览(含代码节点的 exec 沙箱缺位)、全局变量池和 {{#node.key#}} 变量解析。

如果你只想要一句话精华:它的全部巧思,都在「如何把一张可能含分支与循环的前端流程图,正确编译成一张能在中途停下来等人、再无缝续跑的状态图」。

4. 代码地图(导航索引)

主题文件关键符号
对外门面、驱动循环graph/workflow.pyWorkflow, Workflow.run
编译 + 执行引擎graph/graph_engine.pyGraphEngine, build_nodes, add_node_edge, judge_status
边管理edges/edges.pyEdgeManage, get_all_edges_nodes, get_next_nodes
节点工厂nodes/node_manage.pyNodeFactory.instance_node, NODE_CLASS_MAP
节点基类nodes/base.pyBaseNode.run, handle_input, parse_msg_with_variables
全局状态graph/graph_state.pyGraphState, get_variable_by_str, set_variable
条件路由nodes/condition/condition.pyConditionNode.route_node, ConditionCases.evaluate_conditions
输出/中断nodes/output/output.py, nodes/output/output_fake.pyOutputNode.route_node, OutputFakeNode
节点类型枚举common/node.pyNodeType, BaseNodeData
续跑/事件桥接worker/workflow/tasks.py, worker/workflow/redis_callback.pycontinue_workflow, RedisCallback, _global_workflow