跳到主要内容

团队编排:谁先说、谁后说

前两章:Core 让消息能流、AssistantAgent 让单个 agent 能想。这一章把它们拼成"团队":多个 agent 在一个群里协作,由一个主持人决定每轮谁发言。这是 AutoGen 价值最集中的地方。

1. 它要解决的小问题

几个 agent 要协作,得回答两个问题:(a)它们怎么共享上下文、互相看到对方说了啥?(b)每一轮,下一个该谁发言?AutoGen 的答案:(a)用 Core 的发布订阅,所有人订一个"群频道";(b)抽象出一个 select_speaker 方法,不同团队给不同实现。

2. 直觉:把 AgentChat 摆到 Core 运行时上

BaseGroupChat 的职责,用它自己的话说,是"provides the mapping between the agents of the AgentChat API and the agent runtime of the Core API"(_base_group_chat.py:59-61)。具体怎么映射:

  • 每个团队实例有个唯一 _team_id(UUID),用来生成互不冲突的 topic 名(_base_group_chat.py:109)。
  • 定义几类 topic:群频道 group_topic_{team_id}(广播给所有人)、每个参与者的私频道 {name}_{team_id}、主持人的专属频道、还有一个输出频道 output_topic_{team_id}(把消息中继到外部输出队列)(:117-127)。
  • 每个参与者不是直接注册,而是被包进一个 ChatAgentContainer(:178-189)。容器是个 Core agent,负责"缓冲消息→被点名时驱动里面的 AgentChat agent→把回复广播回主持人"。

3. 图示:topic 与订阅怎么接

群频道 group_topic (广播:人人都订)
┌──────────────┬───────────────┬──────────────┐
│ │ │ │
[Manager] [Container A] [Container B] ... 每个 Container 还订自己的私频道
│ ▲ ▲ (主持人点名时朝私频道发
│ │ │ GroupChatRequestPublish GroupChatRequestPublish)
│ │ ▼
│ │ 跑里面的 AssistantAgent
│ │ │ GroupChatAgentResponse (广播回 manager 频道)
│ └───────────┘

output_topic ──▶ _output_message_queue ──▶ run_stream() 吐给你

这些订阅在 _init 里建立:给每个参与者加"订自己私频道"+"订群频道"两条 TypeSubscription(_base_group_chat.py:206-210),给主持人加"订自己频道"+"订群频道"+"订输出频道"(:229-243)。

4. 主线:一次 run 的事件流

run_stream(_base_group_chat.py:351)是真正的引擎,run(:247)只是把它跑完取最后的 TaskResult。走一遍:

  1. task 规范成消息列表(字符串→TextMessage)(:455-472)。
  2. 启动嵌入式运行时(若未传外部 runtime),首次运行调 _init 注册所有人(:487-493)。
  3. 点对点GroupChatStart 发给主持人(:534-538)。
  4. 一个 while True 循环不断从 _output_message_queue 取消息 yield 出去,直到取到 GroupChatTermination 就 break,最后 yield TaskResult(:544-564)。

主持人侧(BaseGroupChatManager)用 Core 的装饰器区分处理器:

  • @rpc handle_start(_base_group_chat_manager.py:86):收到起始消息,广播给所有参与者(让大家有上下文),然后调 _transition_to_next_speakers 选人。
  • @event handle_agent_response(:134):每收到一个参与者的回复,并入消息线、从"在说话的人"列表里移除该人;等所有被点的人都回完(_active_speakers 空了)才检查终止、再选下一轮(:152-164)。

选人 + 发麦的核心(_transition_to_next_speakers,:172):

# _base_group_chat_manager.py:172-193 (节选)
speaker_names = await self.select_speaker(self._message_thread) # ← 各团队不同的策略
if isinstance(speaker_names, str):
speaker_names = [speaker_names]
...
for speaker_name in speaker_names:
speaker_topic_type = self._participant_name_to_topic_type[speaker_name]
await self.publish_message(GroupChatRequestPublish(), # ← 朝那个人的私频道"递麦"
topic_id=DefaultTopicId(type=speaker_topic_type), ...)
self._active_speakers.append(speaker_name)

参与者容器收到 GroupChatRequestPublish 就干活(ChatAgentContainer.handle_request,_chat_agent_container.py:85):把缓冲的消息喂给里面的 agent 跑 on_messages_stream,把最终 Response 包成 GroupChatAgentResponse 广播回主持人频道(:130-149)。注意它容错:agent 抛异常时改发 GroupChatError(:150-159)。

5. 三种选人策略(select_speaker 的三种实现)

select_speaker 是抽象方法(_base_group_chat_manager.py:305),三个团队各给一套:

5.1 RoundRobin —— 轮流(最简单)

就是"取模轮一圈":

# _round_robin_group_chat.py:72-80 (select_speaker 节选)
current_speaker_index = self._next_speaker_index
self._next_speaker_index = (current_speaker_index + 1) % len(self._participant_names)
current_speaker = self._participant_names[current_speaker_index]
return current_speaker

确定性、无需模型。适合"流水线式"协作。

5.2 Selector —— 让模型挑下一个发言人

SelectorGroupChatManager._select_speaker(_selector_group_chat.py:232)把"角色描述 + 参与者列表 + 历史"塞进 selector_prompt,问模型"下一个该谁",然后从模型回复里正则匹配出被提到的 agent 名。三个巧妙处:

  • 重试纠错: 一个 while num_attempts < max_attempts 循环。若模型没提到合法名字、或提到多个、或(在不许重复时)提到上一个人,就给一条 feedback 让它重选(:247-296)。
  • OpenAI 用 system、其它用 user: if ModelFamily.is_openai(...) 决定把 selector prompt 放 SystemMessage 还是 UserMessage,因为"Many other models need a UserMessage to respond to"(:240-245)。
  • 兜底: 重试耗尽,有上一个发言人就复用、否则用第一个参与者(:303-310)。

名字匹配 _mentioned_agents(:310)还很贴心:三种匹配条件写在 docstring 里(:312-315)——精确名、下划线名匹配空格版(Story_writer == Story writer)、下划线匹配转义下划线版(Story_writer == Story\_writer);对应的 re.escape + 词边界 regex 在 :328-340

你也可以传 selector_func 完全自定义选人逻辑(此时跳过模型,:163-169),或传 candidate_func 先筛候选再让模型从中选。

5.3 Swarm —— 按 handoff 路由

不靠轮转也不靠模型挑,而是看最近一条 HandoffMessage 指向谁:

# _swarm_group_chat.py:90-98 (select_speaker 节选)
for message in reversed(thread):
if isinstance(message, HandoffMessage):
self._current_speaker = message.target
assert self._current_speaker in self._participant_names
return [self._current_speaker]
return self._current_speaker

回想第 2 章:AssistantAgent 调用 handoff 工具就产出 HandoffMessage(target=...)。Swarm 就读这个 target 决定下一个发言人——agent 自己"显式交接"控制权,像接力棒。Swarm 还在构造时校验第一个参与者必须能产出 HandoffMessage(_swarm_group_chat.py:263)。

三种策略对比:

团队谁决定下一个发言人要模型吗典型场景
RoundRobinGroupChat取模轮流固定流水线、辩论
SelectorGroupChat一个 LLM 看历史挑人动态、按需找专家
Swarm最近 HandoffMessage.target否(由 agent 自己交接)agent 主动交接的工作流

此外还有 Magentic-One 团队(_group_chat/_magentic_one/),用一个带"任务账本(ledger)"的 orchestrator 做规划式编排——本库未逐行精读其算法,这里仅指出位置:_magentic_one_orchestrator.py、prompts 在 _prompts.py

6. 终止条件:什么时候停

团队靠 TerminationCondition 决定停。它是个 ABC,核心是 async __call__(messages) -> StopMessage | None(base/_termination.py:59)——返回非 None 就停。最妙的是可组合:重载了 __and__ / __or__(:79-83),于是能写 MaxMessageTermination(10) | TextMentionTermination("TERMINATE") 这种"任一命中即停"。主持人在每轮回复后调 _apply_termination_condition(_base_group_chat_manager.py:195)检查,命中就 _signal_termination 发出终止事件,run_stream 那个 while 循环收到就收尾。

7. 关键细节 / 坑

  • 串行保序: 主持人和容器都继承 SequentialRoutedAgent(_base_group_chat_manager.py:25),对某些消息类型(如 GroupChatStart/GroupChatAgentResponse)强制串行处理——否则第 1 章说的"并发交错"会让消息线乱序。这是 Core 的并发模型与"群聊要有序"之间的桥。
  • 嵌入式运行时不吞异常: 团队自建运行时时显式 ignore_unhandled_exceptions=False,注释解释:吞了异常会导致"non-surfaced exceptions and early team termination"(_base_group_chat.py:139-142)。
  • 状态可存取: save_state/load_stateagent 名字(不是 ID)做 key,好让状态在不同 team/runtime 间可移植(_base_group_chat.py:766-771)。
  • team 可作为参与者: 容器的 handle_request 判断参与者是不是 Team,是的话跑它的 run_stream——所以团队能嵌套(_chat_agent_container.py:89-112)。

8. 小结

团队 = 用 Core 的发布订阅把每个 agent(包在容器里)接到群频道 + 一个主持人在每轮调 select_speaker 递麦。三种选人策略覆盖"轮流/模型挑/自己交接"。终止条件可组合。整套东西完全建在第 1 章的运行时和第 2 章的 agent 之上——这就是 AutoGen 分层设计的回报。