跳到主要内容

执行与可观测骨架:Runnable / RunContext / Emitter

agent、工具、工作流、需求……这些看起来不同的东西,在 BeeAI 里共享同一套执行 + 事件骨架。理解这套骨架,你就理解了「为什么日志、流式、询问许可这些横切能力能统一挂上去」。

4.1 三个概念,一句话各自定位

概念定位文件
Runnable「可被 run 的东西」的统一接口,带一个 emitter + 一组中间件runnable.py
RunContext一次执行的上下文:run_id、abort 信号、emitter 子节点context.py
Emitter命名空间化的事件总线,父子可 pipe 成树emitter/emitter.py

4.2 Runnable:统一入口

Runnable(runnable.py:52-86)是抽象基类,要求子类提供 runemitter。agent 的基类 BaseAgent 就是 Runnable 的子类(agents/base.py:62)。

真正的魔法在 @runnable_entry 装饰器(runnable.py:89-131)。它把一个普通的 async def run 包成「自动进入 RunContext」的版本:

# 示意,浓缩自 runnable.py:101-129
def wrapper(*args, **kwargs):
self = args[0]
return (
RunContext.enter(self, inner, signal=..., run_params={"input": args[1], ...})
.middleware(*self.middlewares) # 把该 runnable 的中间件挂上
.context(kwargs.get("context") or {})
)

所以任何 runnable 一被 run,框架都会:① 建一个 RunContext;② 把它的中间件接上;③ 返回一个 Run 对象(可 await、也可 async for 迭代事件)。这就是 01 章RequirementAgent.run 上那个 @runnable_entry 在做的事。

4.3 RunContext:一次执行的「信封」

RunContext.enter(context.py:187-273)是每次执行的真正入口。它做几件关键事:

  • 建立父子关系。ContextVar 里取当前上下文当 parent(context.py:195),于是嵌套执行(agent 调工具、工具内部又跑子 agent)天然形成一棵执行树,每个节点有自己的 run_id 和共享的 group_id
  • 建一个 emitter 子节点pipe 到父 emitter(context.py:144-154),让事件能向上冒泡。
  • 用 abort 信号包裹执行。 它同时跑「真正的任务」和「等待 abort 信号」两个 task,谁先完成谁说了算(context.py:235-259)——这是框架统一的取消机制。
  • 发四个生命周期事件: start / success / error / finish(context.py:209-269)。中间件和需求都靠监听这四个事件工作。

Run 对象本身(context.py:47-124)支持两种消费方式:await run(只要结果)或 async for (data, event) in run(流式拿每个事件)。后者让你能实时看到 agent 内部每一步。

4.4 Emitter:命名空间化的事件树

Emitter(emitter/emitter.py:52)是事件总线。每个 emitter 有一个 namespace(如 ["agent", "requirement"]),事件的完整路径就是 namespace + 事件名(如 agent.requirement.start)。

父子关系靠 child() + pipe()(emitter.py:88-121):子 emitter 的所有事件通过一个 *.* 监听器转发给父 emitter。于是整棵执行树的事件最终都能在根上被监听到。

监听用 on(matcher, callback, options)(emitter.py:135-157),matcher 可以是:

matcher 形式匹配
"start"本 emitter 自己的 start 事件
"agent.requirement.start"这条精确路径
"*"本 emitter 的所有事件(不含嵌套)
"*.*"所有事件(含嵌套子节点)
正则 / 函数自定义匹配

两个让这套系统强大的 option:is_blocking=True(监听器执行完才继续,询问许可就靠它)和 priority(用 bisect 按优先级插入监听器列表,emitter.py:189-193,让某些中间件能先于别人拿到事件)。

4.5 中间件:挂在事件树上的横切能力

中间件(RunMiddlewareType,context.py:42-44)就是「在执行开始时被 bind(ctx) 一次、然后往 ctx.emitter 上挂监听器」的对象。两个代表:

GlobalTrajectoryMiddleware(middleware/trajectory.py)—— 把整棵执行树打印成带缩进的轨迹日志。它在 bind 时给 emitter 挂上 start/success/error/finish 的监听器(trajectory.py:100-118),并维护每个 run_id 的「深度」来算缩进。它给不同实体加 emoji 前缀(trajectory.py:79:agent 🤖 / ChatModel 💬 / Tool 🛠️ / Requirement 🔎),一眼能看出谁在调谁。

# 示意:挂上轨迹中间件后,run 会把内部每一步打到 stdout
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
await agent.run("...", middlewares=[GlobalTrajectoryMiddleware()])

StreamToolCallMiddleware(middleware/stream_tool_call.py)—— runner 用它把 final_answer 工具的流式输出实时发成 final_answer 事件(见 _runner.py:92-108),从而支持「答案边生成边吐」。

带走的点: 因为 agent / 工具 / 工作流全都跑在同一套 RunContext + Emitter 上,任何横切能力(日志、流式、人审、限流)都只需写一个监听事件树的中间件,而不必改 agent 内部代码。这是这套骨架最大的收益。


下一章看怎么把多个 agent 拼成系统。→ 04-multi-agent.md