01 · 三节点状态图(骨架)
本章讲:一次运行从输入到输出,在三个节点间怎么流转,以及每个节点内部在干嘛。读完你能在脑子里「跑」一遍 agent loop。
1.1 为什么是「图」而不是 while 循环
agent loop 本质就是个 while 循环,但有一堆中途出口:token 超了、要人审批、工具要延迟执行、流式要能中断恢复……写成裸 while 会越加越乱。
Pydantic AI 的选择:把循环拆成节点,每个节点 run() 完返回「下一个节点」或 End。这样「下一步是什么」是显式的返回值,可被外部一步步驱动(这正是 agent.iter() 能让你逐节点观察的原因)。
底层引擎是同仓的 pydantic_graph。一个节点就是继承 BaseNode 并实现 async run(ctx) -> 下一个节点 | End:
# pydantic_graph/pydantic_graph/basenode.py:33 —— 节点契约(简化展示)
class BaseNode(ABC, Generic[StateT, DepsT, NodeRunEndT]):
@abstractmethod
async def run(self, ctx) -> BaseNode[...] | End[NodeRunEndT]:
...
End(basenode.py:62)只是个装数据的壳;GraphRunContext(basenode.py:24)把 state(跨节点的可变状态)和 deps(只读配置/依赖)一起递给每个节点。
agent 的图在 _agent_graph.py:1598 的 build_agent_graph 里组装:注册三个干活节点 + 一个 SetFinalResult,入口是 UserPromptNode,产出是 FinalResult。
1.2 跨节点共享的两样东西
所有节点共享一份 state 和一份 deps:
GraphAgentState(_agent_graph.py:138)—— 一次运行里会变的东西:message_history(对话历史)、usage(token 用量)、run_step(第几轮)、output_retries_used(输出重试已用),还有run_id/conversation_id。GraphAgentDeps(_agent_graph.py:199)—— 运行期固定的配置:模型、tool_manager、output_schema、end_strategy、root_capability(中间件链)等。
记住这点:节点之间不直接传一大包参数,而是改同一份 state。比如 ModelRequestNode 把新的 ModelRequest append 进 state.message_history,下一个节点直接读这份历史。
1.3 节点 ①:UserPromptNode —— 把输入变成第一条请求
它要解决的小问题: 用户给的可能是「一句话」,也可能是「一段历史 + 新提问」,还可能是「带着工具结果来续跑」。得统一成「一个待发送的 ModelRequest」。
UserPromptNode.run(_agent_graph.py:286)的主要分支:
- 把传入的
message_history清洗后设为state.message_history,并记下new_message_index(用来区分「本次运行新产生的消息」)——_agent_graph.py:304。 - 如果带了
deferred_tool_results(上一轮挂起的工具结果回填),直接转去处理工具结果(_agent_graph.py:306)。 - 如果历史最后一条是
ModelResponse且还有未处理的工具调用,直接跳到CallToolsNode去处理它们(_agent_graph.py:349-353)——注意这是绕过模型请求的捷径。 - 否则:若是全新对话就拼系统提示(
_sys_parts),把用户输入包成UserPromptPart,组成next_message,转交ModelRequestNode(_agent_graph.py:372-383)。
关键细节: 系统提示只在历史为空时才拼(_agent_graph.py:373:if not messages:)。续跑已有对话时不会重复塞系统提示。
1.4 节点 ②:ModelRequestNode —— 真正调模型
它要解决的小问题: 把当前历史 + 工具定义 + 输出 schema 打包发给模型,拿回响应;这一步还要能流式、能被中间件包裹、能在出错时重试。
两条入口:run(非流式,_agent_graph.py:609)和 stream(流式,_agent_graph.py:622)。非流式主路径在 _make_request,核心是:
# _agent_graph.py:829 —— 实际调模型的 handler(简化展示)
async def model_handler(req_ctx):
response = await req_ctx.model.request(
req_ctx.messages, req_ctx.model_settings, req_ctx.model_request_parameters
)
return _narrow_tool_call_parts(response, req_ctx.model_request_parameters)
但模型不是裸调的——它被capability 中间件链包了一层(_agent_graph.py:847):
# _agent_graph.py:847 —— 模型调用被中间件包裹(简化展示)
model_response = await ctx.deps.root_capability.wrap_model_request(
run_context, request_context=request_context, handler=model_handler,
)
这就是为什么可观测、缓存、SkipModelRequest(跳过真实调用直接给个响应)这些功能能统一插进来——它们都是 capability。详见 04 章。
请求参数怎么来。 _prepare_request(_agent_graph.py:872)做几件事:把请求 append 进历史、run_step += 1、调 tool_manager.for_run_step(run_context) 解析本轮可用工具(这一步会报工具重名冲突)、解析指令(instructions)、构造 ModelRequestParameters。
重试。 如果中间件或模型抛 ModelRetry,它不会直接失败,而是 _build_retry_node 把重试提示变成下一条请求,回到模型(_agent_graph.py:866)。
拿到响应后,统一交给 _finish_handling,它产出下一个节点 CallToolsNode。
1.5 节点 ③:CallToolsNode —— 结束还是再问一轮
这是整张图的分岔点,也是最复杂的节点。_run_stream(_agent_graph.py:1124)把模型响应拆开看。
第一步:响应是不是「空的/只有思考」?(_agent_graph.py:1133)这种情况很 微妙,处理了一堆边界:
finish_reason == 'length'(被 token 限制截断)→ 抛UnexpectedModelBehavior,不重试(_agent_graph.py:1142)。content_filter(被内容过滤)→ 抛ContentFilterError(_agent_graph.py:1148)。- 输出类型允许
None→ 空响应也算合法结果(_agent_graph.py:1164)。 - 否则尝试从历史里捞回之前的文本(模型上一轮文本+工具调用,文本被丢了,这轮又空了)→
_recover_text_from_message_history(_agent_graph.py:1187)。 - 实在不行就消耗一次输出重试预算,回到
ModelRequestNode再问(_agent_graph.py:1203)。
第二步:把响应 parts 分类。(_agent_graph.py:1214)遍历每个 part:文本累加、工具调用收进 tool_calls、文件收进 files、思考/原生工具调用跳过。
第三步:决定走向。(_agent_graph.py:1242)优先级很明确:
有 tool_calls? → 执行工具(_handle_tool_calls),通常回到 ModelRequestNode
否则 有可用文本输出? → 当作最终文本输出(_handle_text_response)→ End
否则 有图片且允许图片输出? → 当作图片输出 → End
否则 → 拼一个 RetryPromptPart,回到模型让它重来
注意这条注释(_agent_graph.py:1243):只要有工具调用,就优先执行工具——即使模型同时回了文本。因为像 Anthropic 那样「先说一句『我去查一下』再调工具」很常见,那句文本不是最终答案。
1.6 工具执行:一轮里可能有好几个调用
_handle_tool_calls(_agent_graph.py:1292)调 process_tool_calls(_tool_execution.py:100)执行所有工具调用,收集 output_parts(工具返回)。然后:
- 如果产生了最终结果(某个 output 工具命中)→ 走
End(_agent_graph.py:1338)。 - 否则把所有工具返回拼成新的
ModelRequest,回到ModelRequestNode(_agent_graph.py:1346)——这就是「再问一轮」。
一轮里多个工具谁先谁后、谁能当最终答案、ModelRetry 怎么压制 output——这套裁决规则是 end_strategy,单独在 03 章讲。
巧妙之处:部分结果也不丢。 工具执行中途若被打断(异常/取消),已经跑完的工具返回会被打包成一条 state='interrupted' 的 ModelRequest 塞进历史(_agent_graph.py:1326-1335),这样 capture_run_messages 能看到「断点前做到哪」,支撑可恢复运行。
1.7 谁来驱动这张图
用户调 agent.run() 时,内部其实是 agent.iter() 建好图后,由 AgentRun 逐节点推进。
AgentRun.next(run.py:328)—— 推进一步并触发 capability 节点钩子(before_node_run/wrap_node_run/after_node_run)。agent.run()走这条。AgentRun.__anext__(run.py:201)—— 裸推进,不触发节点钩子。async for node in agent_run走这条。
这俩的区别有个坑:裸迭代不会 drain 某些挂起消息,所以遇到带 enqueue 的场景会抛 UndrainedPendingMessagesError(run.py:224)提醒你改用 next()。
# 示意,非源码:iter 让你逐节点观察图的流转
async with agent.iter('法国首都是哪?') as agent_run:
async for node in agent_run: # 依次拿到 UserPromptNode/ModelRequestNode/CallToolsNode/End
print(type(node).__name__)
print(agent_run.result.output) # 'Paris'
1.8 代码地图
| 主题 | 文件 | 关键符号 |
|---|---|---|
| 图组装 | pydantic_ai_slim/pydantic_ai/_agent_graph.py | build_agent_graph |
| 跨节点状态 | pydantic_ai_slim/pydantic_ai/_agent_graph.py | GraphAgentState, GraphAgentDeps |
| 节点① | pydantic_ai_slim/pydantic_ai/_agent_graph.py | UserPromptNode |
| 节点② | pydantic_ai_slim/pydantic_ai/_agent_graph.py | ModelRequestNode, _make_request, _prepare_request |
| 节点③ | pydantic_ai_slim/pydantic_ai/_agent_graph.py | CallToolsNode, _run_stream, _handle_tool_calls |
| 节点契约 | pydantic_graph/pydantic_graph/basenode.py | BaseNode, End, GraphRunContext |
| 驱动循环 | pydantic_ai_slim/pydantic_ai/run.py | AgentRun.next, AgentRun.__anext__ |
| 重试异常 | pydantic_ai_slim/pydantic_ai/exceptions.py | ModelRetry, UnexpectedModelBehavior |