跳到主要内容

01 · reasoning-acting 主循环

这一章把 AgentScope 的主线讲透:你喊一句 agent.reply_stream(msg),它内部到底怎么转。理解了这一章,后面的事件流、权限、中间件都只是挂在这根主轴上的零件。

1.1 它要解决的小问题

模型一次只能做一件事:要么吐文本,要么说「我要调工具 X 参数 Y」。但一个真实任务往往要调好几次工具、看结果再决定下一步(查文件 → 读到内容 → 改文件 → 跑测试 → 看报错 → 再改)。需要一个循环替模型把这些往返跑完。这就是 reasoning-acting(ReAct)循环。

1.2 思路/直觉:下一步做什么,是「推导」出来的

AgentScope 没把循环写成「第一步调模型、第二步执行工具」的流水账。它的循环顶部先问一句:「看看上一条助手消息,里面有没有还没出结果的工具调用?它们处于什么状态?」 答案决定下一步:

上一条消息的情况下一步动作
有「待执行」的工具调用(state=pending/allowed)acting:去执行它们
有「在等用户确认 / 等外部执行结果」的工具调用exit:循环退出,把控制权交出去等外部事件
没有任何未完成的工具调用reasoning:调模型想下一步

这个判定就是 _check_next_action(agent/_agent.py:2297)。把它做成「读状态 → 推导动作」而非硬编码步骤,是 human-in-the-loop 能干净暂停/续跑的根本原因(详见 03 章)。

1.3 图示:一次 reply 的控制流

reply_stream(inputs)


_reply_impl: agent/_agent.py:569

├─ 是「确认/外部结果」事件? ─是─▶ 处理事件、把结果写回上下文
│ └否─▶ 把新消息存进上下文 + 发 ReplyStartEvent


while cur_iter < max_iters: agent/_agent.py:622

├─① action,_ = _check_next_action() agent/_agent.py:626
│ ├─ "exit" ─▶ yield 最终 Msg, return
│ ├─ "reasoning" ─▶ ② 先压缩上下文 → _reasoning()
│ │ └─ 模型没要工具? ─▶ yield ReplyEndEvent + Msg, return
│ └─ "acting" ─▶ (落到 ③)

├─③ for batch in await _batch_tool_calls(): agent/_agent.py:655
│ ├─ sequential ─▶ _execute_sequential_tool_calls
│ └─ concurrent ─▶ _execute_concurrent_tool_calls
│ 若某工具要「用户确认/外部执行」─▶ yield 占位 Msg, return(暂停)

└─ cur_iter += 1 ─▶ 回到 while 顶

▼ (跳出 while = 撞上 max_iters)
yield ExceedMaxItersEvent + ReplyEndEvent + 兜底 Msg agent/_agent.py:702

1.4 主线逐段拆解

第一步:入口与「是新对话还是续上一次暂停」

reply_stream(agent/_agent.py:191)是公开入口,它只是把 _reply 产出的事件转发出去,并过滤掉 Msg(Msg 是给 reply() 这种「只要最终结果」的调用方用的内部信号)。真正的分发逻辑在 _reply_impl(agent/_agent.py:569):

  • 如果传进来的是 UserConfirmResultEvent / ExternalExecutionResultEvent,说明这是上一次暂停后的续跑——先 _check_incoming_event 校验确实在等这个事件,再 _handle_incoming_event 把确认/结果落进上下文。
  • 否则是新一轮 reply:_handle_incoming_messages 把新消息追加进 state.context,生成新 reply_id,把 cur_iter 归零,发出 ReplyStartEvent

重点看:一个 reply 对应一个 reply_id,而且后续产出的最终 AssistantMsg 也用这个 id(agent/_agent.py:609)。流式事件全都带 reply_id,前端据此把碎片归到同一次回复。

第二步:reasoning —— 调模型,把响应翻译成事件

进入 _reasoning_impl(agent/_agent.py:782)后:

  1. ModelCallStartEvent
  2. _prepare_model_input(agent/_agent.py:2035)拼输入:系统提示 + 压缩摘要(若有)+ 对话上下文 + 工具 schema。
  3. _call_model(agent/_agent.py:2065)真正调模型——这里带重试 + 备用模型回退(下一节细讲)。
  4. 模型返回可能是流式(async generator)或整块。流式时逐 chunk 调 _convert_chat_response_to_event 翻译成事件(见 02 章),并把最后一个带 is_last 的 chunk 留作完整响应。
  5. _save_to_context 把模型产出的内容块写回上下文。
  6. 关键分叉:如果完整响应里没有任何 ToolCallBlock,说明模型给出了最终答复——yield 一个 AssistantMsg(agent/_agent.py:902)。这个 Msg 冒泡回 _reply_impl 时触发 ReplyEndEventreturn,循环结束。

这段教学伪代码演示「为什么没工具调用就结束」:

# 示意,非源码:reasoning 收尾的核心判断
if not any(isinstance(b, ToolCallBlock) for b in response.content):
# 模型这轮只吐了文本/思考,没要调工具 → 这就是最终答复
yield AssistantMsg(id=reply_id, name=self.name, content=response.content)
# 否则:有工具调用,什么都不 yield,让外层循环进入 acting

第三步:模型调用的健壮性 —— 重试 + 回退

_call_model(agent/_agent.py:2065)把容错做在两层:

  • 同一模型重试:model_config.max_retries(默认 0)次额外尝试。
  • 回退到备用模型:主模型用尽重试仍失败,且配了 model_config.fallback_model,就换备用模型再来一遍(agent/_agent.py:2091)。

注意这层 max_retries 默认是 0,因为 ChatModelBase 自己内部还有一层重试循环(model/_base.py:184),框架不想让两层重试相乘放大(agent/_config.py:152 的注释明说了)。

第四步:acting —— 把工具调用分批,顺序 or 并发

模型要调工具时,_batch_tool_calls(agent/_agent.py:1130)先把这一批工具调用分组:

  • 工具标了 is_concurrency_safe(或查不到该工具)→ 归入 concurrent 批,可以一起跑。
  • 否则(有副作用、非并发安全)→ 归入 sequential 批,必须一个个来。

相邻同类型的会合并进同一批,于是得到一串「并发批 / 顺序批」交替的列表,按序执行。

并发执行(_execute_concurrent_tool_calls,agent/_agent.py:1216)有个值得学的细节:它用一个 asyncio.Queue + 哨兵对象收集所有 worker 的事件。asyncio.gather(..., return_exceptions=True) 保证一个工具失败不会取消其他工具,全部跑完后把异常打包成 ExceptionGroup 一起抛。哨兵在 gather 返回之后才入队,确保「事件全部入队」先于「生成器结束」——不会漏事件。

# 示意,非源码:并发执行的「哨兵保证事件不丢」骨架
async def _run_all():
results = await asyncio.gather(*[into_queue(tc) for tc in calls],
return_exceptions=True) # 失败不互相取消
await queue.put(SENTINEL) # 哨兵在 gather 之后入队 = 所有事件已入队
return results

while (evt := await queue.get()) is not SENTINEL:
yield evt # 排空队列,直到遇到哨兵
# 此时所有 worker 已结束,再统一收集并 re-raise 异常

第五步:单个工具调用的一生(_execute_tool_call)

_execute_tool_call(agent/_agent.py:1321)是工具调用的完整生命周期,严格分五步(代码里用注释标了 Step 1–5):

  1. 校验输入:工具是否可用、能否按 schema 解析参数、jsonschema 校验。任一步失败 → 直接把错误信息当工具结果喂回模型(_handle_error_tool_call),不抛给开发者。
  2. 权限闸:调 PermissionEngine.check_permission(若已被用户确认过则直接放行)。详见 03 章
  3. 按决定处理:ASK → 发 RequireUserConfirmEvent 暂停;DENY → 回错误结果;ALLOW → 继续。
  4. 委托给 _acting 做真正执行(这是 on_acting 中间件的钩子点)。
  5. 截断超长结果:_split_tool_result_for_compression 把超过 tool_result_limit 的结果截断,溢出部分 offload(见 04 章)。

这里有个安全设计:权限检查、上下文写入都在 _acting 之外(由 _execute_tool_call 前后负责)。_acting(agent/_agent.py:1574)只包 toolkit.call_tool 这一层纯 I/O。这样把 _actingnext_handler 丢到后台任务也不会偷偷改上下文(middleware/_base.py:114 的注释解释了这个边界)。

1.5 关键细节 / 坑

  • 撞上 max_iters 不是静默挂掉。 循环跑满 react_config.max_iters(默认 20)还没结束,会 yield ExceedMaxItersEvent + ReplyEndEvent + 一条兜底 AssistantMsg(agent/_agent.py:702)。代码注释明说这是为了「镜像正常退出路径,免得等终止事件的 SSE 客户端挂死」。
  • 外部工具(is_external_tool)的特殊路径。 这类工具框架不自己执行,而是发 RequireExternalExecutionEvent、把状态置为 submitted、暂停,等外部把 ExternalExecutionResultEvent 送回来。
  • 并发批里一旦有工具要确认/外部执行,整个执行就 break。 _reply_impl 里检测到 RequireUserConfirmEvent/RequireExternalExecutionEvent 就停掉后续批次、yield 占位 Msg、return(agent/_agent.py:685)。

1.6 小结

主循环的灵魂是 _check_next_action 的「读状态推导动作」+ _reply_impl 的 while 循环。记住这条主轴,你就能把后面三章(事件、权限、中间件)都挂上去理解。

→ 下一章:02 · 事件流协议