Mastra — Agent 的中央循环
本章讲主线:一次
agent.stream()从入口到模型再到落库,中间发生了什么。读完你能讲清"agentic loop 到底循环的是什么"。
1. 这一章解决的问题
LLM 本身只会做一件事:给它一段文本,它吐 一段文本。要让它用工具,你得自己:把工具描述拼进请求、解析它"想调哪个工具"、真的去执行、把结果再拼回去、然后再问一次模型——如此往复,直到它不再要调工具、给出最终答案。
这个"反复推回桌前"的过程就是 agentic loop。Mastra 的 Agent 类负责入口和准备工作,loop() 负责真正的循环。
2. 入口:Agent.stream() 做了哪些准备
stream() 在真正循环前,做了一连串"准备"。看签名末端的主体(packages/core/src/agent/agent.ts:7596 起,async stream<OUTPUT = TOutput>):
它依次做这些事(每步一句话):
- 合并选项。 把 agent 的默认选项和本次调用选项
deepMerge(agent.ts:7612deepMerge(defaultOptions, ...))。 - 校验请求上下文。
#validateRequestContext(agent.ts:7607)。 untilIdle分流。 若开了untilIdle,转给runStreamUntilIdle(agent.ts:7626)——这是"流不关,等后台任务做完再继续"的模式(见 §6)。- 权限校验。
#requireAgentExecutionFGA(agent.ts:7637)——细粒度授权,防止越权访问别人的记忆。 - 解析模型。
getLLM(...)(agent.ts:7644):把'openai/gpt-4o'这种字符串解 析成真实 LanguageModel,并检查它是不是受支持的版本(v1 报错让你用streamLegacy())。 - 进
#execute。 真正的执行在私有方法#execute(agent.ts:6317)里。
关键直觉: stream() 本身不循环,它是"门面 + 准备";脏活在 #execute 和 loop()。
3. #execute:把记忆、指令、工具、模型摆上桌
#execute(packages/core/src/agent/agent.ts:6317)是"摆桌子"的地方。它解析出本次运行需要的一切:
- 线程与资源 ID。 从 requestContext / 选项 / 快照里解析
threadId、resourceId(agent.ts:6414起)。注意一个安全细节:requestContext 里的保留键优先——中间件按已认证用户设的resourceId会盖过请求体里的值,防止攻击者传别人的 ID 来劫持记忆(agent.ts:6411的注释)。 - 指令(系统提示)。
getInstructions(...)(agent.ts:6479),支持动态(函数式)指令。 - 模型。 再次
getLLM(agent.ts:6431),拿到MastraLLMVNext。 - 输入处理器链。 包括三个记忆处理器(下一章细讲),它们往一个
MessageList里塞历史 / 召回 / 工作记忆。
摆好后,把消息列表、工具、模型一起交给 loop()。
4. loop():真正的流式循环
loop()(packages/core/src/loop/loop.ts:11)是核心。它做的事可以拆成几步:
(1) 兜底与 ID。 模型数组空了直接抛 LOOP_MODELS_EMPTY 错误(loop.ts:35-43)。生成 runId、responseMessageId(loop.ts:47-85)。
(2) 跨迭代共享的处理器状态。 processorStates 是一个 Map,跨循环迭代持久,被所有处理器方法共享(loop.ts:95-97)——这让处理器能记住"这次运行里我已经做过什么"。
(3) 把循环交给一个内部 workflow。 这是巧妙之处:agentic loop 本身被实现成一个 workflow stream——workflowLoopStream(workflowLoopProps)(loop.ts:136)。也就是说"模型→工具→喂回→再模型"的迭代,复用了 §4 章 workflow 引擎的执行机制(在 packages/core/src/loop/workflows/)。
(4) 包装成统一输出对象。 最终 new MastraModelOutput({...})(loop.ts:144)把流包成对外的输出对象:既能 for await 流式消费,也能 await 聚合属性(.text、.toolCalls)。返回 createDestructurableOutput(modelOutput)(loop.ts:170)。
下面这张图说明循环"循环的是什么":
组好的 MessageList + tools + model
│
▼
┌──────────────┐
│ 调一次模型 │◀──────────────┐
└──────┬───────┘ │
│ 模型输出 │
┌────────┴─────────┐ │
▼ ▼ │ 把工具结果
出文本/finish 要调工具 │ 作为新消息
│ │ │ 喂回 MessageList
▼ ▼ │
结束循环 执行工具 ──────────────┘
(concurrency 可控)
怎么读: 每次"调一次模型"是一轮;只要模型还在要工具,就执行、喂回、再来一轮;直到模型出最终文本或 finish。toolCallConcurrency(loop.ts:26)控制同一轮里多个工具调用的并发。
5. MessageList:为什么需要一个"消息容器"
你可能以为消息就是个数组。但 Mastra 夹在多个 AI SDK 版本中间(v4/v5/v6),每个版本的消息格式都不同;而且记忆系统需要知道每条消息的来源(是用户输入?是召回的旧消息?是模型回复?)。
MessageList(packages/core/src/agent/message-list/message-list.ts)就是解决这个的:
- 多格式互转。 内置
AIV4Adapter/AIV5Adapter/AIV6Adapter(message-list.ts:12),用.get.all.aiV5.ui()这种链式取不同格式。 - 带来源标签。
add(messages, 'memory')、addSystem(instruction, 'memory')——第二个参数是来源。语义召回正是用'memory'标签把召回的旧消息加进去(下一章会看到)。
教学示例,体会它的用法:
// 示意,非源码:展示 MessageList 的来源标签与格式转换
const list = new MessageList();
list.add(userMessage, 'user'); // 用户输入
list.add(recalledOldMessages, 'memory'); // 语义召回的旧消息,标来源
list.addSystem(workingMemoryText, 'memory'); // 工作记忆,作为系统消息
const forV5Model = list.get.all.aiV5.ui(); // 取成 AI SDK v5 的 UI 消息格式
// 重点看:同一份消息,按目标模型版本取不同格式;来源标签让记忆系统能区分新旧
6. 一个进阶模式:streamUntilIdle(后台任务驱动的续跑)
普通 stream() 一轮对话结束就关流。但有些工具是"后台任务"——比如"帮我研究 Solana",工具在后台跑很久。streamUntilIdle(agent.ts:7781,以及 stream(messages, { untilIdle: true }))让流保持打开,直到本轮派发的所有后台任务都完成。
它的不变量值得记(摘自 agent.ts:7752 起的方法注释):
- 同一时刻只有一个内部 LLM 流在跑;期间到达的后台完成会排队,本轮结束后再处理。
- 后台任务完成时,其结果由 tool-call 步骤的
onResult钩子注入记忆,然后重新进入 agentic loop(agent.stream([], ...)), 让模型立刻处理结果——不用等新的用户消息。 - 没有记忆配置时降级为普通
stream()(续跑依赖记忆)。
这是"让 agent 在你不输入时也能自己往下走"的机制,本质是"后台事件 → 喂回记忆 → 重启循环"。
7. 巧妙之处
- agentic loop = 一个 workflow。 把"模型↔工具"的迭代实现成 workflow stream(
loop.ts:136workflowLoopStream),复用了 suspend/resume、序列化等基础设施——所以工具调用本身也能"暂停等审批"(requireToolApproval,loop.ts透传)。这是把两个看似不同的子系统(agent 循环 / workflow)统一到一套引擎上的关键决策。 - 处理器状态跨迭代持久。
processorStates这个 Map(loop.ts:97)在整次运行里活着,让记忆/守卫类处理器能累积状态而不是每轮从零开始。 - 安全:保留键优先。
#execute里 requestContext 的threadId/resourceId盖过请求体(agent.ts:6411注释),把"记忆归属"钉死在认证身份上。
8. 边界与局限
- v1 模型不支持
stream()。 必须用 v5+ 模型或streamLegacy()(agent.ts:7656的AGENT_STREAM_V1_MODEL_NOT_SUPPORTED)。这是上游 AI SDK 演进留下的双轨。 streamUntilIdle的聚合视图只覆盖第一轮。fullStream跨所有续跑,但.text/.toolCalls等聚合属性只对第一轮结算(agent.ts:7760起的注释明说)——要跨续跑聚合得自己累加。
9. 代码地图
| 主题 | 文件 | 关键符号 |
|---|---|---|
| stream 入口 | packages/core/src/agent/agent.ts | Agent.stream、#execute、getLLM、getInstructions |
| 流式循环 | packages/core/src/loop/loop.ts | loop、workflowLoopStream、processorStates、toolCallConcurrency |
| 续跑模式 | packages/core/src/agent/agent.ts | streamUntilIdle、runStreamUntilIdle |
| 消息容器 | packages/core/src/agent/message-list/message-list.ts | MessageList、AIV5Adapter(message-list.ts:12) |
| 循环 workflow 实现 | packages/core/src/loop/workflows/ | agentic-loop、stream.ts |