跳到主要内容

Mastra — Agent 的中央循环

本章讲主线:一次 agent.stream() 从入口到模型再到落库,中间发生了什么。读完你能讲清"agentic loop 到底循环的是什么"。

1. 这一章解决的问题

LLM 本身只会做一件事:给它一段文本,它吐一段文本。要让它用工具,你得自己:把工具描述拼进请求、解析它"想调哪个工具"、真的去执行、把结果再拼回去、然后再问一次模型——如此往复,直到它不再要调工具、给出最终答案。

这个"反复推回桌前"的过程就是 agentic loop。Mastra 的 Agent 类负责入口和准备工作,loop() 负责真正的循环。

2. 入口:Agent.stream() 做了哪些准备

stream() 在真正循环前,做了一连串"准备"。看签名末端的主体(packages/core/src/agent/agent.ts:7596 起,async stream<OUTPUT = TOutput>):

它依次做这些事(每步一句话):

  1. 合并选项。 把 agent 的默认选项和本次调用选项 deepMerge(agent.ts:7612 deepMerge(defaultOptions, ...))。
  2. 校验请求上下文。 #validateRequestContext(agent.ts:7607)。
  3. untilIdle 分流。 若开了 untilIdle,转给 runStreamUntilIdle(agent.ts:7626)——这是"流不关,等后台任务做完再继续"的模式(见 §6)。
  4. 权限校验。 #requireAgentExecutionFGA(agent.ts:7637)——细粒度授权,防止越权访问别人的记忆。
  5. 解析模型。 getLLM(...)(agent.ts:7644):把 'openai/gpt-4o' 这种字符串解析成真实 LanguageModel,并检查它是不是受支持的版本(v1 报错让你用 streamLegacy())。
  6. #execute 真正的执行在私有方法 #execute(agent.ts:6317)里。

关键直觉: stream() 本身不循环,它是"门面 + 准备";脏活在 #executeloop()

3. #execute:把记忆、指令、工具、模型摆上桌

#execute(packages/core/src/agent/agent.ts:6317)是"摆桌子"的地方。它解析出本次运行需要的一切:

  • 线程与资源 ID。 从 requestContext / 选项 / 快照里解析 threadIdresourceId(agent.ts:6414 起)。注意一个安全细节:requestContext 里的保留键优先——中间件按已认证用户设的 resourceId 会盖过请求体里的值,防止攻击者传别人的 ID 来劫持记忆(agent.ts:6411 的注释)。
  • 指令(系统提示)。 getInstructions(...)(agent.ts:6479),支持动态(函数式)指令。
  • 模型。 再次 getLLM(agent.ts:6431),拿到 MastraLLMVNext
  • 输入处理器链。 包括三个记忆处理器(下一章细讲),它们往一个 MessageList 里塞历史 / 召回 / 工作记忆。

摆好后,把消息列表、工具、模型一起交给 loop()

4. loop():真正的流式循环

loop()(packages/core/src/loop/loop.ts:11)是核心。它做的事可以拆成几步:

(1) 兜底与 ID。 模型数组空了直接抛 LOOP_MODELS_EMPTY 错误(loop.ts:35-43)。生成 runIdresponseMessageId(loop.ts:47-85)。

(2) 跨迭代共享的处理器状态。 processorStates 是一个 Map,跨循环迭代持久,被所有处理器方法共享(loop.ts:95-97)——这让处理器能记住"这次运行里我已经做过什么"。

(3) 把循环交给一个内部 workflow。 这是巧妙之处:agentic loop 本身被实现成一个 workflow stream——workflowLoopStream(workflowLoopProps)(loop.ts:136)。也就是说"模型→工具→喂回→再模型"的迭代,复用了 §4 章 workflow 引擎的执行机制(在 packages/core/src/loop/workflows/)。

(4) 包装成统一输出对象。 最终 new MastraModelOutput({...})(loop.ts:144)把流包成对外的输出对象:既能 for await 流式消费,也能 await 聚合属性(.text.toolCalls)。返回 createDestructurableOutput(modelOutput)(loop.ts:170)。

下面这张图说明循环"循环的是什么":

组好的 MessageList + tools + model


┌──────────────┐
│ 调一次模型 │◀──────────────┐
└──────┬───────┘ │
│ 模型输出 │
┌────────┴─────────┐ │
▼ ▼ │ 把工具结果
出文本/finish 要调工具 │ 作为新消息
│ │ │ 喂回 MessageList
▼ ▼ │
结束循环 执行工具 ──────────────┘
(concurrency 可控)

怎么读: 每次"调一次模型"是一轮;只要模型还在要工具,就执行、喂回、再来一轮;直到模型出最终文本或 finish。toolCallConcurrency(loop.ts:26)控制同一轮里多个工具调用的并发。

5. MessageList:为什么需要一个"消息容器"

你可能以为消息就是个数组。但 Mastra 夹在多个 AI SDK 版本中间(v4/v5/v6),每个版本的消息格式都不同;而且记忆系统需要知道每条消息的来源(是用户输入?是召回的旧消息?是模型回复?)。

MessageList(packages/core/src/agent/message-list/message-list.ts)就是解决这个的:

  • 多格式互转。 内置 AIV4Adapter/AIV5Adapter/AIV6Adapter(message-list.ts:12),用 .get.all.aiV5.ui() 这种链式取不同格式。
  • 带来源标签。 add(messages, 'memory')addSystem(instruction, 'memory')——第二个参数是来源。语义召回正是用 'memory' 标签把召回的旧消息加进去(下一章会看到)。

教学示例,体会它的用法:

// 示意,非源码:展示 MessageList 的来源标签与格式转换
const list = new MessageList();
list.add(userMessage, 'user'); // 用户输入
list.add(recalledOldMessages, 'memory'); // 语义召回的旧消息,标来源
list.addSystem(workingMemoryText, 'memory'); // 工作记忆,作为系统消息

const forV5Model = list.get.all.aiV5.ui(); // 取成 AI SDK v5 的 UI 消息格式
// 重点看:同一份消息,按目标模型版本取不同格式;来源标签让记忆系统能区分新旧

6. 一个进阶模式:streamUntilIdle(后台任务驱动的续跑)

普通 stream() 一轮对话结束就关流。但有些工具是"后台任务"——比如"帮我研究 Solana",工具在后台跑很久。streamUntilIdle(agent.ts:7781,以及 stream(messages, { untilIdle: true }))让流保持打开,直到本轮派发的所有后台任务都完成。

它的不变量值得记(摘自 agent.ts:7752 起的方法注释):

  • 同一时刻只有一个内部 LLM 流在跑;期间到达的后台完成会排队,本轮结束后再处理。
  • 后台任务完成时,其结果由 tool-call 步骤的 onResult 钩子注入记忆,然后重新进入 agentic loop(agent.stream([], ...)),让模型立刻处理结果——不用等新的用户消息
  • 没有记忆配置时降级为普通 stream()(续跑依赖记忆)。

这是"让 agent 在你不输入时也能自己往下走"的机制,本质是"后台事件 → 喂回记忆 → 重启循环"。

7. 巧妙之处

  • agentic loop = 一个 workflow。 把"模型↔工具"的迭代实现成 workflow stream(loop.ts:136 workflowLoopStream),复用了 suspend/resume、序列化等基础设施——所以工具调用本身也能"暂停等审批"(requireToolApproval,loop.ts 透传)。这是把两个看似不同的子系统(agent 循环 / workflow)统一到一套引擎上的关键决策。
  • 处理器状态跨迭代持久。 processorStates 这个 Map(loop.ts:97)在整次运行里活着,让记忆/守卫类处理器能累积状态而不是每轮从零开始。
  • 安全:保留键优先。 #execute 里 requestContext 的 threadId/resourceId 盖过请求体(agent.ts:6411 注释),把"记忆归属"钉死在认证身份上。

8. 边界与局限

  • v1 模型不支持 stream() 必须用 v5+ 模型或 streamLegacy()(agent.ts:7656AGENT_STREAM_V1_MODEL_NOT_SUPPORTED)。这是上游 AI SDK 演进留下的双轨。
  • streamUntilIdle 的聚合视图只覆盖第一轮。 fullStream 跨所有续跑,但 .text/.toolCalls 等聚合属性只对第一轮结算(agent.ts:7760 起的注释明说)——要跨续跑聚合得自己累加。

9. 代码地图

主题文件关键符号
stream 入口packages/core/src/agent/agent.tsAgent.stream#executegetLLMgetInstructions
流式循环packages/core/src/loop/loop.tsloopworkflowLoopStreamprocessorStatestoolCallConcurrency
续跑模式packages/core/src/agent/agent.tsstreamUntilIdlerunStreamUntilIdle
消息容器packages/core/src/agent/message-list/message-list.tsMessageListAIV5Adapter(message-list.ts:12)
循环 workflow 实现packages/core/src/loop/workflows/agentic-loopstream.ts