跳到主要内容

1. 核心循环:一次 run() 怎么走

这章讲框架的“中央循环”。看懂它,后面的 schema / 提示 / 历史 / 工具都只是这个循环的零件。

1.1 它要解决的小问题

你想要的语义很简单:“给我一个输入对象,还我一个符合输出 schema 的对象。” 难点在于这背后要协调四样东西——系统提示、多轮历史、本次输入、输出结构约束——并把它们正确地喂给底层 LLM 客户端。AtomicAgent 就是这个协调者。

1.2 思路:组装,而非实现

AtomicAgent 自己不实现结构化输出。它持有一个 instructor 客户端,把真正的“按 schema 输出 + 校验 + 重试”全部委托出去。它自己只负责三步组装:

  1. 把背景/历史/输入组织成 messages 列表
  2. output_schema 作为 response_model 传下去。
  3. 把模型回的强类型对象写回历史,以便下一轮。

1.3 同步 run() 的真实实现

核心就这么短。run 方法见 agents/atomic_agent.py:551-583:

# 摘自 AtomicAgent.run,agents/atomic_agent.py:565-583(节选)
self._trim_context() # 1. 先按 token 预算裁老历史(保护新输入)
if user_input:
self.history.initialize_turn() # 2. 开一个新 turn(生成 uuid)
self.current_user_input = user_input
self.history.add_message("user", user_input)
self._prepare_messages() # 3. system + history → self.messages
response = self.client.chat.completions.create(
messages=self.messages, model=self.model,
response_model=self.output_schema, # 4. 关键:要 Instructor 按这个 schema 回
**self._get_completion_kwargs(),
)
self.history.add_message(self.assistant_role, response) # 5. 写回历史

几个值得停下来看的点:

  • 先裁剪再加输入。 _trim_context() 在加 user_input 之前调用,注释明说“protect the new input”——绝不会因为裁剪把刚来的问题裁掉(agents/atomic_agent.py:565)。
  • response_model=self.output_schema 是整个框架的命门。output_schema 不是手写的,而是从 generic 类型参数自动取出来的(见 02-schema-and-chaining.md)。
  • _get_completion_kwargs() 默认塞了 strict=None(agents/atomic_agent.py:367-378)。这是一个有意的覆盖:Instructor 默认 strict=True 会强制 enum 字段必须是 enum 实例,框架改成 None 让 Pydantic 自己的字符串强转生效,除非你显式覆盖。这是个容易被忽视但很贴心的默认。

1.4 messages 是怎么拼出来的

两步:_build_system_messages() 出系统提示,_prepare_messages() 把它和历史拼一起。

# _build_system_messages,agents/atomic_agent.py:284-291(节选)
if self.system_role is None:
return [] # system_role=None ⇒ 干脆不发系统提示
return [{"role": self.system_role,
"content": self.system_prompt_generator.generate_prompt()}]
# _prepare_messages,agents/atomic_agent.py:349-365(节选,去掉日志)
self.messages = self._build_system_messages()
history = self.history.get_history()
if self.tool_result_role != "system": # 后端不支持“对话中途的 system 消息”时(如 Gemini)
for msg in history:
if msg["role"] == "system":
msg["role"] = self.tool_result_role # 把中途 system 改写成 user
self.messages += history

这里藏着一个跨厂商兼容的细节:Gemini 会丢弃“对话中途的 system 角色消息”。框架用 tool_result_role 区分“初始系统提示”(单独由 _build_system_messages 发,不受影响)和“中途注入的 system 内容”(被改写成 user)。tool_result_role 的自动判定见 agents/atomic_agent.py:196-201:当 assistant_role == "model"(即 Gemini)时默认 "user",否则 "system"

1.5 四个入口:同步 / 异步 / 流式 / 异步流式

框架提供四个对称的运行入口,组装逻辑完全一致,只在“怎么调底层 + 怎么收结果”上不同:

方法客户端返回关键调用
run同步 Instructor一个 OutputSchemacompletions.create(...)
run_stream同步Generator,逐步 yield 部分对象completions.create_partial(..., stream=True)
run_asyncAsyncInstructorawait 一个对象await completions.create(...)
run_async_streamAsyncInstructorAsyncGeneratorcreate_partial(stream=True)

每个入口开头都有一句断言保证客户端类型匹配,例如 runagents/atomic_agent.py:561-563 断言客户端不是 AsyncInstructor,否则提示你改用 run_async。这是“早失败、给清楚错误”的小防呆。

流式入口的精妙处在收尾:它一路 yield 部分对象(partial),循环结束后用最后一个 partial 重建一个完整 schema 实例再写回历史,见 run_streamagents/atomic_agent.py:620-628:

# run_stream 收尾,agents/atomic_agent.py:620-628(节选)
for partial_response in response_stream:
last_response = partial_response
yield partial_response # 流式:把每个不完整对象交给调用方
if last_response:
full = self.output_schema(**last_response.model_dump()) # 用最后一帧重建完整对象
self.history.add_message(self.assistant_role, full) # 只把完整对象写回历史

为什么流式可行? 框架在模块加载时猴补丁(monkey-patch)了 Instructor 的部分解析逻辑——model_from_chunks_patched 把流式 JSON 片段用 jiter.from_json(..., partial_mode="trailing-strings") 增量解析成部分模型,见 agents/atomic_agent.py:21-42。这让“边收边解析未闭合的 JSON”成为可能。

1.6 关键细节 / 坑

  • 不带 user_inputrun() 是合法的。 当你已经手动往历史里塞了内容(比如工具结果),想让模型“接着说”,可以 run() 不传参——它会跳过加 user 消息,直接基于现有历史出一轮(agents/atomic_agent.py:568)。
  • 每轮结束会再调一次 _prepare_messages() run 末尾 agents/atomic_agent.py:581 又拼了一次 messages——把刚写回的 assistant 回复也纳入,方便你事后检查 agent.messages 拿到完整快照。
  • 历史写回用的是 assistant_role,不是硬编码 "assistant" Gemini 要求叫 "model",所以这个角色是可配的(AgentConfig.assistant_role,agents/atomic_agent.py:80-83)。

下一章:这套循环的“接口签名”——schema 是怎么定义和被取出的。