跳到主要内容

04 · Capability 中间件与 Model 抽象

本章讲两块「横切」基础设施:① capability——一套把功能插进运行各阶段的中间件钩子链;② Model——把 30+ 家 provider 抹成一个统一接口。理解它们,就懂了为什么这库既能可观测又能换模型还能做审批,却没把核心代码搞乱。

4.1 Capability:中间件,不是插件

它要解决的小问题: 可观测(OTel)、HITL 审批、缓存、限流、持久化执行……这些都想「在运行的某个阶段插一脚」。如果每个都改核心代码,核心会被改烂。

思路: 做成 中间件链(middleware)。每个 capability 实现一组钩子,框架在运行各阶段调它们;多个 capability 用 CombinedCapability 串成一条链,像洋葱一样层层包裹。

钩子定义在 AbstractCapabilitycapabilities/abstract.py),分两类:

① 贡献型(往运行里加东西):

钩子贡献什么
get_instructions额外指令
get_toolset额外工具
get_native_tools原生工具(如 web search)
get_model_settings模型设置

② 拦截型(包裹某个阶段): 每个阶段都有 before_ / after_ / wrap_ / on_*_error 四件套。阶段覆盖:

run —— 整个运行 (wrap_run / before_run / after_run / on_run_error)
node_run —— 每个图节点 (wrap_node_run / before_node_run / ...)
model_request —— 每次模型调用 (wrap_model_request / before_model_request / ...)
tool_validate —— 工具参数校验 (wrap_tool_validate / ...)
tool_execute —— 工具执行 (wrap_tool_execute / before_tool_execute / ...)
output_validate/process —— 输出 (wrap_output_validate / wrap_output_process / ...)

(钩子签名见 capabilities/abstract.py:363-856。)

4.2 wrap_ 钩子怎么用:洋葱模型

wrap_* 钩子拿到一个 handler(下一层/真实操作),自己决定调不调、调前调后做什么。回顾 01 章 §1.4 模型调用那段:

# _agent_graph.py:847 —— 模型调用穿过 capability 链(简化展示)
model_response = await ctx.deps.root_capability.wrap_model_request(
run_context, request_context=request_context, handler=model_handler,
)

root_capability 是整条链的入口。一个缓存 capability 可以这样写:

# 示意,非源码:用 wrap_model_request 实现缓存
async def wrap_model_request(self, ctx, request_context, handler):
key = make_key(request_context.messages)
if key in cache:
return cache[key] # 命中:不调真实模型
resp = await handler(request_context) # 未命中:调下一层(最终是真模型)
cache[key] = resp
return resp

两个特殊出口(异常即控制流):

  • before/wrap_model_request 里抛 SkipModelRequest(response)exceptions.py:116)→ 跳过真实模型,直接用你给的响应。
  • 在工具钩子里抛 SkipToolValidation / SkipToolExecutionexceptions.py:133,146)→ 跳过校验/执行,用你给的值。

4.3 内建 capability:默认就在链上的几层

Agent.iter 组装链时(agent/__init__.py:1240-1270),会自动把一些 capability 加进去:

  • Instrumentation(OTel)被放到最外层,这样它的 span 包住一切(agent/__init__.py:1256)——除非你已经自己加了。
  • DeferredCapabilityLoader 在存在延迟加载 capability 时注入(agent/__init__.py:1266),且有去重保护(注释提到 issue #5047 的双重包裹 bug)。

CombinedCapability 会自动 flatten 嵌套输入(agent/__init__.py:1255 注释),让各 capability 作为兄弟参与排序——排序由 get_orderingabstract.py:232)控制。

直觉: 持久化执行(durable_exec/temporal 等)、UI 事件流(ui/)本质上也是「包在外面的一层 capability / 适配」。本组文档不深入它们的内部,只点出它们站在同一套钩子机制上。

4.4 wrap_run 的协作式交接(一处精巧实现)

wrap_run 要「包住整个运行」,但运行又是用户用 async for node in agent_run 逐节点驱动的——这俩怎么协调?agent/__init__.py:1424-1518 用了一套协作交接协议

1. _do_run() 调 before_run,设 _run_ready,然后 await _run_done
2. wrap_run 经中间件链包住 _do_run
3. 主协程等 _run_ready(handler 已就位)或 _wrap_task 直接完成(短路)
4. 把 agent_run 交给调用方迭代
5. 调用方迭代完/出错 → 设 _run_done
6. _do_run 恢复:返回结果,或重新抛原始错误
7. wrap_run 若捕获了错误并返回恢复结果 → 用它;否则原错误继续抛

这套两个 asyncio.Event 来回握手的写法,是为了让「中间件包裹」和「外部逐步迭代」两种控制流共存,同时保证 wrap_run 的清理逻辑一定执行(cancel 时 cancel_and_drainagent/__init__.py:1469)。wrap_model_request 的流式路径也是同款交接(_agent_graph.py:650-708)。

4.5 Model:provider 的统一抽象

所有模型 provider 实现同一个抽象基类 Modelmodels/__init__.py:192)。核心两个方法:

# models/__init__.py:240 —— 模型契约(简化展示)
class Model(ABC, Generic[InterfaceClient]):
@abstractmethod
async def request(self, messages, model_settings, model_request_parameters) -> ModelResponse:
... # 非流式:发请求拿一个完整响应

async def request_stream(self, messages, ...) -> AsyncGenerator[StreamedResponse]:
... # 流式:可选实现
  • requestmodels/__init__.py:241)是必须实现的——ModelRequestNode._make_request 最终调的就是它(_agent_graph.py:832)。
  • request_streammodels/__init__.py:278)和 count_tokens:253)、compact_messages:263)是可选的,不实现就抛 NotImplementedError。
  • StreamedResponsemodels/__init__.py:662)是流式响应的抽象,把 provider 的增量 chunk 统一成框架的事件。

models/ 下有 anthropic.py / openai.py / google.py / bedrock.py 等具体实现(顶层共二十来个文件),providers/ 下有 30 个 provider 配置。本组文档不逐个展开 provider 内部。

4.6 ModelProfile:吸收模型差异

不同模型对 JSON schema、工具、结构化输出的支持各不相同。这些差异由 profiles/ 下的 ModelProfile 吸收。Model.customize_request_parametersmodels/__init__.py:292)会用 profile 里的 json_schema_transformer 改写工具 schema,以适配某个 provider 的怪癖。

这就是为什么 03 章说的「同一份 output_type 代码跨模型自动选落地方式」能成立——default_structured_output_mode 就存在 profile 里。

4.7 特殊 Model:组合而非继承

几个「model」其实是包装别的 model 的装饰器:

  • FallbackModelmodels/fallback.py)—— 主模型失败时自动切下一个。
  • InstrumentedModelmodels/instrumented.py)—— 给请求加 OTel 追踪。
  • WrapperModelmodels/wrapper.py)—— 包装基类。
  • ConcurrencyLimitedModelmodels/concurrency.py)—— 限并发。
  • TestModel / FunctionModelmodels/test.pyfunction.py)—— 测试用,不真调网络。

和 toolset 一样是包装器模式:加能力 = 套一层,不改核心。

4.8 边界

  • 流式 / token 计数 / 消息压缩不是所有 provider 都支持,调用未实现的会抛 NotImplementedError(models/__init__.py:261,287)。
  • capability 链的排序若多个都想当「最外层」,靠 get_ordering 协调;写错顺序可能导致 span 包不全等问题。
  • 持久化执行、UI 协议的内部细节本组文档未深入(仅在边界层描述)。

4.9 代码地图

主题文件关键符号
中间件钩子pydantic_ai_slim/pydantic_ai/capabilities/abstract.pyAbstractCapability, wrap_model_request, wrap_run, before_tool_execute, get_ordering
链组装pydantic_ai_slim/pydantic_ai/agent/__init__.pyAgent.iter(capability 合并段 ~1240-1270)
跳过类异常pydantic_ai_slim/pydantic_ai/exceptions.pySkipModelRequest, SkipToolValidation, SkipToolExecution
模型抽象pydantic_ai_slim/pydantic_ai/models/__init__.pyModel, Model.request, Model.request_stream, StreamedResponse, customize_request_parameters
模型 profilepydantic_ai_slim/pydantic_ai/profiles/ModelProfile
装饰器 modelpydantic_ai_slim/pydantic_ai/models/FallbackModel, InstrumentedModel, ConcurrencyLimitedModel