第 03 章 · 持久化、中断与恢复
本章讲什么: LangGraph 最招牌的两个能力——能从崩溃点恢复、能暂停等人输入— —都建立在 checkpoint 上。本章拆开 checkpoint 数据结构,再讲
interrupt()为什么靠重放整个节点实现,以及这带来的著名坑。
3.1 checkpoint 是什么:状态的一张完整快照
开了 checkpointer 后,每个超步末尾(第 01 章动作④)都存一张 Checkpoint。结构(libs/checkpoint/langgraph/checkpoint/base/__init__.py:92-123):
| 字段 | 装什么 | 为什么需要 |
|---|---|---|
channel_values | 每个 channel 当前值 | 恢复时重建状态 |
channel_versions | 每个 channel 的单调递增版本号 | 判断进展到哪 |
versions_seen | {节点: {channel: 见过的版本}} | 判断哪个节点还欠跑 |
id / ts | 单调递增 ID + 时间戳 | 排序、定位历史步 |
v | 格式版本(当前 3) | 跨版本迁移(见 _migrate_checkpoint) |
关于
v当前值的依据: 写作时这个版本号确为3——证据是state.py:1625的迁移码if checkpoint["v"] >= 3: return(v已到 3 就跳过迁移)。注意Checkpoint的 docstring(checkpoint/base/__init__.py:96)原文仍写 "Currently1",这是上游没及时更新的过时文字;若你按 docstring 核对 会困惑,以迁移码为准。
关键洞察: 恢复一次运行不需要任何内存状态——只要 channel_values(状态)+ channel_versions 和 versions_seen(进度)。把第 01 章的版本号机制和这张表合起来:谁还该跑 = 存在某 channel,其 channel_versions[c] > versions_seen[node][c]。这就是为什么 LangGraph 能从任意 checkpoint 精确续跑。
3.2 thread_id:一条对话/一次运行的主键
checkpoint 按 thread_id 分组存储(BaseCheckpointSaver docstring,checkpoint/base/__init__.py:182-199):
config = {"configurable": {"thread_id": "my-thread"}}
graph.invoke(inputs, config) # 不给 thread_id,checkpointer 无法存/恢复
- 一次性任务:用随机
thread_id,各跑各的。 - 对话记忆:复用同一个
thread_id,状态(如消息历史)跨多次 invoke 累积。
这就是长期记忆的实现层:同一 thread 的状态自动续上,你不用手动传历史。
3.3 三种 durability(存盘时机的取舍)
Durability 有三档(types.py:87-93):
| 档位 | 何时存盘 | 取舍 |
|---|---|---|
"sync" | 下一步开始前同步存完 | 最安全,最慢 |
"async" | 下一步执行的同时异步存 | 平衡 |
"exit" | 只在图退出时存一次 | 最快,崩了丢中间步 |
这是安全 vs 吞吐的经典旋钮:agent 每步都很贵且不可重放时选 sync;快速短任务可选 exit。
3.4 interrupt():人在环路靠整段重放
这是 LangGraph 最反直觉、也最需要讲清的机制。在节点里调 interrupt("问题") 就能暂停图、把问题抛给客户端,等人答了再续。
看 interrupt() 实现(types.py:811-934,删减):
def interrupt(value):
scratchpad = conf[CONFIG_KEY_SCRATCHPAD]
idx = scratchpad.interrupt_counter() # 这是本节点第几个 interrupt 调用
if scratchpad.resume: # 恢复时:已有人给的答案?
if idx < len(scratchpad.resume):
return scratchpad.resume[idx] # 直接返回答案,不再抛异常
v = scratchpad.get_null_resume(True) # 单值 resume 的情况
if v is not None:
scratchpad.resume.append(v)
return v
raise GraphInterrupt( # 首次:抛异常,暂停整个图
(Interrupt.from_ns(value=value, ns=conf[CONFIG_KEY_CHECKPOINT_NS]),)
)
它的工作模型分两趟:
第一趟(首次执行,无 resume 值):
节点跑到 interrupt("年龄?") → 抛 GraphInterrupt
→ 图暂停,checkpoint 落盘,"年龄?"返回给客户端
……人类回答 30,客户端用 Command(resume="30") 续跑……
第二趟(恢复执行,带 resume 值):
从 checkpoint 恢复 → 节点【从头重新执行!】
→ 跑到同一个 interrupt("年龄?")
→ 这次 scratchpad.resume 里有 30 → 直接返回 30,继续往下
最重要的坑(docstring 原文,types.py:822-828): 图从节点开头恢复,重新执行所有逻辑。也就是说:
interrupt()之前的代码会再跑一遍。所以 interrupt 之前不要做不可重复的副作用(发邮件、扣款、写数据库)——会做两次。- 一个节点里多个
interrupt()调用,靠调用顺序匹配 resume 值(idx计数器),别在 interrupt 之间放条件分支改变调用次数。
为什么这么设计? 因为节点函数是普通 Python 函数,LangGraph 没法在函数中间冻结栈。重放整段是用幂等假设换来了实现简单 + 完全靠 checkpoint 恢复,代价就是 interrupt 前的副作用风险。这是一个清醒的工程取舍。
3.5 should_interrupt:静态中断点(interrupt_before/after)
除了节点内的 interrupt(),你还能在 compile(interrupt_before=["node"]) 设静态断点。判定逻辑(_algo.py:155-185):
any_updates = any(
version > seen.get(chan, null_version) # 自上次中断后有新进展?
for chan, version in checkpoint["channel_versions"].items()
)
return [task for task in tasks if task.name in interrupt_nodes] if any_updates else []
又一次,靠版本号比对判断有没有新进展值得停下来。命中就抛 GraphInterrupt(_loop.py:660-665),图在执行目标节点之前暂停——这正是时间旅行调试和执行前人工审批的基础。
3.6 关键细节 / 坑
- interrupt 必须配 checkpointer:没有持久化就没有现场可恢复(
types.py:830-831明说)。 - 恢复用
Command(resume=...)而非重新 invoke 输入:resume 值经scratchpad喂回 interrupt 调用点(第 04 章讲 Command)。 - checkpoint 有格式迁移:
v<3的旧 checkpoint 会被_migrate_checkpoint在加载时改写 channel 命名(start:X→branch:to:X,state.py:1612-1668),这是上游演进留下的兼容层。