第 01 章 · 核心引擎:Pregel/BSP 超步循环
本章讲什么: LangGraph 的心脏是一个借鉴自 Google Pregel 的超步循环。 本章把它彻底拆开:如何决定下一步跑谁、如何把并行节点的写入批量提交,以及为什么这套版本号机制让恢复和并行都变简单。
1.1 先建直觉:什么是 BSP 超步
LangGraph 自述其灵感来自 Pregel 和 Apache Beam(README.md 结尾的 Acknowledgements)。Pregel 是 Google 的图计算模型,核心思想叫 BSP(Bulk Synchronous Parallel):
- 计算分成一个个超步(superstep)。
- 一个超步内,所有活跃节点并行计算,它们看到的是上一步结束时的状态,彼此的本步写入互不可见。
- 超步结束有一道同步栅栏:所有写入在这里一次性生效。
- 没有节点活跃时,计算停止。
为什么 agent 适合用这个模型?
- 并行无竞态:同一超步内多节点并行写状态,但写入要到栅栏才合并,不会互相踩。
- 可恢复:栅栏处状态是一致快照,存盘即可;恢复就是从某张快照接着跑。
- 确定性提交顺序:合并时按任务路径排序,保证可复现。
1.2 一个超步的四个动作
PregelLoop 的一圈,就是 tick() + 执行 + after_tick():
┌───────────────── 一个超步(superstep) ─────────────────┐
上一步的 │ ① tick(): prepare_next_tasks │
channel │ "哪些 channel 刚被更新? 它们触发哪些节点?"→ tasks │
版本号 │ │ │
───────┼─────────────┼── 没有 tasks? → status=done,循环结束 ─────┤
│ ▼ │
│ ② 并行执行 tasks(节点函数跑起来) │
│ 节点读状态、算出增量写入,暂存进 task.writes(不生效) │
│ ▼ ←──── 同步栅栏(barrier) ──── │
│ ③ after_tick(): apply_writes │
│ 写入按 channel 分组 → reducer 合并 → 改的 channel 升版本 │
│ ▼ │
│ ④ _put_checkpoint(): 把新状态存盘 │
└─────────────┼──────────────────────────────────────────┘
▼ 回到 ①,用新版本号选下一批 tasks
1.3 动作①:tick —— 选出本步要跑的任务
PregelLoop.tick() 的骨架(pregel/_loop.py:593-623,删减):
def tick(self) -> bool:
if self.step > self.stop: # 超过递归上限,防死循环
self.status = "out_of_steps"
return False
self.tasks = prepare_next_tasks( # 关键:算出本步任务
self.checkpoint, ..., self.channels,
trigger_to_nodes=self.trigger_to_nodes,
updated_channels=self.updated_channels, # 上一步改了哪些 channel
for_execution=True,
)
if not self.tasks: # 没任务 = 图跑完了
self.status = "done"
return False
return True
self.stop 是递归上限,恢复时被设成 step + recursion_limit + 1(_loop.py:1677-1678),这就是著名的 recursion_limit——它本质是最多允许多少个超步,防止图无限循环。
怎么选任务? 看 prepare_next_tasks 里的优化分支(pregel/_algo.py:475-486):
if updated_channels and trigger_to_nodes:
triggered_nodes = set()
for channel in updated_channels: # 上一步被改的 channel
if node_ids := trigger_to_nodes.get(channel):
triggered_nodes.update(node_ids) # 找出订阅了它的节点
candidate_nodes = sorted(triggered_nodes) # 排序保证确定性
else:
candidate_nodes = processes.keys() # 退路:扫所有节点
这就是谁该上场的判定:只有上一步被改过的 channel 才会触发它的订阅节点。trigger_to_nodes 是一张 channel → 订阅它的节点 反查表,引擎不必每步遍历全图,只看变化点。这是 Pregel 只唤醒收到消息的节点 思想的直接落地。
1.4 动作③:apply_writes —— 全章最关键的函数
执行完节点后,所有写入暂存在各 task 的 writes 里。after_tick() 调 apply_writes 把它们提交(pregel/_loop.py:677-687)。这个函数(pregel/_algo.py:232-345)做五件事:
第 1 件:按路径排序任务,保证确定性合并顺序(_algo.py:253-256):
# 按 task.path 前 3 段排序,确保更新应用顺序确定、可复现
tasks = sorted(tasks, key=lambda t: task_path_str(t.path[:3]))
第 2 件:记录每个节点看过的 channel 版本(_algo.py:262-269)。这是 versions_seen——它记录节点 N 在某 channel 上消费到了版本几。下一步判断 N 要不要再跑,就是比对 channel 当前版本 vs versions_seen[N][channel]:更新了才跑。
for task in tasks:
checkpoint["versions_seen"].setdefault(task.name, {}).update({
chan: checkpoint["channel_versions"][chan]
for chan in task.triggers # 这个节点订阅的 channel
if chan in checkpoint["channel_versions"]
})
第 3 件:把写入按 channel 分组,逐 channel update()(_algo.py:294-323,删减):
pending_writes_by_channel = defaultdict(list)
for task in tasks:
for chan, val in task.writes:
pending_writes_by_channel[chan].append(val) # 同名 channel 写入聚到一起
for chan, vals in pending_writes_by_channel.items():
if channels[chan].update(vals) and next_version is not None:
checkpoint["channel_versions"][chan] = next_version # 改了就升版本号
if channels[chan].is_available():
updated_channels.add(chan)
注意 channels[chan].update(vals)——一个 channel 一步内可能收到多个写入,合并逻辑由 channel 自己定(reducer 的归宿,见第 02 章)。LastValue 收到多个会报错,BinaryOperatorAggregate 会逐个折叠。
第 4 件:给本步没被写的 channel 也发空更新(_algo.py:325-333)。bump_step 仅当有任务带 triggers 时为真,让某些等待型 channel(如屏障)能感知又过了一步。
第 5 件:若本步无 channel 能触发任何节点,通知所有 channel finish()(_algo.py:335-342):
if bump_step and updated_channels.isdisjoint(trigger_to_nodes):
for chan in channels:
if channels[chan].finish() and next_version is not None:
...
这是图即将结束的信号——LastValueAfterFinish / NamedBarrierValueAfterFinish 这类 defer(延迟)channel 专门等这个信号才放行(见第 02 章)。
apply_writes 返回 updated_channels(本步被改的 channel 集合),直接喂给下一次 tick,形成闭环。
1.5 版本号:整套机制的隐形主角
一个单调递增的整数版本号贯穿全程。默认版本函数就是自增(_algo.py:227-229):
def increment(current: int | None, channel: None) -> int:
return current + 1 if current is not None else 1
它驱动三件事,缺一不可:
| 用途 | 怎么用版本号 | 代码 |
|---|---|---|
| 唤醒下一步节点 | channel 版本 > 节点 versions_seen 记录的版本 → 唤醒 | prepare_single_task(_algo.py:524) |
| 判断要不要中断 | 任一 channel 版本 > 上次中断见过的 → 有新进展 | should_interrupt(_algo.py:155-185) |
| 崩溃恢复对齐 | checkpoint 存了 channel_versions + versions_seen,恢复后比对即知接着跑谁 | 第 03 章 |
为什么不用脏标记而用版本号? 因为版本号持久化友好:存进 checkpoint 后,任何时候加载回来,只要比一下数字就知道状态进展到哪、谁还欠跑——不依赖任何内存临时标记。这是可恢复能力的根基。
1.6 关键细节 / 坑
- 一步之内,节点看到的是上一步的 状态。期望节点 A 写了、同一步 B 立刻读到——不会。B 要到下一步才看见。这是 BSP 本质,也是新手最常踩的坑。
recursion_limit是超步上限,不是递归深度。会循环往复的图(如 ReAct 的想→工具→想)每来回一次烧若干超步,撞上限抛GraphRecursionError(tick里status="out_of_steps",_loop.py:601-603)。- 合并顺序按
task.path排序而非节点注册顺序(_algo.py:253-256),并行节点写同一 reducer channel 时结果顺序确定、可复现。
下一章看状态到底长什么样:channel 与 reducer。