Agency Swarm — 架构与原理
30 秒导读: Agency Swarm 是一个多智能体编排框架。你定义几个有名字、有职责的 Agent(像公司里的 CEO、分析师、报告员),用「谁能给谁发消息」的有向通信流把他们连起来,然后给整个 "Agency" 发一句话。框架在底层调用 OpenAI Agents SDK 跑每个 agent,agent 之间靠一个自动生成的
send_message工具互相委派任务。它本身不重写推理循环——那是 SDK 干的;它加的是「组织结构 + 通信 + 共享历史 + 持久化」这一层。
1. 这是什么(零基础也能懂)
一句话定义: Agency Swarm 让你把「一个复杂任务」拆给「一群各有专长的 AI agent」去协作完成,而协作的规则(谁能找谁帮忙)由你显式画出来。
解决什么问题 / 给谁用。 假设你要做一个「投资研究」系统:一个 agent 负责拉行情、一个负责评估风险、一个负责写报告。你不想把这些塞进一个巨型 prompt,而想让它们像真实团队那样分工——协调员拿到任务后,自己干一部分,再把子任务委派给专家,收齐结果后汇总。Agency Swarm 就是给这种「按组织结构思考自动化」的人用的。
它能做什么(功能):
- 定义多个
Agent,每个有自己的instructions、tools、description。 - 用
communication_flows声明有向的委派关系(A > B表示 A 可以给 B 发消息,反过来不行)。 - agent 之间通过一个统一的
send_message工具通信,框架自动按通信流给每个 agent 装好这个工具。 - 所有对话(用户↔agent、agent↔agent)存进一张扁平消息表,每条带
agent/callerAgent标签,可经回调持久化到数据库/文件。 - 支持流式输出、结构化输出(
output_type)、守卫(guardrails)、文件/向量库、MCP 工具、FastAPI 部署、终端 UI 等。
用起来什么样。 一个最小的真实例子(摘自 examples/multi_agent_workflow.py):
# 示意,基于 examples/multi_agent_workflow.py 精简
from agency_swarm import Agency, Agent
portfolio_manager = Agent(name="PortfolioManager", instructions="...协调研究...", tools=[fetch_market_data])
risk_analyst = Agent(name="RiskAnalyst", instructions="...评估风险...", tools=[analyze_risk_factors])
report_generator = Agent(name="ReportGenerator", instructions="...写报告...")
agency = Agency(
portfolio_manager, # 位置参数 = 入口 agent(用户对话从这里进)
communication_flows=[ # 有向边:谁能给谁发消息
portfolio_manager > risk_analyst, # 用 > 运算符画一条委派边
portfolio_manager > report_generator,
],
)
response = await agency.get_response("分析 AAPL 的投资机会")
print(response.final_output)
用户只跟 PortfolioManager 说话。PortfolioManager 在自己的推理过程中,会调用框架塞给它的 send_message 工具去找 RiskAnalyst 和 ReportGenerator 要结果,最后把答案汇总返回。
一句话直觉/类比。 把它想成给 AI 搭一个公司组织架构图:Agent 是员工,communication_flows 是「谁向谁汇报/谁能给谁派活」的实线箭头,send_message 是公司内部的「发工单」按钮,而那张扁平消息表是全公司共享的会议记录。OpenAI Agents SDK 是每个员工脑子里的「思考引擎」;Agency Swarm 是把员工组织起来、管好通信和记录的那层 HR + 协作平台。
2. 顶层全景(它大概怎么转)
主要部件
下图是「用户发一句话 → 协调 agent → 委派给专家 → 汇总返回」的控制流。怎么读:实线箭头 = 调用/委派方向;所有 agent 共享中间那张消息表。
用户 / 外部调用
│ agency.get_response("...")
▼
┌─────────────────────────────────────────────┐
│ Agency (agency/core.py) │
│ · 持有 agents{}、entry_points[] │
│ · 持有 ThreadManager(全局历史) │
│ · 按 communication_flows 给每个 agent │
│ 装好 send_message 工具 │
└───────────────┬───────────────────────────────┘
│ 解析 recipient,构造 AgencyContext
▼
┌─────────────────────────────────────────────┐
│ 入口 Agent (agent/core.py, 无状态) │
│ Execution.get_response() ← agent/execution.py│
│ · 取历史 → 调 agents.Runner(SDK)跑推理循环 │
│ · 把新产生的消息打上元数据写回历史表 │
└───────────────┬───────────────────────────────┘
│ 模型决定调用 send_message 工具
▼
┌─────────────────────────────────────────────┐
│ SendMessage.on_invoke_tool() │
│ (tools/send_message.py) │
│ · 查 recipient → 调 recipient.get_response() │
│ · 把子 agent 的最终输出当作工具结果返回 │
└───────────────┬───────────────────────────────┘
▼
专家 Agent(同样跑一遍 Execution)
│
▼
┌───────────────────────────────┐
│ ThreadManager / MessageStore │ ← 所有 agent 读写同一张表
│ (utils/thread.py) │ 每条带 agent / callerAgent 标签
└───────────────────────────────┘
部件一句话职责
| 部件 | 干什么 | 在哪个文件 |
|---|---|---|
Agency | 顶层编排器:注册 agent、解析通信流、装 send_message 工具、入口路由 | src/agency_swarm/agency/core.py |
Agent | 在 SDK Agent 上扩展的无状态 agent;运行靠注入的 context | src/agency_swarm/agent/core.py |
Execution | 一次 agent turn 的真正流程:取历史、调 Runner、存结果 | src/agency_swarm/agent/execution.py |
SendMessage | 动态生成的 FunctionTool,调用即「让 recipient agent 跑一轮并返回其输出」 | src/agency_swarm/tools/send_message.py |
AgencyContext / AgentRuntimeState | 运行期注入给 agent 的「环境」:线程管理器、子 agent、send_message 工具实例 | src/agency_swarm/agent/context_types.py |
MasterContext | SDK Runner 在运行中传递的共享上下文(所有 agent、user_context、线程管理器) | src/agency_swarm/context.py |
ThreadManager / MessageStore | 扁平消息表 + 持久化回调 | src/agency_swarm/utils/thread.py |
MessageFormatter | 给消息打 agent/callerAgent 等元数据、出库前剥掉元数据、历史协议校验 | src/agency_swarm/messages/message_formatter.py |
主线走一遍(高层,不进代码)
-
构造期。
Agency.__init__收到入口 agent(位置参数)和communication_flows(关键字参数),解析出所有(sender, receiver)边(parse_agent_flows),给每对边在 sender 身上注册一个 send_message 工具(configure_agents→register_subagent)。见agency/core.py:136-201。 -
入口。 用户调
agency.get_response(msg)。框架挑出 recipient(默认第一个入 口 agent),为它构造一份AgencyContext(把全局ThreadManager、该 agent 的runtime_state打包进去),委派给agent.get_response(...)。见agency/responses.py:136-215。 -
跑一轮。
Execution.get_response从历史表取出「这一对 agent 的对话历史」,加上新消息,丢给 SDK 的agents.Runner去跑标准的「调模型 → 调工具 → 再调模型」循环。见agent/execution.py:73-282。 -
委派。 如果模型决定调用
send_message,工具的on_invoke_tool就同步地调用 recipient agent 的get_response,把它的final_output当作工具结果返回给 sender 的模型继续推理。见tools/send_message.py:314-539。 -
落历史。 每轮产生的新消息都被
MessageFormatter打上agent/callerAgent等标签,写进同一张扁平表;若配了持久化回调,顺手存盘。见agent/execution.py:284-347、utils/thread.py:151-167。
3. 接下来读哪一章
这是一个中大型框架,按由浅入深拆成了四章:
-
01-agency-and-flows.md— 先搞懂编排层:通信流怎么从agent1 > agent2这种运算符语法解析出来、send_message工具是怎么按边动态长出来的、Handoff(转交控制权)和send_message(同步求助)有什么本质区别。 -
02-execution-and-context.md— 跟着一次get_response端到端走一遍,理解「Agent 无状态、状态全在 context」这个核心设计:AgencyContext、MasterContext、AgentRuntimeState各管什么,以及为什么 send_message 要给 recipient 临时造一个上下文。 -
03-threads-and-messages.md— 看记忆层:一张扁平消息表怎么用agent/callerAgent两个标签同时表达「用户线程」和「agent↔agent 子对话」,持久化回调怎么接数据库,以及历史协议(Responses vs Chat Completions)校验为什么存在。 -
04-deep-dive.md— 提炼巧妙之处(无状态 agent + 运行期注入、共享扁平历史、send_message 的待处理去重锁、对话启动器缓存)、边界与局限、横向对比(对比同 shelf 的其他多 agent 框架取舍)、以及代码地图(符号级跳转表)。