跳到主要内容

AutoGPT — 架构与原理

30 秒导读: AutoGPT 现在不是「一个会自己想事情的 GPT 循环」,而是一个 agent 搭建与运行平台:你像拼乐高一样把「积木块」(Block)连成一张「数据流图」(Graph),平台后台的执行引擎把这张图当成一个 dataflow 程序跑起来——哪个节点的输入凑齐了就跑哪个,输出顺着连线流到下一个节点。本库讲的是这套后端引擎的原理。

1. 这是什么(零基础也能懂)

  • 一句话定义: AutoGPT 是一个可视化的 AI agent 搭建 + 托管运行平台——前端拖积木连线画出 agent,后端把这张图当成数据流程序持续地、可计费地执行。

  • 解决谁的什么问题: 假设你想要一个 agent:每天抓一批新闻 → 用 LLM 总结 → 发到你的 Slack。传统做法你得写代码、自己跑、自己接 API key、自己排错。AutoGPT 让你不写代码,在画布上拖三个积木(抓取块、LLM 块、Slack 块)连起来,平台帮你托管运行、管理凭据、按用量计费、出错重试。

  • 它能做什么:

    • 把 agent 表达成「块 + 连线」的,存进数据库;
    • 一个执行引擎把图跑起来,自动按依赖顺序调度节点、并发执行无依赖的分支;
    • 内置 ~180 个积木:LLM 调用、网页抓取、GitHub/Slack/Notion/Twitter 等几十个第三方集成、逻辑控制(if/循环)、子 agent 调用;
    • 定时(cron)、Webhook 触发、按次调用;
    • 按用量扣「积分」(credit),LLM token、运行时长都能计价。
  • 用起来什么样: 后端是一组长驻服务进程,一条命令拉起(backend/app.py:34 main):

# autogpt_platform/backend/backend/app.py:49 —— 核心后台进程(节选;实际还含 BatchExecutor / PlatformLinkingManager / CoPilotChatBridge / CoPilotExecutor 等)
run_processes(
DatabaseManager(), # 数据库访问的 RPC 服务
Scheduler(), # cron / 定时触发
NotificationManager(), # 通知
WebsocketServer(), # 给前端推执行进度
AgentServer(), # FastAPI REST API
ExecutionManager(), # ★ 执行引擎:从队列取图、跑图
...
)

用户点「运行」→ REST API 把一条「图执行请求」丢进 RabbitMQ → ExecutionManager 取出来 → 在线程池里把图跑完 → 进度经 WebSocket 实时推回前端。

  • 一句话直觉/类比: 把它想成**「给 AI agent 的 Node-RED / Zapier」**:积木是函数,连线是数据流,引擎是一个 ready-driven 的数据流虚拟机——不是从上到下顺序执行,而是「谁的输入齐了就执行谁」。

2. 顶层全景(它大概怎么转)

2.1 一次「运行 agent」的生命周期

下面这张图从左到右,是一条真实路径:用户点运行 → 图被跑完。读法:实线是控制流(谁调用谁), 标的是跨进程的队列。

用户/前端 API 进程 消息队列 执行引擎进程(ExecutionManager)
───────── ────────── ──────── ──────────────────────────────
点击「运行」 ──HTTP──▶ execute_graph 路由
add_graph_execution()
1. 校验图+输入+凭据
2. 建 GraphExecution 记录
3. 发布到队列 ─────▷ graph_execution_queue

▼ basic_consume
_handle_run_message()
提交到线程池 executor


on_graph_execution()
└─ _on_graph_execution() ← 单图调度循环
· 起始节点入队
· while 队列非空: 取节点→计费→异步跑
· 节点产出 → 路由到下游节点 → 入队
进度条 ◀──WebSocket──── send_execution_update() ◀──────┘ (每个节点状态变化都推一次)

2.2 核心部件一句话职责

部件干什么在哪个文件(符号)
Block一块积木 = 一个带输入/输出 schema 的异步函数blocks/_base.py:550 Block
Graph / Node / Linkagent 的静态结构:节点(块实例)+ 连线(输出 pin→输入 pin)data/graph.py:385 GraphModeldata/graph.py:104 Nodedata/graph.py:82 Link
ExecutionManager长驻进程,从 RabbitMQ 消费图执行请求,丢进线程池executor/manager.py:1353 ExecutionManager
ExecutionProcessor真正跑一张图的逻辑:ready 队列调度循环executor/manager.py:551 ExecutionProcessor
动态 pin 编解码把「输出对象的某个字段」精确路由到「下游某个输入字段」data/dynamic_fields.py:152 parse_execution_output
计费预扣 + 事后按真实 token/时长对账executor/manager.pyexecutor/billing.py
block 注册表启动时扫描 blocks/ 目录,自动发现所有 Block 子类blocks/__init__.py:17 load_all_blocks

2.3 主线走一遍(高层,不进代码)

  1. 画图阶段(写时): 用户在前端连积木,后端把它存成 AgentGraph + 一堆 AgentNode + AgentNodeLink
  2. 触发(读时入口): 手动点击 / cron / webhook,三条路最终都汇到同一个函数 add_graph_execution(executor/utils.py:1171)——校验、建执行记录、丢队列。
  3. 派发: ExecutionManager 从 RabbitMQ 取到请求,提交给线程池;每个图占一个 worker 线程。
  4. 调度: _on_graph_execution 维护一个节点 ready 队列:把起始节点入队,然后不断「取节点 → 异步执行 → 收集输出 → 把输出路由给下游、下游输入齐了就入队」,直到队列空。
  5. 执行一个节点: 校验输入 → 取凭据(加锁,防同一凭据并发) → 调 block.execute() → 块以异步生成器逐个 yield (输出名, 数据)
  6. 结算: 每个节点跑完按真实用量对账计费;整图跑完更新状态、生成「活动摘要」、发通知。

3. 阅读地图(按这个顺序读)

这是个大项目,拆成四章,由浅入深:

  1. 01-block-contract.md — 积木的契约。 先搞懂最小单元:一个 Block 长什么样、输入/输出怎么用 Pydantic schema 定义、execute() 这条校验管线做了哪些事(schema 校验、人审拦截、错误包装)。读完你能自己写一个块。

  2. 02-graph-and-dataflow.md — 图与数据流。 块怎么连成图;最巧的部分:扁平化动态 pin——怎么把「上游输出对象里的 result.items[0].name」这种深层路径,编码成一个字符串 pin 名,精确路由到下游某个输入字段。

  3. 03-execution-engine.md — 执行引擎(心脏)。 整个 ready-driven 调度循环:跨进程队列、单图的节点队列、并发节点、输出如何驱动下游入队、静态 pin 的特殊处理、取消与超时。

  4. 04-billing-safety-nesting.md — 深水区三件套。 计费的「预扣 + 事后对账」模型;dry-run 用 LLM 仿真整张图而不真花钱;敏感动作的人在回路审核;以及 agent 调 agent 的嵌套执行怎么把子图开销卷回父图。

只想要「它怎么把一张图跑起来」的最短路径:读 §2.1 → 直接跳 03-execution-engine.md