跳到主要内容

Burr — 持久化、断点续跑与 fork

这一章是 Burr 区别于「普通 LLM 链」的杀手锏:每一步状态都能存盘,进程挂了能从上次接着跑,还能从任意历史步分叉出一条新应用线。本章讲它怎么实现。

1. 持久化接口:存什么、用什么主键

它要解决的小问题

要能「读档续跑」,就得知道:存的那条记录唯一对应哪个应用、哪一步?Burr 用一个四元组主键回答。

主键:四个字段定位一份状态

字段含义
partition_key分区键,通常是「用户/租户」维度,把不同人的应用分开
app_id一次应用实例的唯一 ID(一段对话/一次运行)
sequence_id第几步(每 step 自增)
position这一步执行的动作名

(burr/core/persistence.py:125 BaseStateSaver.save 的参数;:138 注释解释「为啥用四元组而非三元组」。)

两个抽象方法

持久化分读写两半,合起来是 BaseStatePersister(burr/core/persistence.py:202):

  • :BaseStateSaver.save(partition_key, app_id, sequence_id, position, state, status, ...)(persistence.py:125)。status"completed""failed"——失败时存的是动作执行前的状态(persistence.py:148 注释)。
  • :BaseStateLoader.load(partition_key, app_id, sequence_id=None)(persistence.py:59)。不给 sequence_id 就返回「最近一次完整完成的步」。还有 list_app_ids(列某分区下所有应用)。

异步版完全对称:AsyncBaseStateLoader / AsyncBaseStateSaver(persistence.py:83/157)。

自带实现:SQLitePersister

开箱即用的是 SQLitePersister(burr/core/persistence.py:312)。它的建表语句把上面的设计落成 schema:

-- 真实源码 burr/core/persistence.py:409-419 create_table_if_not_exists
CREATE TABLE IF NOT EXISTS {table_name} (
partition_key TEXT DEFAULT '',
app_id TEXT NOT NULL,
sequence_id INTEGER NOT NULL,
position TEXT NOT NULL,
status TEXT NOT NULL,
state TEXT NOT NULL, -- 序列化后的状态(JSON)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (partition_key, app_id, sequence_id, position)
)

注意 state 是文本列——状态先被 State.serialize(burr/core/state.py:325)转成 JSON 可序列化的 dict 再存。

必须先 initialize

SQLitePersister 用前要 .initialize() 建表(persistence.py:428)。builder 在装配同步持久化时会强制检查这点,没初始化直接报错教你去调(burr/core/application.py:2579-2585 _set_sync_state_persister)。

2. 状态怎么序列化

思路:逐字段选 serde

State.serialize(burr/core/state.py:325)对每个键值做序列化。优先级:

  1. 若该字段名在全局注册表 FIELD_SERIALIZATION 里(用户用 register_field_serde 注册的,state.py:35)→ 用自定义 serde。
  2. 否则走通用 serde.serialize(burr/core/serde.py)。

失败时报错会带上是哪个字段、什么类型(state.py:341-344),便于排查「某个对象不可序列化」。

内置插件

模块加载时会自动尝试注册三个 serde 插件:langchainpydanticpandas(burr/core/state.py:487-491)——装了对应库就能直接存它们的对象,没装就静默跳过。

3. 存盘怎么触发:它是一个钩子

持久化不是核心循环的硬编码,而是搭在生命周期钩子上的。PersisterHook(burr/core/persistence.py:214)是个 PostRunStepHook:每步跑完,框架触发 post_run_step,这个钩子就调 persister.save(...)

_step() ──finally──▶ post_run_step 钩子触发

├─ PersisterHook ──▶ persister.save(...) 存盘
└─ TrackingClient ──▶ 写追踪日志(同一机制)

ApplicationBuilder.with_state_persister(application.py:2521)在 build 时:若传的是 BaseStateSaver,就用 PersisterHook 包一层加进钩子集;若你传的是「裸钩子」(power-user 模式),直接当钩子用(application.py:2576-2588)。

4. 断点续跑:initialize_from

它要解决的小问题

应用重启后,我想接着上次跑——状态从哪来?序号接到几?入口是「从头」还是「上次停的地方」?

API 形态

# 示意,非源码 —— 续跑同一个 app_id
app = (
ApplicationBuilder()
.with_actions(...).with_transitions(...)
.with_identifiers(app_id="conv-123") # 用这个 ID 去查历史
.with_state_persister(persister) # 存盘后端
.initialize_from(
persister,
resume_at_next_action=True, # 接着上次的下一步
default_state={"chat_history": []}, # 查不到历史时的兜底
default_entrypoint="human_input", # 兜底入口
)
.build()
)

内部怎么接

initialize_from(application.py:2472)只是记录意图(把 initializer、resume_at_next_actiondefault_state 等存到 builder 字段),真正的加载发生在 build()_load_from_sync_persister(application.py:2691)。它做的事:

  1. (partition_key, app_id)persister.load(...) 取最近状态。
  2. 查到了 → 用存档状态 + 存档的 sequence_id;resume_at_next_action=True 时,不重置 __PRIOR_STEP,于是 get_next_action 会自然走「上次的下一步」。
  3. 查不到 → 用 default_state + default_entrypoint 从头开始。

这就是为什么 initialize_fromwith_state/with_entrypoint 二选一(application.py:2499-2504 直接报错):前者从持久化推导状态与入口,后者手动给定。

5. Fork:从历史某点分叉一条新应用

它要解决的小问题

「这次对话跑到第 5 步出了个有趣的岔路,我想保留原线,另开一条从第 5 步出发的新线做 A/B」——这就是 fork。

怎么用

initialize_fromfork_from_* 参数(application.py:2478-2480):

参数含义
fork_from_app_id哪个旧应用分叉(注意:不同于 with_identifiers 设的「新应用 ID」)
fork_from_partition_key旧应用的分区键(可选)
fork_from_sequence_id从旧应用的第几步分叉(可选,默认最新)

校验:给了 fork_from_partition_key/sequence_id 却没给 fork_from_app_id 会报错(application.py:2505-2510)。

fork vs resume 的区别

resume(续跑): fork(分叉):
旧 app_id ────────▶ 继续 旧 app_id ──┐ (只读它的历史状态)
(同一条线变长) ├──▶ 新 app_id 从某步起跑
└──▶ 旧线保持不动
  • resume:用 with_identifiers 的 ID 查自己的历史,接着写下去(同一 app_id)。
  • fork:从 fork_from_app_id 读出某步状态,作为app_id 的起点;两条线互不影响。

fork 信息会通过 fork_parent_pointer(application.py:846)挂到新 Application 上,供追踪 UI 画出「这条线是从那条线的第几步分出来的」父子关系。还有一个相关但不同的概念 spawning_parent_pointer(application.py:847/with_spawning_parent,:2544):用于「一个动作内部 spawn 出子应用」(见并行章)。

6. 巧妙之处

  • 持久化 = 钩子,不是硬编码:核心循环完全不知道「持久化」这回事,加存盘只是加一个 PostRunStepHook。同一机制也驱动追踪——一套接口,两种用途(persistence.py:214 PersisterHooktracking/client.py:155 LocalTrackingClient 都是钩子)。
  • failed 状态也存:post_run_stepfinally 里带 exception 触发,所以失败步也会落库(状态=动作执行前),便于「从失败点重试」。
  • 四元组主键里的 position:存动作名让你能精确区分「同一序号下不同动作」并支持精细回放,代码自嘲是「over-engineering」但留了扩展余地(persistence.py:138)。

7. 代码地图(导航索引)

主题文件符号名
持久化读写接口burr/core/persistence.pyBaseStateLoaderBaseStateSaverBaseStatePersister
异步持久化接口burr/core/persistence.pyAsyncBaseStateLoaderAsyncBaseStateSaver
SQLite 实现burr/core/persistence.pySQLitePersisterSQLitePersister.create_table_if_not_existsSQLitePersister.initialize
存盘钩子burr/core/persistence.pyPersisterHookPersisterHook.post_run_step
状态序列化burr/core/state.pyState.serializeState.deserializeregister_field_serde
装配持久化burr/core/application.pyApplicationBuilder.with_state_persister_set_sync_state_persister_set_async_state_persister
续跑/分叉入口burr/core/application.pyApplicationBuilder.initialize_from_load_from_sync_persister
父子指针burr/core/application.pyfork_parent_pointerwith_spawning_parent