跳到主要内容

第 03 章 · 持久化、中断与恢复

本章讲什么: LangGraph 最招牌的两个能力——能从崩溃点恢复、能暂停等人输入——都建立在 checkpoint 上。本章拆开 checkpoint 数据结构,再讲 interrupt() 为什么靠重放整个节点实现,以及这带来的著名坑。

3.1 checkpoint 是什么:状态的一张完整快照

开了 checkpointer 后,每个超步末尾(第 01 章动作④)都存一张 Checkpoint。结构(libs/checkpoint/langgraph/checkpoint/base/__init__.py:92-123):

字段装什么为什么需要
channel_values每个 channel 当前值恢复时重建状态
channel_versions每个 channel 的单调递增版本号判断进展到哪
versions_seen{节点: {channel: 见过的版本}}判断哪个节点还欠跑
id / ts单调递增 ID + 时间戳排序、定位历史步
v格式版本(当前 3)跨版本迁移(见 _migrate_checkpoint)

关于 v 当前值的依据: 写作时这个版本号确为 3——证据是 state.py:1625 的迁移码 if checkpoint["v"] >= 3: return(v 已到 3 就跳过迁移)。注意 Checkpoint 的 docstring(checkpoint/base/__init__.py:96)原文仍写 "Currently 1",这是上游没及时更新的过时文字;若你按 docstring 核对会困惑,以迁移码为准。

关键洞察: 恢复一次运行不需要任何内存状态——只要 channel_values(状态)+ channel_versionsversions_seen(进度)。把第 01 章的版本号机制和这张表合起来:谁还该跑 = 存在某 channel,其 channel_versions[c] > versions_seen[node][c]。这就是为什么 LangGraph 能从任意 checkpoint 精确续跑。

3.2 thread_id:一条对话/一次运行的主键

checkpoint 按 thread_id 分组存储(BaseCheckpointSaver docstring,checkpoint/base/__init__.py:182-199):

config = {"configurable": {"thread_id": "my-thread"}}
graph.invoke(inputs, config) # 不给 thread_id,checkpointer 无法存/恢复
  • 一次性任务:用随机 thread_id,各跑各的。
  • 对话记忆:复用同一个 thread_id,状态(如消息历史)跨多次 invoke 累积。

这就是长期记忆的实现层:同一 thread 的状态自动续上,你不用手动传历史。

3.3 三种 durability(存盘时机的取舍)

Durability 有三档(types.py:87-93):

档位何时存盘取舍
"sync"下一步开始前同步存完最安全,最慢
"async"下一步执行的同时异步存平衡
"exit"只在图退出时存一次最快,崩了丢中间步

这是安全 vs 吞吐的经典旋钮:agent 每步都很贵且不可重放时选 sync;快速短任务可选 exit。

3.4 interrupt():人在环路靠整段重放

这是 LangGraph 最反直觉、也最需要讲清的机制。在节点里调 interrupt("问题") 就能暂停图、把问题抛给客户端,等人答了再续。

interrupt() 实现(types.py:811-934,删减):

def interrupt(value):
scratchpad = conf[CONFIG_KEY_SCRATCHPAD]
idx = scratchpad.interrupt_counter() # 这是本节点第几个 interrupt 调用
if scratchpad.resume: # 恢复时:已有人给的答案?
if idx < len(scratchpad.resume):
return scratchpad.resume[idx] # 直接返回答案,不再抛异常
v = scratchpad.get_null_resume(True) # 单值 resume 的情况
if v is not None:
scratchpad.resume.append(v)
return v
raise GraphInterrupt( # 首次:抛异常,暂停整个图
(Interrupt.from_ns(value=value, ns=conf[CONFIG_KEY_CHECKPOINT_NS]),)
)

它的工作模型分两趟:

第一趟(首次执行,无 resume 值):
节点跑到 interrupt("年龄?") → 抛 GraphInterrupt
→ 图暂停,checkpoint 落盘,"年龄?"返回给客户端

……人类回答 30,客户端用 Command(resume="30") 续跑……

第二趟(恢复执行,带 resume 值):
从 checkpoint 恢复 → 节点【从头重新执行!】
→ 跑到同一个 interrupt("年龄?")
→ 这次 scratchpad.resume 里有 30 → 直接返回 30,继续往下

最重要的坑(docstring 原文,types.py:822-828): 图从节点开头恢复,重新执行所有逻辑。也就是说:

  • interrupt() 之前的代码会再跑一遍。所以 interrupt 之前不要做不可重复的副作用(发邮件、扣款、写数据库)——会做两次。
  • 一个节点里多个 interrupt() 调用,靠调用顺序匹配 resume 值(idx 计数器),别在 interrupt 之间放条件分支改变调用次数。

为什么这么设计? 因为节点函数是普通 Python 函数,LangGraph 没法在函数中间冻结栈。重放整段是用幂等假设换来了实现简单 + 完全靠 checkpoint 恢复,代价就是 interrupt 前的副作用风险。这是一个清醒的工程取舍。

3.5 should_interrupt:静态中断点(interrupt_before/after)

除了节点内的 interrupt(),你还能在 compile(interrupt_before=["node"]) 设静态断点。判定逻辑(_algo.py:155-185):

any_updates = any(
version > seen.get(chan, null_version) # 自上次中断后有新进展?
for chan, version in checkpoint["channel_versions"].items()
)
return [task for task in tasks if task.name in interrupt_nodes] if any_updates else []

又一次,靠版本号比对判断有没有新进展值得停下来。命中就抛 GraphInterrupt(_loop.py:660-665),图在执行目标节点之前暂停——这正是时间旅行调试和执行前人工审批的基础。

3.6 关键细节 / 坑

  • interrupt 必须配 checkpointer:没有持久化就没有现场可恢复(types.py:830-831 明说)。
  • 恢复用 Command(resume=...) 而非重新 invoke 输入:resume 值经 scratchpad 喂回 interrupt 调用点(第 04 章讲 Command)。
  • checkpoint 有格式迁移:v<3 的旧 checkpoint 会被 _migrate_checkpoint 在加载时改写 channel 命名(start:Xbranch:to:X,state.py:1612-1668),这是上游演进留下的兼容层。