跳到主要内容

Burr — 生命周期钩子与流式动作

两个让 Burr「好观测、好交互」的机制。钩子是追踪与持久化共用的回调底座;流式动作让 LLM 的 token 一个个吐给用户,同时最后仍干净地更新状态。

1. 生命周期钩子:统一的回调底座

它要解决的小问题

「每步前后想插点逻辑」——记日志、存盘、发追踪、计时……如果每种都改核心循环就乱了。Burr 把这些统一成生命周期钩子:你实现某个钩子接口,框架在对应时机回调你。

有哪些钩子时机

钩子接口都在 burr/lifecycle/base.py,每个都有同步/异步两版:

钩子触发时机典型用途
PostApplicationCreateHook应用一建好注册图信息到 UI
PreRunStepHook每步执行前计时起点、记录入参
PostRunStepHook每步执行后(含失败)持久化、追踪
PreStartStreamHook / PostStreamItemHook / PostEndStreamHook流式输出的各阶段记录 token 级延迟
PreStartSpanHook / PostEndSpanHook / DoLogAttributeHook子追踪 spanOpenTelemetry 风格的细粒度 tracing
Pre/PostApplicationExecuteCallHook一次 run/iterate 整体前后整体计时

(对应符号见 burr/lifecycle/base.py:34 PreRunStepHook:88 PostRunStepHook:146 PostApplicationCreateHook 等。)

怎么实现的:按名注册 + MRO 扫描

关键类 LifecycleAdapterSet(burr/lifecycle/internal.py:109)。每个钩子基类用 @lifecycle.base_hook("pre_run_step") 装饰(internal.py:67 class lifecycle),把「这个类实现了哪个钩子名」登记到全局集合 REGISTERED_SYNC_HOOKS/REGISTERED_ASYNC_HOOKS(internal.py:32-33)。

你的 adapter 可能一次实现好几个钩子(多继承)。LifecycleAdapterSet 在初始化时遍历每个 adapter 的 MRO(方法解析顺序),把它实现的每个钩子归类到 sync_hooks / async_hooks 两张表:

# 真实源码节选 burr/lifecycle/internal.py:137-146 _get_lifecycle_hooks
for adapter in self.adapters:
for cls in inspect.getmro(adapter.__class__): # 沿继承链找
sync_hook = getattr(cls, SYNC_HOOK, None) # 这个基类登记的钩子名
if sync_hook is not None:
if adapter not in sync_hooks[sync_hook]:
sync_hooks[sync_hook].append(adapter)
# async 同理

怎么调:委托广播

核心循环不直接调钩子,而是喊一声「post_run_step 阶段到了」,由 adapter set 广播给所有登记了这个钩子的 adapter:

# 真实源码 burr/lifecycle/internal.py:174-183 call_all_lifecycle_hooks_sync
def call_all_lifecycle_hooks_sync(self, hook_name: str, **kwargs):
if not self._does_hook(hook_name, False): # 没人实现这个钩子 → 直接返回
return
for adapter in self.sync_hooks[hook_name]:
getattr(adapter, hook_name)(**kwargs) # 用名字动态调

异步版用 asyncio.gather 并发所有 async 钩子(internal.py:185-198);还有 call_all_lifecycle_hooks_sync_and_async(internal.py:200)先跑同步再 await 异步——异步应用里的同步钩子照样能用,只是会阻塞事件循环。

追踪也是钩子

本地追踪客户端 LocalTrackingClient(burr/tracking/client.py:155)就是一个实现了 PostApplicationCreateHook + PostRunStepHook 等的 adapter。ApplicationBuilder.with_tracker(application.py:2422)把它加进钩子集。所以追踪、持久化用的是同一套机制——这是 Burr 架构上很干净的一笔。

一个内部钩子的例子

框架自己也用钩子:TracerFactoryContextHook(application.py:768)在每步前后管理「追踪上下文」,它在 Application.__init__ 时被自动塞进 adapter set(application.py:878)。

2. 流式动作:边吐 token 边收尾

它要解决的小问题

LLM 回答是一个个 token 蹦出来的,你想实时显示给用户(而不是等整段)。但状态机又要求动作最终干净地更新状态。怎么兼得?

思路:生成器 yield 增量,最后一发带状态

流式动作是个生成器:中间每次 yield 一个增量结果(state_update=None),最后一次 yield 必须是 (最终结果, 新State)。框架靠「state_update 是不是 None」判断流是否结束。

# 示意,非源码 —— @streaming_action 的写法(对照真实 docstring action.py:1553)
@streaming_action(reads=["prompt"], writes=["response"])
def streaming_response(state):
buffer = []
for chunk in llm_stream(state["prompt"]):
buffer.append(chunk)
yield {"response": chunk}, None # 中间:只给增量,状态更新留 None
full = "".join(buffer)
yield {"response": full}, state.update(response=full) # 最后:给最终结果 + 新状态

重点看:中间 yield 的第二项是 None,最后一次才是真的 state.update(...)

真实实现:用 None 当「未结束」哨兵

_run_single_step_streaming_action(application.py:323)逐个消费生成器:

# 真实源码骨架 burr/core/application.py:341-389
for item in generator:
if isinstance(item, dict):
item = (item, None) # 允许只 yield dict,自动补 None
result, state_update = item
if state_update is None: # 中间增量
# 触发 post_stream_item 钩子(token 级追踪),然后
yield result, None
# 循环结束后
if state_update is None: # 从没给过最终状态 → 报错教你
raise ValueError(f"Action {action.name} did not return a state update...")
_validate_reducer_writes(action, state_update, action.name)
yield result, state_update # 最后吐一发带状态的

两个贴心点:

  • 容错:如果生成器在已经给出最终 state_update 之后才抛异常,框架不崩,记个 warning 用已有的最终态收尾(application.py:369-380)。
  • 强制收尾:如果你忘了在最后给状态更新,报错会直接给出正确写法示例(application.py:382-386)。

token 级追踪钩子

每吐一个增量,框架触发 post_stream_item 钩子(application.py:357-367),带上 item_indexstream_initialize_timefirst_stream_item_start_time——这让 UI 能算出「首 token 延迟」「token 间隔」这类流式指标。

用户侧:StreamingResultContainer

用户调 app.stream_result(halt_after=[...])(application.py:1394)拿到 (动作, StreamingResultContainer)。这个容器(burr/core/action.py:1008)是个会缓存最终结果的迭代器:

# 示意,非源码 —— 用法(对照 action.py:1021 docstring)
action, container = app.stream_result(halt_after=["ai_response"], inputs={"prompt": q})
for chunk in container: # 边迭代边拿增量
print(chunk["response"], end="")
final_result, final_state = container.get() # 迭代完后取最终结果 + 状态

它的 __next__(action.py:1076)在拿到「带 state 的那一项」时把它存进 self._resultStopIteration;get()(action.py:1106)负责把生成器跑干再返回缓存。__iter__finally 块保证无论如何最终都会触发一次回调(action.py:1094-1100)——这样即使用户提前 break,持久化/追踪的收尾钩子也不会漏。异步版 AsyncStreamingResultContainer(action.py:1114)结构对称。

3. 巧妙之处

  • 一套钩子,多种用途:持久化、追踪、计时、token 延迟统计——全是 adapter,核心循环只管「在某阶段广播」。新增可观测能力不碰核心。
  • MRO 扫描让一个 adapter 类能同时实现多个钩子,框架自动把它登记到对应阶段(internal.py:138)。
  • None 哨兵:流式动作用「state_update is None」干净地区分「增量」与「终值」,无需额外协议字段(application.py:343-368)。
  • finally 保证回调:流式容器即使被提前中断,也保证收尾回调跑一次(action.py:1094-1100),避免「存盘/追踪漏最后一笔」。

4. 边界与局限

  • 异步应用里跑同步钩子会阻塞事件循环(internal.py:205 + application.py:2834 注释明说),库选择「安全但可能慢」而非偷偷开线程。
  • 钩子是按名字字符串匹配的:实现了一个没注册的钩子名会被 _does_hook 当作错误拦下(internal.py:160-169),保护你不写错名。

5. 代码地图(导航索引)

主题文件符号名
钩子接口(各时机)burr/lifecycle/base.pyPreRunStepHookPostRunStepHookPostApplicationCreateHookPostStreamItemHook
钩子注册装饰器burr/lifecycle/internal.pylifecycleREGISTERED_SYNC_HOOKS
钩子集与广播burr/lifecycle/internal.pyLifecycleAdapterSet_get_lifecycle_hookscall_all_lifecycle_hooks_sync
追踪客户端(也是钩子)burr/tracking/client.pyLocalTrackingClient
框架内置追踪上下文钩子burr/core/application.pyTracerFactoryContextHook
流式动作基类burr/core/action.pyStreamingActionSingleStepStreamingActionFunctionBasedStreamingAction
流式装饰器burr/core/action.pystreaming_action
流式执行burr/core/application.py_run_single_step_streaming_action_run_multi_step_streaming_action
流式结果容器burr/core/action.pyStreamingResultContainerAsyncStreamingResultContainer
用户级流式 APIburr/core/application.pyApplication.stream_resultApplication.stream_iterate