跳到主要内容

主线引擎:AgentLoop + AgentRunner

这一章讲 nanobot 那条「小核心」到底怎么转一圈。先看顶层的 AgentLoop(管一整轮对话的事务),再钻进 AgentRunner(管和模型来回的内循环)。这是整个项目最值得读透的部分。

1. 为什么要拆成两层?

一轮对话其实有两种完全不同的关切混在一起:

  • 面向渠道的事务:这条消息属于哪个会话?工作区在哪?历史要不要先压缩?是不是斜杠命令?跑完怎么存盘、怎么把答复发回去?
  • 面向模型的事务:把消息发给哪个 provider?模型要调工具吗?工具结果怎么喂回去?什么时候算「答完了」?撞上迭代上限怎么收尾?

nanobot 把前者放进 AgentLoop,后者放进 AgentRunner调试时这条线很有用:渠道路由/会话键/存盘的问题去 loop.py;模型调用/工具/流式/迭代上限的问题去 runner.py(docs/architecture.md:36-54)。

2. AgentLoop:一整轮 = 一个 7 态状态机

2.1 它要解决的小问题

一轮对话有很多步骤,而且任何一步都可能抛异常或需要中断。如果写成一长串顺序代码,容错和「断点续跑」会很难。nanobot 把它建模成一个显式状态机:每个状态是一个 async 处理函数,返回一个事件字符串,驱动器查表决定下一个状态。

2.2 状态与转移

七个状态(外加终态 DONE)定义在 TurnState(loop.py:82-90):

RESTORE → COMPACT → COMMAND →(dispatch)→ BUILD → RUN → SAVE → RESPOND → DONE
└─(shortcut)──────────────────────────────────→ DONE

各状态干什么:

状态处理函数职责
RESTORE_state_restore载入会话、恢复运行时检查点/未完成的用户轮
COMPACT_state_compact必要时按 token 预算压缩历史(见 04 章)
COMMAND_state_command是斜杠命令就内联处理并走 shortcut 提前结束
BUILD_state_build取历史、拼初始消息、提前持久化用户消息
RUN_state_run_run_agent_loopAgentRunner
SAVE_state_save算时延、存这一轮、后台触发压缩
RESPOND_state_respond组装 OutboundMessage

转移用一张纯数据表 _TRANSITIONS 描述,驱动器只是查表(loop.py:175-184):

# 示意,非源码:状态机驱动的核心就这么点
while ctx.state is not TurnState.DONE:
handler = getattr(self, f"_state_{ctx.state.name.lower()}")
event = await handler(ctx) # 处理函数返回一个事件字符串
ctx.state = self._TRANSITIONS[(ctx.state, event)] # 查表 → 下一态
# 重点看:状态、转移、处理函数三者解耦,加一步只改表 + 加一个 _state_xxx

真实驱动器在 loop.py:1297-1342(_process_message),还顺手给每个状态记了耗时 StateTraceEntry 做诊断。event 没有对应转移就直接抛 RuntimeError——宁可显式炸,也不静默走错

2.3 BUILD 这步:上下文怎么进来的

_state_build(loop.py:1450-1494)是「把世界塞进一次模型调用」的地方:取出受 token 预算约束的历史 get_history(...),再交给 _build_initial_messagesContextBuilder(系统提示 = 身份 + 记忆 + 技能 + 近期历史,见 02 章)。它还做了一件容易忽略但很重要的事:提前持久化用户消息 _persist_user_message_early——这样即使后面崩了,你这句话也不会丢。

2.4 主消费循环

状态机是「处理一条消息」。把消息源源不断喂进来的是 AgentLoop.run(loop.py:882):一个 while 循环不停 consume_inbound(),为每条消息起处理。这就是图里「总线 → AgentLoop」那根箭头的实体。

3. AgentRunner:面向模型的内循环

RUN 状态最终调到 AgentRunner.run_run_core。这是真正和大模型来回拉锯的地方。

3.1 思路:一个带上限的「问—用工具—再问」循环

核心直觉非常朴素:

# 示意,非源码:agent 内循环的灵魂
for iteration in range(max_iterations):
resp = await provider.chat(messages, tools) # 问模型
if resp 要调工具:
results = await 执行这些工具(resp.tool_calls)
messages += [助手消息(含 tool_calls), *每个工具的结果消息]
continue # 带着结果再问一遍
else:
return resp.content # 模型给了最终答复,收工
# 重点看:工具结果作为 role="tool" 消息追加回去,下一轮模型就能看到

真实实现是 _run_core(runner.py:341-706),围绕这个骨架加了大量容错,下面拆开讲。

3.2 何时算「该执行工具」

不是「模型给了 tool_calls 就执行」。LLMResponse.should_execute_tools(providers/base.py:172-178)要求 finish_reasontool_calls/function_call/stop 之一——refusal/content_filter/error 下,网关注入的工具调用会被拒绝执行(防 #3220 那类注入)。

3.3 判停与各种「坏情况」恢复

_run_core 的大半篇幅在处理模型不老实的情况。统一看一张表(都在 runner.py 的主循环里):

坏情况表现处理
畸形工具调用tool_call.name 缺失/非字符串_drop_malformed_tool_calls 丢弃;全丢了就重试一次,再不行降级为无工具请求(runner.py:880-911)
空回复finish_reason != error 但内容空白最多重试 _MAX_EMPTY_RETRIES 次,再不行发一条「请给最终答复」催它(runner.py:533-563)
被截断finish_reason == "length"把已有内容存下,追加 length_recovery 消息让它续写,最多 _MAX_LENGTH_RECOVERIES 次(runner.py:565-584)
欠费provider 报 arrearage直接给出明确的「请充值」文案(runner.py:613-619)
撞上迭代上限for 跑完没 breakelse 分支,先尝试一次「无工具最终化」_try_finalize_after_max_iterations,失败再用模板兜底(runner.py:672-695)

这些常量集中在 runner.py:66-69,一眼能看出系统的「耐心」边界。

3.4 工具执行:分批与并发

工具不是无脑并行。_execute_tools_partition_tool_batches(runner.py:1435-1458)把一串工具调用切成「批」:只有 concurrency_safe(即 read_only 且非 exclusive)的工具能凑在同一批里 asyncio.gather 并发;写文件、跑 shell 这类会落地副作用的工具各自单独成批、顺序执行。这样既能并行只读查询、又不会让两个写操作打架。

3.5 mid-turn 注入:agent 干活时你又说话了

这是一个很实用的设计。agent 正在多轮调工具时,你突然又补了一句——框架不想丢掉它、也不想为它另起一轮。_try_drain_injections(runner.py:168-221)会在几个安全点(工具执行后、最终答复后、各种错误后)调 injection_callback 把新消息「排空」并注入到当前会话继续跑。

注入有节流:每轮最多 _MAX_INJECTIONS_PER_TURN=3 条、最多 _MAX_INJECTION_CYCLES=5 个注入周期(runner.py:67-69),防止你一直说话让这轮永不收尾。注入回调由 loop.py_drain_pending(loop.py:744-793)提供,它从一个 pending_queue 取消息;特别地,如果队列空但本轮派生的子 agent 还在跑,它会阻塞等子 agent 结果,好让多个子 agent 的完成被按序注入而不是各开一轮。

3.6 sustained goal:让一轮自己「续命」

当一个持续目标(/goal,见 long_task 工具)处于活跃态,即使模型这轮想停,_try_drain_injections(..., allow_goal_continue=True) 也会注入一条「你还有未完成目标,继续干或调用 complete_goal」的消息(runner.py:191-197loop.py:812-821),把循环续上。这就是「跨多轮持续目标」在运行时的落地点。

3.7 checkpoint:崩了能续

每到关键相位(awaiting_tools / tools_completed / final_response),Runner 通过 checkpoint_callback 回吐一个快照(runner.py:442-452 等),由 loop.py 写进会话的运行时检查点。配合 RESTORE 状态的 _restore_runtime_checkpoint,一轮跑到一半进程挂了,重启后能从检查点接着来,而不是重头跑或丢工具结果。

4. 巧妙之处

  • 状态机用数据表而非 if-else。 _TRANSITIONS 是 dict,加状态=加表项+加函数,转移逻辑零分支(loop.py:175-184)。
  • 提前持久化用户消息。 在调模型之前就把你的话存盘(loop.py:1485-1487),崩溃也不丢输入。
  • 注入与流式协同。 发现有注入时,流式 on_stream_end(resuming=True) 不会过早「封卡」,避免聊天 UI 把还没说完的消息提前定稿(runner.py:594-611)。
  • 并发只给只读工具。 concurrency_safe 的定义把「能并行」收得很紧,写操作天然串行(tools/base.py:166-174)。

5. 边界与局限

  • 迭代上限是硬墙。 超过 max_iterations 就强制收尾,复杂任务可能在中途被「最终化」截断——这是用可终止性换的(runner.py:672-695)。
  • 注入节流可能丢话。 单轮超过 3 条注入会被截断并记日志(runner.py:274-280),极端高频追问时后面的会留到下一轮。
  • wall-clock 超时。 非流式请求默认 300s 超时(NANOBOT_LLM_TIMEOUT_S,runner.py:744);流式则改用 idle 超时,避免长推理被误杀。

6. 代码地图

主题文件符号
状态枚举/转移表nanobot/agent/loop.pyTurnState_TRANSITIONS
状态机驱动nanobot/agent/loop.py_process_message
BUILD/RUN/SAVE 处理nanobot/agent/loop.py_state_build_state_run_state_save
进入 Runnernanobot/agent/loop.py_run_agent_loop_drain_pending_goal_continue
内循环主体nanobot/agent/runner.pyAgentRunner._run_core
判停/恢复常量nanobot/agent/runner.py_MAX_EMPTY_RETRIES_MAX_LENGTH_RECOVERIES_MAX_INJECTION_CYCLES
工具执行/分批nanobot/agent/runner.py_execute_tools_partition_tool_batches_run_tool
注入nanobot/agent/runner.py_try_drain_injections_drain_injections
是否执行工具nanobot/providers/base.pyLLMResponse.should_execute_tools