第 1 章 · 一次 run() 的完整流水线
这章把
agent.run("问题")从入口追到底,讲清两件事:编排层(_run.py)做了什么准备和收尾,以及**「反复调工具」的循环为什么在Model层而不在Agent层**。读完你能在脑子里画出整条数据流。
1.1 入口:Agent.run 只是个转发
Agent 本身是个 @dataclass(init=False)(agent/agent.py:68),几十个字段都是配置开关。run() 方法不干实事,直接把所有参数转发给 _run.run_dispatch:
# agent/agent.py:1391 —— run() 的真身(def 在 :1391,转发在 :1417)(节选)
def run(self, input, *, stream=None, ...) -> Union[RunOutput, Iterator[...]]:
return _run.run_dispatch(self, input=input, stream=stream, ...)
这样设计的好处:agent.py 只放「一个 agent 能配什么」(可读性强),沉重的执行逻辑全在 _run.py、_messages.py、_tools.py 等私有模块里(agent/agent.py:23-34 的 import 列出了这一族模块)。
1.2 run_dispatch:开跑前的准备
run_dispatch(agent/_run.py:1292)是同步路径的总调度。它在进入主流程前做几件「一次性」的事:
- 生成 run_id 并立即注册,用于取消追踪:
run_id = run_id or str(uuid4())(_run.py:1326)。 - 同步/异步 DB 把关:如果 agent 用的是异步 DB,这里直接抛错让你改用
arun(_run.py:1322)。 - 校验输入、规范化 hooks、初始化会话/agent、把媒体(图/音/视频/文件)包成
RunInput(_run.py:1340-1367)。
这一层产出一个 RunOutput —— 本次运行的「答卷」对象,后面每一步都往里填(内容、工具执行记录、状态、指标),最后返回给你。RunOutput 的字段见 run/agent.py(如 tools、content、status)。
1.3 _run:16 步流水线
真正的主流程是 _run(agent/_run.py:339)。它的 docstring 把步骤列得很清楚,这里按「准备 → 执行 → 收尾」三段归类:
准备段(把上下文备齐):
- 读或建会话(
read_or_create_session,_run.py:412) - 更新元数据、加载会话状态(
_run.py:416-429) - 解析依赖(
resolve_run_dependencies,_run.py:433) - 跑 pre-hooks(校验、护栏,可改输入)(
_run.py:440-455) - 决定这次给模型哪些工具(
get_tools+determine_tools_for_model,_run.py:458-471) - 拼好要发给模型的消息(
get_run_messages,_run.py:474)
并发旁路(不阻塞主流程): 拼完消息后,立刻开三个后台 future——抽记忆、抽学习信号、抽文化知识(_run.py:497-518)。它们和主模型调用并行跑,收尾时再 join。
执行段:
- 若开了推理,先让模型「想一轮」(
handle_reasoning,_run.py:523,详见第 5 章) - 调一次
call_model_with_fallback(...)(_run.py:531)——这一步内部是整个工具循 环
收尾段:
- 把模型结果写进
RunOutput(update_run_response,_run.py:563) - 结构化输出、存媒体、跑 post-hooks、等后台记忆写完、生成会话摘要、存库(
_run.py后续步骤,对应 docstring 第 10-16 步)
注意 _run.py:400 外面还套了一层 retry 循环(for attempt in range(num_attempts)):整个准备+执行可按 agent.retries 重试。
1.4 关键直觉:循环在哪一层?
这是最容易误解的点。_run 只调一次 model.response(_run.py:531)。那「模型要调工具→执行→再问模型」的反复,发生在哪?
答案:在 Model.response 内部的 while True。 看 models/base.py:703:
# models/base.py:703 —— Model 层的 agentic loop(节选骨架)
while True:
assistant_message = Message(role=self.assistant_message_role)
self._process_model_response(messages=messages, assistant_message=assistant_message, ...) # 问一次模型
messages.append(assistant_message)
if assistant_message.tool_calls: # 模型要调工具?
function_calls_to_run = self._prepare_function_calls(...)
for fc_resp in self.run_function_calls(...): # 执行它们
...
self.format_function_call_results(...) # 把结果塞回 messages
continue # 带着工具结果再问一次
break # 没有工具调用 → 收工
用图说清这两层的分工(从上到下是调用栈,横向箭头是「一次 run 里只发生一次」vs「循环 N 次」):
_run (编排层) ── 调用一次 ──▶ model.response
·准备上下文 │
·调一次模型 ▼
·收尾存储 ┌──────────── while True ───────────┐
│ 问模型 ──▶ 有 tool_calls? │
│ ├─是→执行工具→结果喂回→ continue ┘ (循环)
│ └─否→ break ─▶ 返回最终回答
└──────────────────────────────── ────┘
为什么这么设计: 不同模型 provider(OpenAI / Anthropic / Gemini …)的消息格式、工具格式都不同,这些差异由各 Model 子类的 _process_model_response / _format_tools 吸收;而「反复调工具」的控制流只写一份,放在 models/base.py 的基类里。Agent 层因此完全不关心「调了几轮工具」。
1.5 四条路径:sync / async × 普通 / streaming
同一套逻辑在 _run.py 里有四份实现,因为 Python 的同步、异步、生成器、异步生成器不能共用一个函数体:
| 路径 | 编排函数(_run.py) | Model 层对应 |
|---|---|---|
| 同步、整段返回 | _run(:339) | Model.response(base.py:650) |
| 同步、流式 | _run_stream(:752) | Model.response_stream(base.py:1362) |
| 异步、整段返回 | _arun(:1472) | Model.aresponse(base.py:881) |
| 异步、流式 | _arun_stream(:2147) | Model.aresponse_stream(base.py:1641) |
四者步骤注释一致(都标着「8. Reason about the task if reasoning is enabled」等),只是 sync 用 deque(gen, maxlen=0) 消费生成器、stream 用 yield from(对比 _run.py:523 的 handle_reasoning 和 _run.py:942 的 yield from handle_reasoning_stream)。
streaming 模式下产出什么: 不是返回一个 RunOutput,而是沿途 yield 一串 RunOutputEvent(事件枚举见 run/agent.py:143 的 RunEvent),前端可据此做打字机效果、显示「正在调用工具」等。
1.6 巧妙之处
- per-turn checkpoint barrier。 Model 循环每跑完一批工具,会调一次
after_tool_results回调(models/base.py:842),用于checkpoint="tool-batch"时把中间状态落库。回调失败只记日志、不杀 run(base.py:845)——「存档失败不能搞挂正在干活的 agent」。 - 取消是协作式的。
_run在多个关键点插了raise_if_cancelled(run_response.run_id)(_run.py:435、520、526、552),而不是硬 kill 线程。 - 后台记忆并发。 抽记忆/学习/文化知识在拼完消息后立刻起后台 future(
_run.py:497-518),与主模型调用重叠,收尾才 join——省掉串行等待。
1.7 代码地图
| 主题 | 文件 | 符号 |
|---|---|---|
| run 入口转发 | libs/agno/agno/agent/agent.py | Agent.run、Agent.arun |
| 调度与准备 | libs/agno/agno/agent/_run.py | run_dispatch、arun_dispatch |
| 同步主流程 | libs/agno/agno/agent/_run.py | _run |
| 流式/异步主流程 | libs/agno/agno/agent/_run.py | _run_stream、_arun、_arun_stream |
| Model 工具循环 | libs/agno/agno/models/base.py | Model.response、Model.response_stream |
| 结果回填 | libs/agno/agno/agent/_response.py | update_run_response |
| 运行答卷对象 | libs/agno/agno/run/agent.py | RunOutput、RunEvent |