04 · Capability 中间件与 Model 抽象
本章讲两块「横切」基础设施:① capability——一套把功能插进运行各阶段的中间件钩子链;② Model——把 30+ 家 provider 抹成一个统一接口。理解它们,就懂了为什么这库既能可观测又能换模型还能做审批,却没把核心代码搞乱。
4.1 Capability:中间件,不是插件
它要解决的小问题: 可观测(OTel)、HITL 审批、缓存、限流、持久化执行……这些都想「在运行的某个阶段插一脚」。如果每个都改核心代码,核心会被改烂。
思路: 做成 中间件链(middleware)。每个 capability 实现一组钩子,框架在运行各阶段调它们;多个 capability 用 CombinedCapability 串成一条链,像洋葱一样层层包裹。
钩子定义在 AbstractCapability(capabilities/abstract.py),分两类:
① 贡献型(往运行里加东西):
| 钩子 | 贡献什么 |
|---|---|
get_instructions | 额外指令 |
get_toolset | 额外工具 |
get_native_tools | 原生工具(如 web search) |
get_model_settings | 模型设置 |
② 拦截型(包裹某个阶段): 每个阶段都有 before_ / after_ / wrap_ / on_*_error 四件套。阶段覆盖:
run —— 整个运行 (wrap_run / before_run / after_run / on_run_error)
node_run —— 每个图节点 (wrap_node_run / before_node_run / ...)
model_request —— 每次模型调用 (wrap_model_request / before_model_request / ...)
tool_validate —— 工具参数校验 (wrap_tool_validate / ...)
tool_execute —— 工具执行 (wrap_tool_execute / before_tool_execute / ...)
output_validate/process —— 输出 (wrap_output_validate / wrap_output_process / ...)
(钩子签名见 capabilities/abstract.py:363-856。)
4.2 wrap_ 钩子怎么用:洋葱模型
wrap_* 钩子拿到一个 handler(下一层/真实操作),自己决定调不调、调前调后做什么。回顾 01 章 §1.4 模型调用那段:
# _agent_graph.py:847 —— 模型调用穿过 capability 链(简化 展示)
model_response = await ctx.deps.root_capability.wrap_model_request(
run_context, request_context=request_context, handler=model_handler,
)
root_capability 是整条链的入口。一个缓存 capability 可以这样写:
# 示意,非源码:用 wrap_model_request 实现缓存
async def wrap_model_request(self, ctx, request_context, handler):
key = make_key(request_context.messages)
if key in cache:
return cache[key] # 命中:不调真实模型
resp = await handler(request_context) # 未命中:调下一层(最终是真模型)
cache[key] = resp
return resp
两个特殊出口(异常即控制流):
- 在
before/wrap_model_request里抛SkipModelRequest(response)(exceptions.py:116)→ 跳过真实模型,直接用你给的响应。 - 在工具钩子里抛
SkipToolValidation/SkipToolExecution(exceptions.py:133,146)→ 跳过校验/执行,用你给的值。
4.3 内建 capability:默认就在链上的几层
Agent.iter 组装链时(agent/__init__.py:1240-1270),会自动把一些 capability 加进去:
- Instrumentation(OTel)被放到最外层,这样它的 span 包住一切(
agent/__init__.py:1256)——除非你已经自己加了。 - DeferredCapabilityLoader 在存在延迟加载 capability 时注入(
agent/__init__.py:1266),且有去重保护(注释提到 issue #5047 的双重包裹 bug)。
CombinedCapability 会自动 flatten 嵌套输入(agent/__init__.py:1255 注释),让各 capability 作为兄弟参与排序——排序由 get_ordering(abstract.py:232)控制。
直觉: 持久化执行(durable_exec/temporal 等)、UI 事件流(ui/)本质上也是「包在外面的一层 capability / 适配」。本组文档不深入它们的内部,只点出它们站在同一套钩子机制上。
4.4 wrap_run 的协作式交接(一处精巧实现)
wrap_run 要「包住整个运行」,但运行又是用户用 async for node in agent_run 逐节点驱动的——这俩怎么协调?agent/__init__.py:1424-1518 用了一套协作交接协议:
1. _do_run() 调 before_run,设 _run_ready,然后 await _run_done
2. wrap_run 经中间件链包住 _do_run
3. 主协程等 _run_ready(handler 已就位)或 _wrap_task 直接完成(短路)
4. 把 agent_run 交给调用方迭代
5. 调用方迭代完/出错 → 设 _run_done
6. _do_run 恢复:返回结果,或重新抛原始错误
7. wrap_run 若捕获了错误并返回恢复结果 → 用它;否则原错误继续抛
这套两个 asyncio.Event 来回握手的写法,是为了让「中间件包裹」和「外部逐步迭代」两种控制流共存,同时保证 wrap_run 的清理逻辑一定执行(cancel 时 cancel_and_drain,agent/__init__.py:1469)。wrap_model_request 的流式路径也是同款交接(_agent_graph.py:650-708)。
4.5 Model:provider 的统一抽象
所有模型 provider 实现同一个抽象基类 Model(models/__init__.py:192)。核心两个方法:
# models/__init__.py:240 —— 模型契约(简化展示)
class Model(ABC, Generic[InterfaceClient]):
@abstractmethod
async def request(self, messages, model_settings, model_request_parameters) -> ModelResponse:
... # 非流式:发请求拿一个完整响应
async def request_stream(self, messages, ...) -> AsyncGenerator[StreamedResponse]:
... # 流式:可选实现
request(models/__init__.py:241)是必须实现的——ModelRequestNode._make_request最终调的就是它(_agent_graph.py:832)。request_stream(models/__init__.py:278)和count_tokens(:253)、compact_messages(:263)是可选的,不实现就抛 NotImplementedError。StreamedResponse(models/__init__.py:662)是流式响应的抽象,把 provider 的增量 chunk 统一成框架的事件。
models/ 下有 anthropic.py / openai.py / google.py / bedrock.py 等具体实现(顶层共二十来个文件),providers/ 下有 30 个 provider 配置。本组文档不逐个展开 provider 内部。
4.6 ModelProfile:吸收模型差异
不同模型对 JSON schema、工具、结构化输出的支持各不相同。这些差异由 profiles/ 下的 ModelProfile 吸收。Model.customize_request_parameters(models/__init__.py:292)会用 profile 里的 json_schema_transformer 改写工具 schema,以适配某个 provider 的怪癖。
这就是为什么 03 章说的「同一份 output_type 代码跨模型自动选落地方式」能成立——default_structured_output_mode 就存在 profile 里。
4.7 特殊 Model:组合而非继承
几个「model」其实是包装别的 model 的装饰器:
FallbackModel(models/fallback.py) —— 主模型失败时自动切下一个。InstrumentedModel(models/instrumented.py)—— 给请求加 OTel 追踪。WrapperModel(models/wrapper.py)—— 包装基类。ConcurrencyLimitedModel(models/concurrency.py)—— 限并发。TestModel/FunctionModel(models/test.py、function.py)—— 测试用,不真调网络。
和 toolset 一样是包装器模式:加能力 = 套一层,不改核心。
4.8 边界
- 流式 / token 计数 / 消息压缩不是所有 provider 都支持,调用未实现的会抛 NotImplementedError(
models/__init__.py:261,287)。 - capability 链的排序若多个都想当「最外层」,靠
get_ordering协调;写错顺序可能导致 span 包不全等问题。 - 持久化执行、UI 协议的内部细节本组文档未深入(仅在边界层描述)。
4.9 代码地图
| 主题 | 文件 | 关键符号 |
|---|---|---|
| 中间件钩子 | pydantic_ai_slim/pydantic_ai/capabilities/abstract.py | AbstractCapability, wrap_model_request, wrap_run, before_tool_execute, get_ordering |
| 链组装 | pydantic_ai_slim/pydantic_ai/agent/__init__.py | Agent.iter(capability 合并段 ~1240-1270) |
| 跳过类异常 | pydantic_ai_slim/pydantic_ai/exceptions.py | SkipModelRequest, SkipToolValidation, SkipToolExecution |
| 模型抽象 | pydantic_ai_slim/pydantic_ai/models/__init__.py | Model, Model.request, Model.request_stream, StreamedResponse, customize_request_parameters |
| 模型 profile | pydantic_ai_slim/pydantic_ai/profiles/ | ModelProfile |
| 装饰器 model | pydantic_ai_slim/pydantic_ai/models/ | FallbackModel, InstrumentedModel, ConcurrencyLimitedModel |