跳到主要内容

记忆层:扁平消息表与多 agent 历史

这章讲 Agency Swarm 怎么记住对话。核心是一个看似简单却很巧的决定:不按 agent 分桶存历史,而是所有消息放一张扁平表,每条贴两个标签(agent = 收件人,callerAgent = 发件人),再靠这两个标签在读取时「过滤出需要的那段」。

3.9 一张扁平表,两个标签

它要解决的小问题。 多 agent 系统里有好几种对话同时进行:用户↔入口 agent、manager↔analyst、manager↔writer……怎么存,才能既各自隔离、又能整体回放和持久化?

思路。 用一张扁平 list(MessageStore.messages),给每条消息打上 agent(谁在回应)和 callerAgent(谁发起的,None = 用户)。需要某段对话时,按这两个字段过滤即可,不用维护多个独立 thread 结构。源码注释明说这是「替换了之前基于 thread 的结构」:storing all messages in a single flat list with agent/callerAgent metadata embedded(src/agency_swarm/utils/thread.py:14-20)。

MessageStore.messages (一张扁平表)
┌──────────────────────────────────────────────────────────┐
│ #1 role=user agent=Manager callerAgent=None │ ← 用户线程
│ #2 role=assistant agent=Manager callerAgent=None │ ← 用户线程
│ #3 role=user agent=Analyst callerAgent=Manager │ ┐ Manager→Analyst
│ #4 role=assistant agent=Analyst callerAgent=Manager │ ┘ 子对话
│ #5 role=assistant agent=Manager callerAgent=None │ ← 回到用户线程
└──────────────────────────────────────────────────────────┘
读「用户线程」: 过滤 callerAgent is None → #1,#2,#5
读「Manager↔Analyst」: 过滤这两者之间 → #3,#4

两种读取怎么实现。 ThreadManager.get_conversation_history(utils/thread.py:177-196)分两路:

  • caller_agent is None(用户对话):返回所有 callerAgent is None 的消息,忽略 recipient agent 名。这是关键设计——它保证「所有入口 agent 共享同一条用户线程」,见 thread.py:192-194 及 docstring thread.py:182-188
  • 否则:返回这两个 agent 双向之间的消息(MessageStore.get_conversation_between,thread.py:80-100)。

为什么用户线程要跨入口 agent 共享? 这样用户从「跟 ManagerA 聊」切到「跟 ManagerB 聊」时,后者能看到之前的完整用户对话,体验是「跟同一个公司聊」,而不是「换了个人要重新交代」。配套地,agency/responses.py 在用户切换 recipient 时会注入一条 recipient_reminder system 消息(_build_user_message_with_recipient_reminder,responses.py:107-133)。

3.10 排序:插入序,不是时间戳

一个容易踩的直觉错误。 消息上确实带 timestamp(微秒级,message_formatter.py:239-250),但历史的实际顺序是插入序,时间戳只作诊断用。get_all_messages 直接 messages.copy() 按插入序返回,注释明说 Timestamps are preserved for diagnostic purposes but don't drive ordering(utils/thread.py:198-206)。Handoff 提醒消息为了排在正确位置,会把时间戳「+1」做微调(send_message.py:631-633),但这只是兜底。

3.11 持久化:一对回调

它要解决的小问题。 进程重启后对话要还能续上,得能存到数据库/文件再读回。

思路。 Agency Swarm 不内置存储后端,而是要你给两个回调:

  • load_threads_callback() -> list[消息] —— ThreadManager 初始化时调一次,把历史灌进内存表(thread.py:208-222)。
  • save_threads_callback(list[消息]) —— 每次 add_message(s) 后自动调,把整张表交出去存(thread.py:234-242)。
# 示意,非源码:接一个文件后端
def load_threads(): # 进程启动时还原
return json.load(open("chat.json")) if os.path.exists("chat.json") else []

def save_threads(messages): # 每次有新消息就整体落盘
json.dump(messages, open("chat.json", "w"))

agency = Agency(manager, load_threads_callback=load_threads, save_threads_callback=save_threads)

两条触发路径。 除了 ThreadManager 内部每次写入即存,Agency 还在两个回调都给齐时装一个 PersistenceHooks,它实现 SDK 的 RunHooks,在 on_run_end 时再保存一次整表,见 src/agency_swarm/hooks.py:74-94。加载只在 ThreadManager 初始化做一次,on_run_start 故意不重复加载(hooks.py:61-72)。

注意:框架交给回调的永远是带 agency 元数据的完整消息;只有在投喂给 OpenAI 之前,MessageFormatter.strip_agency_metadata 才把 agent/callerAgent/citations 等私有字段剥掉(message_formatter.py:351-366metadata_fields 清单在 message_formatter.py:51-61)。所以你的存储里能看到完整标签,但 OpenAI 看不到。

3.12 历史协议校验:别把两种格式混进一条历史

它要解决的小问题。 OpenAI 有两套消息格式:Responses API 风格(function_call / call_id)和 Chat Completions 风格(role: tool / tool_calls)。如果换了模型导致一条历史里两种格式混着,SDK 会报错且难懂。

思路:前置体检,给出可操作的错误。 prepare_history_for_runner 在投喂前调 _ensure_history_protocol_compatibility(message_formatter.py:174-198),它扫一遍历史判断协议:

  • 混了两种 → 抛 IncompatibleChatHistoryError,提示「开新会话或只保留 {role, content} 消息」。
  • 单一但和当前模型期望的不一致 → 同样抛错并说明期望哪种。

判定靠几个小启发式:_looks_like_responses(看 call_id / function_call 等,message_formatter.py:129-142)、_looks_like_chat_completions(看 role:tool / tool_calls,message_formatter.py:118-127)。每条存下来的消息还会带一个 history_protocol 标签(默认 responses),作为协议标注(message_formatter.py:235-238)。

这是个面向「换模型/迁移」的防御性设计:与其让底层 API 抛晦涩错误,不如在框架边界给一句人能照做的话。

接下来

三层(编排 / 执行 / 记忆)都过完了。最后一章 04-deep-dive.md 把散落的巧妙之处收拢成「可借鉴清单」,讲清边界与局限,和同 shelf 的多 agent 框架做横向对比,并给出符号级代码地图。