跳到主要内容

Burr — 并行 map-reduce、依赖注入、巧妙之处与边界

最后一章:Burr 怎么做并行(同一逻辑跑在多份状态/多个动作上再汇总)、怎么把框架能力依赖注入进你的动作,以及全局的巧妙设计、边界与横向对比。

1. 并行:把「一群子应用」装进一个动作

它要解决的小问题

「把同一个问题用 5 个不同 prompt 各问一遍,再选最好的」「对 100 条记录各跑一遍同一子流程」——这是 map-reduce:map 出一堆并行任务,reduce 汇总。Burr 怎么在「状态机」框架里塞进并行?

核心思路:并行动作 = 一个特殊的单步动作

关键洞察(burr/core/parallelism.py:186 class TaskBasedParallelAction):一个并行步,对外仍是一个普通 SingleStepAction。它的 run_and_update 内部做三件事:

TaskBasedParallelAction (对外是 1 个动作)
│ run_and_update
┌──────────┼───────────────┐
▼ ▼ ▼
tasks() 每个 task 跑成 reduce()
生成一批 一个独立子 把这些子
SubGraph Application App 的终态
Task (并行/并发) 汇总回主 State
  • tasks(...)(parallelism.py:329,抽象):产出一串 SubGraphTask —— 每个描述「拿哪份初始状态、跑哪个子图、写哪些键」。
  • 每个 SubGraphTask.run(parallelism.py:144)用 _create_app_builder 把子图建成一个完整的子 Application 并跑到底。
  • reduce(state, states)(parallelism.py:342,抽象):拿到所有子应用的最终状态生成器,合并回主状态。

子图怎么来:RunnableGraph

并行任务跑的「子图」是 RunnableGraph(parallelism.py:53):包一个 Graph + 入口 + 终点。RunnableGraph.create(parallelism.py:65)能把「单个动作」或「现成 Graph」统一适配成可跑的子图——所以你 map 的对象既可以是一个动作,也可以是一整个子流程。

三种好用的封装

你通常不直接写 TaskBasedParallelAction,而是继承更具体的(都在 parallelism.py):

map 什么直觉
MapStates(:690)一个动作 × 多份状态同一逻辑跑在不同数据上(如批量处理 100 条)
MapActions(:593)多个动作 × 一份状态不同逻辑跑在同一数据上(如多模型投票)
MapActionsAndStates(:386)多动作 × 多状态的笛卡尔积最通用的底座

例如 MapStates:你实现 action()(给出要跑的子图)+ states(state, inputs)(产出一串状态)+ reduce(state, results)(汇总)。框架把它们组合成并行任务(parallelism.py:749 states:774 action:792 reduce)。

怎么真并行:可插拔 executor

并发执行交给一个 concurrent.futures.Executor。默认是 _create_default_executor(application.py:831),你可用 ApplicationBuilder.with_parallel_executor(application.py:2325)换成自己的(线程池/进程池/Ray 适配等)。子任务通过 ApplicationContext(见下)拿到这个 executor 工厂。

子应用的父子关系

每个子应用是被主应用「spawn」出来的,会带上 spawning_parent_pointer(application.py:847,由 with_spawning_parent 设置,:2544)。这和第 3 章的 fork(fork_parent_pointer)是两种不同关系:

fork(分叉): 旧线 ──┊──▶ 新线从某步起跑(平级,两条独立时间线)
spawn(派生): 父应用某一步 ──▶ 子应用整个跑完(嵌套,子在父的一步之内)

2. 依赖注入:框架能力怎么进你的动作

它要解决的小问题

你的动作有时需要框架的东西:当前 app_id、追踪器、能 spawn 子应用的上下文……但又不想把这些塞进 reads(它们不是业务状态)。Burr 用特殊命名的参数注入。

哨兵参数:__context__tracer

动作函数若声明了名为 __context__tracer 的参数,框架会自动注入对应对象(你通过 inputs 传):

# 示意,非源码 —— 动作声明 __context 即自动拿到应用上下文
@action(reads=["x"], writes=["y"])
def my_action(state, __context): # __context 由框架注入
print(__context.app_id, __context.sequence_id)
return state.update(y=state["x"] + 1)

注入表在 Application.__init__ 里建好(application.py:891-899 _dependency_factory):__tracer → 一个 TracerFactory,__context_context_factory_process_inputs(application.py:1021)在每步把这些工厂的产物填进动作的 kwargs(application.py:1046-1053)。

用户不能自己传 __ 开头的输入——会被显式拒绝(application.py:1023-1030),这些前缀保留给框架。

ApplicationContext 里有什么

ApplicationContext(application.py:618)是个 dataclass + 上下文管理器,带:app_idpartition_keysequence_idtrackerparallel_executor_factorystate_persister/initializer 等。它既给动作用,也让子应用(并行任务)能拿到父应用的 executor、追踪器,从而把 spawn 出来的子应用接进同一套追踪/持久化体系。ApplicationContext.get()(application.py:647)用 contextvars 让深层代码也能取到当前上下文。

3. 巧妙之处(可借鉴的技术)

  • 「并行是动作的特例」:不给状态机加新的执行模式,而是把「一堆子应用 + reduce」包成一个普通 SingleStepAction(parallelism.py:186)。核心循环零改动,却获得了 map-reduce。这是「用现有抽象表达新能力」的范本。
  • 依赖注入用命名约定:__context/__tracer 哨兵参数(application.py:891)让框架能力按需进动作,不污染业务状态键。_remap_dunder_parameters(application.py:130)还处理「参数实际名字不叫 __context 但类型对」的重映射。
  • reads/writes 三重防线:静态 AST 扫描(建图时,action.py:55)+ 运行时裁剪状态(subset,application.py:177)+ 运行后写键校验(application.py:255)。把「状态被悄悄改坏」从根上堵死。
  • 不可变状态 + delta 为「事件溯源历史」预留口子(state.py:308、issue #33),当前虽是 eager 全量计算,但 API 已经按未来形态设计。
  • 执行器可插拔:with_parallel_executor(application.py:2325)让并发后端从线程池换到任意 Executor,框架不绑定具体并发模型。
  • 零核心依赖:burr/core 不强依赖任何重型库;LLM、graphviz、pydantic、langchain 全是可选(用到才 import,失败就降级,见 state.py:487graph.py:204)。

4. 边界与局限(诚实)

  • 不替你做异步事件编排:README 自己的对比表(README.md:155)勾掉了「Asynchronous event-based orchestration」——Burr 是你驱动的状态机(你调 step/run),不是像 temporal 那样的事件驱动工作流引擎。
  • 状态全量物化:每步实打实算出整个状态 dict,而非只存 delta;大状态 + 长历史会有拷贝/序列化开销。代码多处注释承认这是次优(state.py:308application.py:202、issue #33)。
  • expr 条件不安全:Condition.expr 内部 eval,绝不能喂用户输入(action.py:417 明确警告)。
  • 同步钩子阻塞 async:异步应用里的同步钩子会阻塞事件循环(internal.py:205),库选择安全而非自动开线程。
  • sync/async 持久化不能混用:build() 配 sync persister、abuild() 配 async persister,搭错会报错(application.py:2569-2574 + build docstring 的兼容矩阵 :2796)。

5. 横向对比(同 shelf 的不同取舍)

面向「把 LLM 应用组织成图/流程」的兄弟项目里,Burr 的差异化取舍:

维度Burr 的选择对比
抽象核心显式状态机(节点+边+状态键)LangGraph 也是图,但 Burr 更强调「状态键级的读写契约」和「非 LLM 也适用」
状态模型不可变 + reads/writes 强制多数框架状态是可变共享字典,Burr 用契约挡偷改
持久化/回放一等公民(四元组主键 + fork + resume)不少框架持久化是事后补丁;Burr 从核心循环就按钩子接好
可观测自带 UI + 钩子底座追踪与持久化共用同一钩子机制
依赖核心零依赖、LLM 无关刻意不绑定模型/向量库,「不告诉你怎么调模型」(README.md:138)

(参考 README 的对比表 README.md:151-158,以及命名彩蛋:Burr 是 Hamilton 的「夙敌」,因为它给无环的 Hamilton DAG 补上了「带环的状态机」,README.md:162-164。)

6. 代码地图(导航索引)

主题文件符号名
并行动作底座burr/core/parallelism.pyTaskBasedParallelActionTaskBasedParallelAction.run_and_update
子任务/子图burr/core/parallelism.pySubGraphTaskRunnableGraphRunnableGraph.create
map-reduce 封装burr/core/parallelism.pyMapStatesMapActionsMapActionsAndStates
依赖注入工厂burr/core/application.py_dependency_factory_process_inputs_remap_dunder_parameters
应用上下文burr/core/application.pyApplicationContextApplicationContext.get_context_factory
可插拔执行器burr/core/application.pywith_parallel_executor_create_default_executor
派生/分叉指针burr/core/application.pyspawning_parent_pointerfork_parent_pointerwith_spawning_parent
公开 API 出口burr/core/__init__.pyactionStateApplicationBuilderwhenexpr