跳到主要内容

01 · 三节点状态图(骨架)

本章讲:一次运行从输入到输出,在三个节点间怎么流转,以及每个节点内部在干嘛。读完你能在脑子里「跑」一遍 agent loop。

1.1 为什么是「图」而不是 while 循环

agent loop 本质就是个 while 循环,但有一堆中途出口:token 超了、要人审批、工具要延迟执行、流式要能中断恢复……写成裸 while 会越加越乱。

Pydantic AI 的选择:把循环拆成节点,每个节点 run() 完返回「下一个节点」或 End。这样「下一步是什么」是显式的返回值,可被外部一步步驱动(这正是 agent.iter() 能让你逐节点观察的原因)。

底层引擎是同仓的 pydantic_graph。一个节点就是继承 BaseNode 并实现 async run(ctx) -> 下一个节点 | End

# pydantic_graph/pydantic_graph/basenode.py:33 —— 节点契约(简化展示)
class BaseNode(ABC, Generic[StateT, DepsT, NodeRunEndT]):
@abstractmethod
async def run(self, ctx) -> BaseNode[...] | End[NodeRunEndT]:
...

Endbasenode.py:62)只是个装数据的壳;GraphRunContextbasenode.py:24)把 state(跨节点的可变状态)和 deps(只读配置/依赖)一起递给每个节点。

agent 的图在 _agent_graph.py:1598build_agent_graph 里组装:注册三个干活节点 + 一个 SetFinalResult,入口是 UserPromptNode,产出是 FinalResult

1.2 跨节点共享的两样东西

所有节点共享一份 state 和一份 deps

  • GraphAgentState_agent_graph.py:138)—— 一次运行里会变的东西:message_history(对话历史)、usage(token 用量)、run_step(第几轮)、output_retries_used(输出重试已用),还有 run_id / conversation_id
  • GraphAgentDeps_agent_graph.py:199)—— 运行期固定的配置:模型、tool_manageroutput_schemaend_strategyroot_capability(中间件链)等。

记住这点:节点之间不直接传一大包参数,而是改同一份 state。比如 ModelRequestNode 把新的 ModelRequest append 进 state.message_history,下一个节点直接读这份历史。

1.3 节点 ①:UserPromptNode —— 把输入变成第一条请求

它要解决的小问题: 用户给的可能是「一句话」,也可能是「一段历史 + 新提问」,还可能是「带着工具结果来续跑」。得统一成「一个待发送的 ModelRequest」。

UserPromptNode.run_agent_graph.py:286)的主要分支:

  • 把传入的 message_history 清洗后设为 state.message_history,并记下 new_message_index(用来区分「本次运行新产生的消息」)——_agent_graph.py:304
  • 如果带了 deferred_tool_results(上一轮挂起的工具结果回填),直接转去处理工具结果(_agent_graph.py:306)。
  • 如果历史最后一条是 ModelResponse还有未处理的工具调用,直接跳到 CallToolsNode 去处理它们(_agent_graph.py:349-353)——注意这是绕过模型请求的捷径。
  • 否则:若是全新对话就拼系统提示(_sys_parts),把用户输入包成 UserPromptPart,组成 next_message,转交 ModelRequestNode_agent_graph.py:372-383)。

关键细节: 系统提示只在历史为空时才拼(_agent_graph.py:373if not messages:)。续跑已有对话时不会重复塞系统提示。

1.4 节点 ②:ModelRequestNode —— 真正调模型

它要解决的小问题: 把当前历史 + 工具定义 + 输出 schema 打包发给模型,拿回响应;这一步还要能流式、能被中间件包裹、能在出错时重试。

两条入口:run(非流式,_agent_graph.py:609)和 stream(流式,_agent_graph.py:622)。非流式主路径在 _make_request,核心是:

# _agent_graph.py:829 —— 实际调模型的 handler(简化展示)
async def model_handler(req_ctx):
response = await req_ctx.model.request(
req_ctx.messages, req_ctx.model_settings, req_ctx.model_request_parameters
)
return _narrow_tool_call_parts(response, req_ctx.model_request_parameters)

但模型不是裸调的——它被capability 中间件链包了一层(_agent_graph.py:847):

# _agent_graph.py:847 —— 模型调用被中间件包裹(简化展示)
model_response = await ctx.deps.root_capability.wrap_model_request(
run_context, request_context=request_context, handler=model_handler,
)

这就是为什么可观测、缓存、SkipModelRequest(跳过真实调用直接给个响应)这些功能能统一插进来——它们都是 capability。详见 04 章

请求参数怎么来。 _prepare_request_agent_graph.py:872)做几件事:把请求 append 进历史、run_step += 1、调 tool_manager.for_run_step(run_context) 解析本轮可用工具(这一步会报工具重名冲突)、解析指令(instructions)、构造 ModelRequestParameters

重试。 如果中间件或模型抛 ModelRetry,它不会直接失败,而是 _build_retry_node 把重试提示变成下一条请求,回到模型(_agent_graph.py:866)。

拿到响应后,统一交给 _finish_handling,它产出下一个节点 CallToolsNode

1.5 节点 ③:CallToolsNode —— 结束还是再问一轮

这是整张图的分岔点,也是最复杂的节点。_run_stream_agent_graph.py:1124)把模型响应拆开看。

第一步:响应是不是「空的/只有思考」?_agent_graph.py:1133)这种情况很微妙,处理了一堆边界:

  • finish_reason == 'length'(被 token 限制截断)→ 抛 UnexpectedModelBehavior,不重试(_agent_graph.py:1142)。
  • content_filter(被内容过滤)→ 抛 ContentFilterError_agent_graph.py:1148)。
  • 输出类型允许 None → 空响应也算合法结果(_agent_graph.py:1164)。
  • 否则尝试从历史里捞回之前的文本(模型上一轮文本+工具调用,文本被丢了,这轮又空了)→ _recover_text_from_message_history_agent_graph.py:1187)。
  • 实在不行就消耗一次输出重试预算,回到 ModelRequestNode 再问(_agent_graph.py:1203)。

第二步:把响应 parts 分类。_agent_graph.py:1214)遍历每个 part:文本累加、工具调用收进 tool_calls、文件收进 files、思考/原生工具调用跳过。

第三步:决定走向。_agent_graph.py:1242)优先级很明确:

有 tool_calls? → 执行工具(_handle_tool_calls),通常回到 ModelRequestNode
否则 有可用文本输出? → 当作最终文本输出(_handle_text_response)→ End
否则 有图片且允许图片输出? → 当作图片输出 → End
否则 → 拼一个 RetryPromptPart,回到模型让它重来

注意这条注释(_agent_graph.py:1243):只要有工具调用,就优先执行工具——即使模型同时回了文本。因为像 Anthropic 那样「先说一句『我去查一下』再调工具」很常见,那句文本不是最终答案。

1.6 工具执行:一轮里可能有好几个调用

_handle_tool_calls_agent_graph.py:1292)调 process_tool_calls_tool_execution.py:100)执行所有工具调用,收集 output_parts(工具返回)。然后:

  • 如果产生了最终结果(某个 output 工具命中)→ 走 End_agent_graph.py:1338)。
  • 否则把所有工具返回拼成新的 ModelRequest,回到 ModelRequestNode_agent_graph.py:1346)——这就是「再问一轮」。

一轮里多个工具谁先谁后、谁能当最终答案、ModelRetry 怎么压制 output——这套裁决规则是 end_strategy,单独在 03 章讲。

巧妙之处:部分结果也不丢。 工具执行中途若被打断(异常/取消),已经跑完的工具返回会被打包成一条 state='interrupted'ModelRequest 塞进历史(_agent_graph.py:1326-1335),这样 capture_run_messages 能看到「断点前做到哪」,支撑可恢复运行。

1.7 谁来驱动这张图

用户调 agent.run() 时,内部其实是 agent.iter() 建好图后,由 AgentRun 逐节点推进。

  • AgentRun.nextrun.py:328)—— 推进一步并触发 capability 节点钩子before_node_run / wrap_node_run / after_node_run)。agent.run() 走这条。
  • AgentRun.__anext__run.py:201)—— 裸推进,不触发节点钩子。async for node in agent_run 走这条。

这俩的区别有个坑:裸迭代不会 drain 某些挂起消息,所以遇到带 enqueue 的场景会抛 UndrainedPendingMessagesErrorrun.py:224)提醒你改用 next()

# 示意,非源码:iter 让你逐节点观察图的流转
async with agent.iter('法国首都是哪?') as agent_run:
async for node in agent_run: # 依次拿到 UserPromptNode/ModelRequestNode/CallToolsNode/End
print(type(node).__name__)
print(agent_run.result.output) # 'Paris'

1.8 代码地图

主题文件关键符号
图组装pydantic_ai_slim/pydantic_ai/_agent_graph.pybuild_agent_graph
跨节点状态pydantic_ai_slim/pydantic_ai/_agent_graph.pyGraphAgentState, GraphAgentDeps
节点①pydantic_ai_slim/pydantic_ai/_agent_graph.pyUserPromptNode
节点②pydantic_ai_slim/pydantic_ai/_agent_graph.pyModelRequestNode, _make_request, _prepare_request
节点③pydantic_ai_slim/pydantic_ai/_agent_graph.pyCallToolsNode, _run_stream, _handle_tool_calls
节点契约pydantic_graph/pydantic_graph/basenode.pyBaseNode, End, GraphRunContext
驱动循环pydantic_ai_slim/pydantic_ai/run.pyAgentRun.next, AgentRun.__anext__
重试异常pydantic_ai_slim/pydantic_ai/exceptions.pyModelRetry, UnexpectedModelBehavior