跳到主要内容

回调管线:横切关注点做成可插拔钩子

这一章讲 Cua 怎么把“预算上限、只留最近 N 张图、PII 脱敏、遥测、保存轨迹”这些到处都要的逻辑,从主循环里抽出来,做成可插拔的钩子。

它要解决的小问题

“跑超 $5 就停”“历史只留最近 3 张截图”“上传前把邮箱抹掉”——这些和“看-想-做”循环正交,但又散落在循环的各个时间点。如果全写进 run(),主循环会变成一坨。

思路/直觉:主循环只“广播事件”,回调“按需响应”

借鉴 web 框架的中间件:定义一组生命周期事件(run 开始、调 LLM 前、调 LLM 后、每圈是否继续、拿到 usage……)。主循环到每个点就按顺序喊一遍所有回调的对应方法。回调可以:

  • 改消息(on_llm_start/on_llm_end 返回新的消息列表 → 链式传递)
  • 叫停(on_run_continue 返回 False → 循环 break)
  • 纯旁观(on_usage/on_screenshot → 记账、存盘)

图示:生命周期点上的广播

run() 时间线 回调在这些点被依次调用
───────────────────────────────────────────────────────────
run 开始 ──► on_run_start(kwargs, old_items)
┌─ 每圈开始 ──► on_run_continue(...) ──► 任一返回 False → break
│ 调模型前 ──► on_llm_start(messages) ──► 返回改过的 messages(链式)
│ 例: PII脱敏 / 只留最近N张图
│ (predict_step 内部) ──► on_api_start / on_api_end / on_usage / on_screenshot
│ 调模型后 ──► on_llm_end(output) ──► 返回改过的 output(链式)
│ 例: OperatorNormalizer 规范化动作
└─ 圈尾
run 结束 ──► on_run_end(kwargs, old, new)

怎么读: 竖线内是 while 的一圈。on_llm_start/on_llm_end变换器(输入消息、输出消息,串成链);on_run_continue闸门(任一回调说停就停);其余是旁观者(只接收,不改流程)。

原理演示(简化)

回调基类全是“默认啥也不做”的钩子,你只覆盖关心的那几个:

# 示意,非源码:一个“花超预算就停”的回调
class BudgetCallback(AsyncCallbackHandler):
def __init__(self, cap): self.cap, self.spent = cap, 0.0
async def on_usage(self, usage): # 旁观:累加花费
self.spent += usage.get("response_cost", 0)
async def on_run_continue(self, kwargs, old, new): # 闸门:超了就停
return self.spent < self.cap

主循环这边只是“有这个方法就调”:

# 示意,非源码:主循环如何广播
async def _on_run_continue(self, kwargs, old, new):
for cb in self.callbacks:
if hasattr(cb, "on_run_continue"):
if not await cb.on_run_continue(kwargs, old, new):
return False # 任一回调叫停 → 整体停
return True

重点看: 闸门是“一票否决”(任一回调返回 False 即停);变换器是“接力”(上一个的输出是下一个的输入)。

真实实现

基类AsyncCallbackHandler(agent/cua_agent/callbacks/base.py:9)定义了全部钩子,默认实现都是 pass 或原样返回——所以子类只覆盖需要的。

主循环的广播器集中在 agent.py 的一组 _on_* 方法(agent/cua_agent/agent.py:615-724)。闸门版 _on_run_continue 是“任一 False 即停”(agent.py:632-644);变换器版 _on_llm_start 是“链式传递”(agent.py:646-652)。

预算回调(agent/cua_agent/callbacks/budget_manager.py:12,BudgetManagerCallback):on_usage 累加 response_cost,on_run_continue 超额返回 False(budget_manager.py:36-56)。

动作规范化回调(agent/cua_agent/callbacks/operator_validator.py:20,OperatorNormalizerCallback)在 on_llm_end 里把模型输出的动作修形:把 hotkey 重命名成 keypress、把 keys 统一成列表、按动作类型只保留合法参数(operator_validator.py:23-103)。这就是“不同模型动作格式的最后一道找平”。

内置回调清单

构造 ComputerAgent 时,框架按参数自动插入回调(agent/cua_agent/agent.py:326-403):

回调触发参数干什么文件
OperatorNormalizerCallback总是(插在最前)规范化模型输出的动作callbacks/operator_validator.py
PromptInstructionsCallbackinstructions=...注入系统指令callbacks/prompt_instructions.py
LoggingCallbackverbosity=...按级别打日志callbacks/logging.py
ImageRetentionCallbackonly_n_most_recent_images=N历史只留最近 N 张截图callbacks/image_retention.py
TrajectorySaverCallbacktrajectory_dir=...存截图+响应轨迹callbacks/trajectory_saver.py
BudgetManagerCallbackmax_trajectory_budget=...花超预算就停callbacks/budget_manager.py
TelemetryCallback / OtelCallbacktelemetry_enabled(默认开)匿名产品分析 / OTel 指标callbacks/telemetry.py, otel.py

(还有 PiiAnonymizationCallback(callbacks/pii_anonymization.py)可手动加,在 on_llm_start 脱敏。)

关键细节/坑

  • 顺序有讲究。 OperatorNormalizerCallbackinsert(0, ...) 强制放在最前(agent.py:329),保证它最先看到/规范化动作;遥测回调在 agent_loop 选定之后才加,这样能记录正确的 agent_type(agent.py:393-403)。
  • 预算超了默认不抛错,只停。 raise_error=False 时打印一行然后返回 False,温和退出(budget_manager.py:48-55)。
  • 回调可叠加状态。 像预算这种回调是有状态的实例,reset_after_each_run 控制是否每次 run() 归零(budget_manager.py:31-34)。
  • 钩子是“可选实现”。 广播器全都 if hasattr(callback, "...") 再调,所以自定义回调只写关心的方法即可,不必实现全套。

代码地图

主题文件路径符号名
回调基类(全部钩子)agent/cua_agent/callbacks/base.pyAsyncCallbackHandler
主循环广播器agent/cua_agent/agent.py_on_run_continue, _on_llm_start, _on_llm_end, _on_usage
自动装配回调agent/cua_agent/agent.pyComputerAgent.__init__
预算闸门agent/cua_agent/callbacks/budget_manager.pyBudgetManagerCallback
动作规范化agent/cua_agent/callbacks/operator_validator.pyOperatorNormalizerCallback