跳到主要内容

AutoGPT — 执行引擎(ready 驱动的调度循环)

本章讲什么: 这是 AutoGPT 的心脏。一张图怎么从「躺在数据库里的结构」变成「真的跑起来」。分两层:跨进程的派发(RabbitMQ + 线程池),和单张图内的调度循环(ready 队列)。

1. 两层调度,一眼看清

进程级派发 │ 单图级调度
──────────────────────────────────────── │ ────────────────────────────────────────
add_graph_execution() ──▷ RabbitMQ │ _on_graph_execution(graph_exec):
(executor/utils.py:1171) │ queue ← 起始节点
│ │ while queue 非空:
ExecutionManager._consume_execution_run │ node = queue.get()
(manager.py:1460) │ charge_usage(node) # 预扣
│ 提交 │ 异步跑 on_node_execution(node)
ThreadPoolExecutor(num_graph_workers) │ 当节点产出 output:
│ 每图一线程 │ _process_node_output()
ExecutionProcessor.on_graph_execution ─────────┼──────────▶ _enqueue_next_nodes() # 路由给下游
(manager.py:812) │ 下游输入齐 → queue.add()
│ 队列空 → 整图结束、对账、出摘要

左边解决「多张图怎么分配到多个 worker」;右边解决「一张图内部的节点按什么顺序跑」。下面分别讲。

2. 进程级:从队列到线程池

2.1 入口收口:三条触发路汇成一个函数

手动运行、cron、webhook——三条触发路最终都调 add_graph_execution(executor/utils.py:1171)。它做三件事:校验图+输入+凭据(validate_and_construct_node_execution_input)、建 GraphExecution 数据库记录、把请求发布到 RabbitMQ 队列 graph_execution_queue_v2(executor/utils.py:969)。

设计点: 付费墙、并发上限这类闸门都放在这一个函数里(executor/utils.py:1225 is_user_paywalled),而不是分散在每个 HTTP 路由——这样 cron、webhook、copilot 工具调用等所有入口都自动受同一套约束。

2.2 消费与派发

ExecutionManager(manager.py:1353)是个长驻进程。它起两个消费线程:一个收「运行」消息(_consume_execution_run,manager.py:1460),一个收「取消」消息(manager.py:1432)。收到运行消息后,_handle_run_message(manager.py:1529)把这张图提交给线程池 executor(manager.py:1395,大小 = num_graph_workers)。每张图占一个 worker 线程,线程里跑一个 thread-local 的 ExecutionProcessor(manager.py:135)。

3. 单图级:ready 队列调度循环(重点)

核心全在 _on_graph_execution(manager.py:943)。它维护一个节点队列 ExecutionQueue[NodeExecutionEntry],然后是一个 while 循环。我把它拆成几个动作讲。

3.1 预填队列

不是只塞起始节点。它从数据库捞出所有状态为 RUNNING/QUEUED/TERMINATED/REVIEW 的节点执行,全部入队(manager.py:1006)。这一步让断点续跑成为可能:进程崩了重启,这张图能从「上次跑到哪」接着跑(on_graph_execution 里对各种历史状态都有「continuing where it left off」分支,manager.py:845 起)。

3.2 主循环:取一个、跑一个、收输出

# manager.py:1024 起(精简)
while not execution_queue.empty():
if cancel.is_set(): break
queued_node_exec = execution_queue.get()
# 1) 预扣费(dry-run 跳过),余额不足就优雅停
billing.charge_usage(node_exec=queued_node_exec, ...)
# 2) 异步把这个节点丢到 node_execution_loop 上跑
node_execution_task = asyncio.run_coroutine_threadsafe(
self.on_node_execution(node_exec=queued_node_exec, ...),
self.node_execution_loop)
# 3) 内层 poll:等输出冒出来,边出边路由给下游
while execution_queue.empty() and (running_node_execution or running_node_evaluation):
if output := inflight_exec.pop_output():
asyncio.run_coroutine_threadsafe(
self._process_node_output(output=output, ...), # → 入队下游
self.node_evaluation_loop)

几个关键设计:

  • 三条 event loop / 线程分工。 on_graph_executor_start(manager.py:794)起了两个独立的事件循环:node_execution_loop(真跑块)和 node_evaluation_loop(处理输出、路由下游)。主调度循环在 worker 线程里同步轮询,把活儿 run_coroutine_threadsafe 扔到这两个 loop 上。这样块的 IO 等待不会卡住调度。

  • 节点产出是流式的、边产边路由。 on_node_execution 把块 yield 的每个输出存进 NodeExecutionProgress(executor/utils.py:1410);主循环 pop_output() 一拿到就立刻路由,不等节点完全跑完。所以一个流式 LLM 块吐第一段就能触发下游。

  • 没活儿干时让出 CPU + 续租集群锁。 队列空、还有 inflight 时,time.sleep(0.1)cluster_lock.refresh()(manager.py:1175)——后者防止这张图的「我正在跑」分布式锁过期被别的 pod 抢走。

3.3 输出怎么变成下游的入队:_enqueue_next_nodes

这是 dataflow 真正「流」起来的地方(manager.py:389)。对当前节点的每条出边:

  1. parse_execution_output(见 02 章)按这条线的 selector 从输出里挖出要传的值;挖不到且 pin 名不匹配就跳过这条线(manager.py:439)。
  2. 原子地把这份数据 upsert 到下游节点「最早一个未完成的执行」里,或新建一个(manager.py:448 upsert_execution_input,外面套了 synchronized(...) 锁,防同一下游被并发重复入队)。
  3. 静态线补全:如果下游有静态输入 pin 还缺,用最近一次执行的输入补上(manager.py:457)。
  4. validate_exec 看下游输入是否凑齐(manager.py:479)。
    • 没凑齐 → log Skipped queueing,不入队(等其它入边把剩下的喂齐)。
    • 凑齐 → 入队,状态置 QUEUED(manager.py:490)。
  5. 静态线还要回头补已等待的执行:静态值刚到位,可能有一批之前因缺这个值而卡住(INCOMPLETE)的下游执行,把它们补全并重新入队(manager.py:506 起的循环)。

这一步就是「ready-driven」的本质:一个节点只有在所有必需输入都到齐时才入队执行。多入边的汇合点(join)靠的就是「每来一条边补一格,凑齐才放行」。

4. 执行一个节点的细节:execute_node

execute_node(manager.py:142)是真正调块前的「最后一公里」,做了几件容易忽略但很重要的事:

  • 凭据加锁取值。 对每个凭据字段,临执行前才去凭据库取真值,并取一个 Redis 锁(manager.py:296 creds_manager.acquire)。注释直说:一份凭据同一时刻只能被一个块用(manager.py:259),并发用同一 key 会串行化。finally 里保证释放锁(manager.py:366)。
  • 特殊块改写输入。 AgentExecutorBlock(子 agent)和 MCPToolBlock 需要把节点输入重新塑形(manager.py:197manager.py:204)。
  • dry-run 走仿真。 若是 dry-run 且该块没有专门的 dry-run 准备,就用 simulate_block 让 LLM 假装产出输出,不真调 API(manager.py:343,详见 04 章)。
  • 超时墙。 叶子计算块默认 30 分钟墙钟上限(blocks/_base.py:547 DEFAULT_BLOCK_EXECUTION_TIMEOUT_SECONDS);协调类块(AgentExecutor、AutoPilot)把 execution_timeout_seconds = None 显式豁免(blocks/agent.py:28),因为它们的子图各有自己的时限,外层卡表会误杀(manager.py:744)。

5. 取消、超时、失败:状态怎么收尾

循环结束后定最终状态(manager.py:1198):被取消 → TERMINATED;有错 → FAILED;有待人审 → REVIEW;否则 COMPLETED

  • 节点级失败分级(_on_node_execution,manager.py:764):ValueError 当「预期内的用户错」记 FAILED 但不报 Sentry;其它异常算「意外」会上报;CancelledErrorTERMINATED
  • 整图级未知错误会发限流的 Discord 告警(manager.py:1235)。
  • finally_cleanup_graph_execution(manager.py:1262)收尾。

6. 巧妙之处汇总

  • 同一个 add_graph_execution 收口所有入口的闸门(付费墙/并发),避免每个路由各管各的(executor/utils.py:1171)。
  • 预填队列 + 历史状态分支 = 天然的崩溃续跑(manager.py:839manager.py:1006)。
  • 边产边路由的流式驱动:LLM 块吐一段就能触发下游,而不是等整块跑完(NodeExecutionProgress.pop_output)。
  • join 用「每边补一格,凑齐放行」,不需要显式的依赖计数器(validate_execget_missing_input)。
  • 凭据 Redis 锁把「同一 key 并发」这个隐患在执行层堵死(manager.py:296)。

7. 边界与局限

  • 每图一个 worker 线程,单图内的并发靠 asyncio 而非多线程——CPU 密集的块会卡住该图的事件循环。
  • 「ready 队列」对环图友好,但是否终止不由引擎保证,取决于块逻辑。
  • 凭据串行化锁意味着「一个 agent 大量并发调同一个 API key 的块」会被强制排队。

8. 横向对比

  • Celery/Airflow 这类任务编排比:AutoGPT 没有预先算好的 DAG 拓扑序,而是运行时 ready 驱动——更像数据流虚拟机,天然支持动态分支和循环。
  • Temporal 的持久化工作流比:AutoGPT 靠「数据库里的节点执行状态 + 预填队列」实现续跑,粒度是节点,没有 Temporal 那种事件溯源级别的确定性重放。

9. 代码地图

主题文件符号
入口收口(校验+建记录+入队)autogpt_platform/backend/backend/executor/utils.pyadd_graph_execution
长驻消费/派发进程executor/manager.pyExecutionManager_consume_execution_run_handle_run_message
单图调度循环(心脏)executor/manager.pyExecutionProcessor._on_graph_execution
输出路由给下游executor/manager.py_enqueue_next_nodes_process_node_output
单节点执行(取凭据/超时/仿真)executor/manager.pyexecute_node
节点执行驱动 + 失败分级executor/manager.pyExecutionProcessor._on_node_execution
节点输出缓冲(流式)executor/utils.pyNodeExecutionProgress
输入凑齐判定executor/utils.pyvalidate_exec
超时默认值blocks/_base.pyDEFAULT_BLOCK_EXECUTION_TIMEOUT_SECONDS