跳到主要内容

Core 运行时:actor 式的消息层

本章讲 AutoGen 最底下那块:消息怎么在 agent 之间流动。上层所有 agent 和团队最终都靠它收发消息。先建直觉,再看真实实现。

1. 它要解决的小问题

多个 agent 要协作,就得能互相"说话"。但"说话"有两种截然不同的需求:

  • 点对点、要回话: 我想问某个特定 agent 一个问题,等它把答案返回给我(像函数调用)。
  • 广播、不要回话: 我想把一条消息发到一个"频道",所有关心这个频道的 agent 都收到,但我不等任何人回复(像发通知)。

Core 把这两件事分别做成 send_messagepublish_message,这是理解整个框架的关键分叉。

2. 直觉:actor 模型 + 一个事件循环

AutoGen Core 是经典的 actor 模型(每个 actor 有地址、有自己的状态、靠收消息驱动行为)的一个 asyncio 实现。

三个核心概念,用"公司内部通讯"类比:

概念类比是什么
AgentId员工工号(部门+座位号)(type, key) 二元组,唯一标识一个 agent 实例
TopicId一个广播频道发布消息的目标命名空间
Subscription"谁订阅了哪个频道"把一个 topic 映射到应该收到它的 agent

关键设计:agent 不是一直存在的对象,而是按需创建的。 你注册的是一个"工厂函数"(register_factory),运行时第一次需要某个 AgentId 时才调用工厂把它造出来,之后缓存。这让"成千上万个 agent"在概念上可行——没被点到的不占内存。

3. 图示:一条消息的两条路

send_message(msg, recipient=AgentId) publish_message(msg, topic_id=TopicId)
点对点 · 要 future 回复 广播 · 无回复
│ │
▼ ▼
┌──────────────────────────────────────────────────────────┐
│ 单线程 asyncio 队列 _message_queue │
│ 装的是"信封": SendMessageEnvelope / PublishMessageEnvelope │
└──────────────────────────────────────────────────────────┘
│ _process_next() 一个个取出来

┌─────────────────────┐ ┌──────────────────────────────────┐
│ Send: 找到 recipient │ │ Publish: 问订阅表"谁订了这 topic"│
│ 调它的 on_message │ │ 给每个订阅者并发调 on_message │
│ 把返回值塞回 future │ │ (跳过发送者自己) 无回复 │
└─────────────────────┘ └──────────────────────────────────┘

怎么读这张图: 左路是"打电话等回话",右路是"群发通知"。两者都先进同一条队列,再由处理循环分流。

4. 原理演示:运行时的极简骨架

下面这段把运行时的本质浓缩成几十行,帮你建立心智模型——真实实现见 §5

# 示意,非源码:运行时本质
class MiniRuntime:
def __init__(self):
self.queue = asyncio.Queue() # 单条消息队列
self.factories = {} # type -> 造 agent 的工厂
self.instances = {} # AgentId -> 已造出的 agent
self.subscriptions = [] # (topic, agent_type) 列表

async def send_message(self, msg, recipient):
fut = asyncio.Future()
await self.queue.put(("send", msg, recipient, fut))
return await fut # 关键:send 要等 future

async def publish_message(self, msg, topic_id, sender=None):
await self.queue.put(("publish", msg, topic_id, sender)) # publish 不等

async def _process_next(self): # 处理循环每次取一条
kind, msg, target, *rest = await self.queue.get()
if kind == "send":
agent = await self._get_agent(target) # 按需造
fut = rest[0]
fut.set_result(await agent.on_message(msg)) # 回填 future
else: # publish
for agent_id in self._recipients_of(target): # 查订阅表
agent = await self._get_agent(agent_id)
await agent.on_message(msg) # 无回复

重点看两件事:send 创建一个 Futureawait 它(所以是 RPC 语义);publish 只是丢进队列就返回(fire-and-forget)。

5. 真实实现

运行时契约(协议):AgentRuntime 是一个 Protocol,定义了 send_message / publish_message / register_factory / 订阅管理等接口——_agent_runtime.py:21AgentRuntime。注意 send_message 的文档明确写它"send a message to an agent and get a response",而 publish_message 写"No responses are expected"(_agent_runtime.py:50-73)。

默认实现:SingleThreadedAgentRuntime——_single_threaded_agent_runtime.py:149。它的类文档直说:"processes all messages using a single asyncio queue. Messages are delivered in the order they are received",且"suitable for development and standalone applications. It is not suitable for high-throughput"。

消息被包成"信封":三种 dataclass —— SendMessageEnvelope(带 future,要回复)、PublishMessageEnvelope(带 topic_id)、ResponseMessageEnvelope(回复用)——_single_threaded_agent_runtime.py:57-92send_message 里就能看到那个关键 future:

# _single_threaded_agent_runtime.py:363-385 (send_message 节选)
future = asyncio.get_event_loop().create_future()
if recipient.type not in self._known_agent_names:
future.set_exception(Exception("Recipient not found"))
return await future
...
await self._message_queue.put(SendMessageEnvelope(message=message, recipient=recipient, future=future, ...))
cancellation_token.link_future(future)
return await future # ← 调用方在这里阻塞直到 agent 回话

这几行就是"RPC 语义"的来源:把信封丢进队列,然后 await future,直到 _process_send 把 agent 的返回值塞回这个 future(_single_threaded_agent_runtime.py:546-555 把响应作为 ResponseMessageEnvelope 再入队)。

处理循环:RunContext._run 是个 while True 不停调 _process_next(_single_threaded_agent_runtime.py:105-110)。_process_nextmatch 按信封类型分流,并为每条消息单独起一个 asyncio task 并发处理(_single_threaded_agent_runtime.py:689-794)。

广播路由:_process_publish 先问订阅管理器"谁订了这个 topic",再并发地给每个收件人调 on_message,而且会跳过发送者自己(避免回声):

# _single_threaded_agent_runtime.py:561-565 (_process_publish 节选)
recipients = await self._subscription_manager.get_subscribed_recipients(message_envelope.topic_id)
for agent_id in recipients:
# Avoid sending the message back to the sender
if message_envelope.sender is not None and agent_id == message_envelope.sender:
continue

按需实例化:_get_agent 没命中缓存就调工厂造一个再缓存(_single_threaded_agent_runtime.py:976-986),工厂在 register_factory 注册(:886)。

6. 订阅:广播怎么找到收件人

publish 只给一个 TopicId,运行时靠订阅把它翻译成具体 agent。最常用的是 TypeSubscription:把一个 topic_type 绑到一个 agent_type——_type_subscription.py:10TypeSubscription,构造参数就是 (topic_type, agent_type)(:33)。它的语义是"发到 topic type X 的消息,路由给 agent type X 中 key 与 topic source 相同的那个实例"。第 3 章会看到团队就是用一串 TypeSubscription 把"群频道"和"每个人的私频道"接起来的。

7. agent 怎么声明"我处理哪种消息"

你不会手写 on_message 里的大 if-else。Core 提供 RoutedAgent + 装饰器:

  • @message_handler:通吃 event 和 RPC 的处理器,按消息类型路由(_routed_agent.py:85message_handler)。它要求方法是 async、签名恰好是 (self, message, ctx)、且 message 有类型标注——运行时据此建路由表(_routed_agent.py:97-116 的文档)。
  • @event / @rpc:event 标注"这是处理广播事件的处理器"(无返回),rpc 标注"这是处理点对点请求的处理器"(有返回)。第 3 章的主持人就大量用这两个装饰器区分"收到广播该干嘛"和"收到 RPC 该干嘛"。

@message_handler 还支持 match= 二级路由:同一消息类型有多个处理器时,按方法名字母序应用 match 函数,第一个匹配的被调用(_routed_agent.py:115)。

8. 关键细节 / 坑

  • 顺序 vs 并发: 消息按入队顺序从队列取出,但每条消息的处理是各起一个 task 并发跑的(_process_next 里的 asyncio.create_task)。所以"取出有序、执行可能交错"。第 3 章会看到团队用 SequentialRoutedAgent 来强制某些 agent 串行处理,正是为了对抗这种交错。
  • 拦截器(intervention): 运行时支持 InterventionHandler,能在消息 send/publish/response 拦截、修改、甚至 DropMessage 丢弃它(_process_next 里逐个调 handler,:691-788)。这是做审计/防护/人审的钩子。
  • 后台异常: ignore_unhandled_exceptions 默认 True,事件处理器里的异常会被攒着、在下次 process_nextstop 时抛出(:164)。但团队用嵌入式运行时时故意设成 False(见第 3 章),否则异常被吞、团队会无声早退。

9. 小结

Core = 一条 asyncio 队列 + 两种投递语义(send 要回复 / publish 广播)+ 订阅路由 + 按需实例化。它完全不懂 LLM——它只懂 agent 和消息。LLM 的智能在上一层。下一章看那一层最重要的 agent。