跳到主要内容

多入边节点:到底要不要等?

这是整个引擎最烧脑的一段。读前先记住一句话:多条边指向同一个节点,有两种完全相反的含义,引擎必须分清。

1. 为什么这件事难

一个节点 D 有两条入边(B→D、C→D),可能是两种截然不同的意图:

场景图形D 应该怎么做
并行汇聚A 同时分叉到 B、C,B、C 各跑完再合到 D B 和 C 都跑完,再跑 D(否则汇总结果不全)
分支独占条件节点二选一:要么走 B、要么走 C,最后都到 D不等:只有一条分支会真的执行,谁到了就跑 D

如果判断错了:并行场景下不等 → 数据不全;分支场景下硬等 → 永远死等那条根本不会执行的分支。

LangGraph 的语义里:add_edge([B, C], D)(列表)= D 等 B、C 都完成才触发;分别 add_edge(B, D)add_edge(C, D) = 任一上游完成就触发。所以 BISHENG 要做的,就是判断该用哪种连法

2. 入口:只处理真正的多入边

# graph_engine.py:119-130(简化)
def build_more_fan_in_node(self):
for node_id, source_ids in self.nodes_fan_in.items():
if not source_ids or len(source_ids) <= 1:
continue # 单入边,普通 add_edge 已处理
wait_nodes, no_wait_nodes = self.parse_fan_in_node(node_id)
if wait_nodes: # 要等:列表形式一次性连
self.graph_builder.add_edge(wait_nodes, node_id)
if no_wait_nodes: # 不等:逐个单独连
for one in no_wait_nodes:
self.graph_builder.add_edge(one, node_id)

核心判定全在 parse_fan_in_node(graph_engine.py:132),它返回一个二元组 (wait_nodes, no_wait_nodes)——前者走列表式「等全部」,后者走逐条式「谁来谁触发」。

3. 判定算法:两道关卡

多入边节点 D

┌──────────▼───────────┐
│ 关卡一:所有上游的 level │
│ 都 ≤ D 的 level 吗? │
└──────────┬───────────┘
否(有上游在 D 之后,说明有回环/下游绕回)
│ → 不等(排除 output_/condition_ 上游)

是(上游都在 D 之前)

┌──────────▼─────────────────────┐
│ 关卡二:到 D 的多条路径里,存在 │
│ 两条「完全不相交」的分支吗? │
└──────────┬─────────────────────┘
是 → 互斥分支汇聚点 → 不等
否 → 真并行汇聚点 → 等(全部上游跑完)

关卡一:层级检查(graph_engine.py:135-144)

all_source_node_prev = True
for one in source_ids:
if self.node_level[one] > self.node_level[node_id]: # 有上游比我还"深"
all_source_node_prev = False
break
if not all_source_node_prev:
# 有上游其实在我下游(循环结构),不需要等
return [], [one for one in source_ids if not one.startswith(('output_', 'condition_'))]

直觉:如果某条「入边」的来源节点 level 比我还大,说明它其实排在我后面——这是循环把下游又绕回来了。这种回边不该让我傻等,所以归入「不等」。

注意结尾排除了 output_ / condition_ 开头的上游:这两类节点通过条件边连接(add_conditional_edges),不是普通数据边,不参与这里的等待逻辑。

关卡二:分支是否互斥(graph_engine.py:146-169)

这一步要回答:多条到 D 的路径,是「都会跑(并行)」还是「只跑一条(互斥)」?

判据是几何性的:枚举每个条件/选择节点到 D 的所有分支路径,如果存在两条节点完全不相交的路径,就说明这两条路径是「二选一」的——即 D 是互斥分支的汇聚点。

# graph_engine.py:159-169(简化)
def judge_not_same_branch():
for i in range(len(all_branches)):
for j in range(i + 1, len(all_branches)):
if not (set(all_branches[i]) & set(all_branches[j])): # 交集为空
return True
return False

if judge_not_same_branch():
# 互斥分支 → 不等
return [], [one for one in source_ids if not one.startswith(('output_', 'condition_'))]

# 否则:真并行汇聚 → 等所有上游
wait_nodes = []
for one in source_ids:
if one.startswith('output_'):
one = f'{one}_fake' # output 上游要等它的 fake 节点(中断点)
wait_nodes.append(one)
return wait_nodes, []

枚举分支用的是 EdgeManage.get_all_edges_nodes(edges/edges.py:60),它从某个条件节点 DFS 到 D,收集每条路径上的中间节点;然后把端点 D 和起点条件节点本身从路径里去掉,只比较「中间节点集」是否相交。

为什么「不相交」就等于「互斥」? 因为条件节点是二选一的:走分支①经过的节点,和走分支②经过的节点,如果一个都不重叠,那这两条路就是同一个条件下的两个互斥出口,运行时只会走其中一条。既然只走一条,D 当然不能等「另一条上那个根本不会执行的节点」。

4. 一个具体例子

┌─→ B ─┐
A ─┤ ├─→ D (并行:A 同时给 B、C,D 汇总)
└─→ C ─┘

┌─[true]→ B ─┐
A → Cond ─┤ ├─→ D (互斥:Cond 二选一,D 只会被一条触发)
└─[false]→ C ─┘
  • 上图:B、C 都在 D 之前(关卡一过),且任何两条到 D 的路径都共享 A 之外的节点关系——judge_not_same_branch 为假 → ,连成 add_edge([B, C], D)
  • 下图:B 路径节点集 {B} 与 C 路径节点集 {C} 交集为空 → judge_not_same_branch 为真 → 不等,分别连 add_edge(B, D)add_edge(C, D)

5. 巧妙之处与边界

  • 用「路径节点集相交性」推断分支语义(graph_engine.py:159)。 不需要前端额外标注「这是并行还是分支」,引擎纯靠图的拓扑结构反推出来。这是这段代码最聪明的地方。
  • level 兼当「环检测」(graph_engine.py:138)。 同一个 node_level 既服务「谁先谁后」,又用来识别「回边不必等」,一物两用。
  • 边界:依赖前端用 output_ / condition_ 前缀命名节点 id(graph_engine.py:144174)。 这些字符串前缀是硬编码的约定——判定逻辑靠 node_id.startswith(('output_', 'condition_')) 区分「数据边」和「条件边」上游。若上游节点 id 不遵守这套命名约定,等待判定会出错。这是一个隐式契约。

6. 代码地图

主题文件符号
多入边总处理graph/graph_engine.pyGraphEngine.build_more_fan_in_node
等/不等判定graph/graph_engine.pyGraphEngine.parse_fan_in_node
分支互斥判据graph/graph_engine.pyjudge_not_same_branch(parse_fan_in_node 内)
枚举分支路径edges/edges.pyEdgeManage.get_all_edges_nodes
节点层级graph/graph_engine.pyGraphEngine.build_node_level, node_level