跳到主要内容

第 2 章:Workflow 图运行时(2.0 的招牌)

本章讲 ADK 2.0 最核心的新东西:一切皆节点 + 图调度循环。读完你能回答:edges=[("START", a, b)] 到底怎么变成「a 跑完触发 b」的执行?路由、扇出扇入、循环又是怎么从同一套机制里长出来的?

2.1 一切皆 BaseNode

BaseNode(workflow/_base_node.py:39)是所有可执行单元的基类——agent、工具、普通函数、子工作流都是它的子类或被包装成它。它定义了两个关键方法:

  • run()(_base_node.py:196,@final) —— 对外的统一入口,不可重写。它做输入校验、调子类的 _run_impl、再把每个 yield 归一化Event
  • _run_impl()(_base_node.py:232) —— 子类重写的「真正干活」处。可以 yield 任意东西。

归一化规则是理解整个系统的钥匙(_base_node.py:217-230):

_run_impl yield 的东西run() 把它变成
None跳过
Event原样传出(若带 output 则按 output_schema 校验)
RequestInput转成「请求人类输入」的中断 Event
其它任意值 xEvent(output=x)

这就是为什么样例里的函数节点可以直接 return node_input.upper()(裸值→自动包成 output 事件),也可以 yield Event(route=...)(显式控制路由)——两种写法走的是同一个归一化漏斗。

BaseNode 还自带一批通用旋钮:retry_config(重试)、timeout(超时)、input_schema/output_schema/state_schema(校验)、wait_for_output(见 JoinNode)、rerun_on_resume(恢复时是否重跑)。这些对 agent、函数、子图一视同仁,因为它们都是 BaseNode

START 是个哨兵节点(_base_node.py:255),name='__START__',永不执行,只用来标记图的入口。

2.2 edges DSL 怎么编译成 Graph

你写 edges=[...],Workflow.model_post_init(_workflow.py:171)在构造时调 Graph.from_edge_items(_graph.py:312)把它编译成校验过的有向图。

DSL 的几种写法(都映射到 Edge,_graph.py:62):

你写的含义
("START", a, b)链:START→a→b(相邻元素自动连边)
(a, (b, c, d))扇出:a 完成后同时触发 b、c、d
((b, c, d), join)扇入:b、c、d 都触发 join
(router, {"x": a, "y": b})路由:router 产出 route="x" 走 a,"y" 走 b
Edge(from_node=a, to_node=b, route="x")显式边对象

裸函数 / 裸 agent 出现在 edges 里会被 build_node 自动包成节点(_graph.py:94 导入)。Graph 构造后会跑一串校验(validate_graph,_graph.py:562):连通性、START 存在、重复节点名、DEFAULT_ROUTE 约束、无条件环检测等。

路由匹配。 get_next_pending_nodes(_graph.py:342)按节点产出的 route 值挑边:精确匹配 route,或落到 DEFAULT_ROUTE(_graph.py:97,值是 "__DEFAULT__")兜底。route 值类型限定为 bool | int | str(RouteValue,_graph.py:41)。

2.3 核心机制:Workflow._run_impl 调度循环

这是全章的心脏。WorkflowBaseNode,它的 _run_impl(_workflow.py:241)本身就是图调度循环——分三段:SETUP → LOOP → FINALIZE。

直觉:触发器缓冲(trigger buffer)。 调度不是「递归调用下游」,而是一个邮箱模型:每个节点有个收件箱(trigger_buffer,_workflow.py:108)。某节点完成 → 往它下游节点的收件箱投递触发器;循环不断「把收件箱里有信的、且空闲的节点」启动起来。这样并发、扇出、汇合、循环都用同一个机制表达。

┌──────────────── LOOP(while True)──────────────┐
│ │
SETUP │ _schedule_ready_nodes: │
┌─────┐ │ 扫 trigger_buffer,把「有触发器且空闲」的节点 │
│seed │──┼──▶ 用 NodeRunner 起成 asyncio.Task │
│START│ │ │ │
│后继 │ │ ▼ │
└─────┘ │ asyncio.wait(FIRST_COMPLETED) 等任一完成 │
│ │ │
│ ▼ │
│ _handle_completion: │
│ 节点完成 → 按图的边给下游投递触发器 │
│ (扇出/路由/扇入在这里决定) │
│ │ │
│ 没有 pending_tasks 了 ─┐ │
└────────────────────────────┼───────────────────┘

FINALIZE:收集终端节点输出

SETUP(_workflow.py:256-275):建一个 _LoopState(每次执行独立、不持久化),扫历史事件用于恢复(_scan_child_events),给 START 的后继投递初始触发器(_seed_start_triggers,_workflow.py:391)。

LOOP(_run_loop,_workflow.py:301):

# 真实源码节选 workflow/_workflow.py:314-323
while True:
self._schedule_ready_nodes(loop_state, ctx)
if not loop_state.pending_tasks:
break
done, _ = await asyncio.wait(
loop_state.pending_tasks.values(),
return_when=asyncio.FIRST_COMPLETED,
)

重点看:_schedule_ready_nodes 把就绪节点变成并发 asyncio.Task,然后 asyncio.wait(FIRST_COMPLETED) 等任意一个先完成——这就是并发的来源

节点完成后投递下游(_buffer_downstream_triggers,_workflow.py:673):用 get_next_pending_nodes 找出该节点 route 对应的下游,给每个下游的收件箱投触发器。若下游是「需要所有前驱」的节点(_requires_all_predecessors),则等所有前驱都 COMPLETED 才投递,并把各前驱输出打包成 dict(_workflow.py:692-715)——这就是 fan-in。

2.4 从同一循环里长出来的特性

下面这些「特性」其实都不是单独代码,而是上面调度循环的特例。

2.4.1 顺序与扇出扇入

顺序就是链 ("START", a, b, c)。扇出扇入看样例 workflows/fan_out_fan_in/agent.py:

# 真实源码节选 contributing/samples/workflows/fan_out_fan_in/agent.py
root_agent = Workflow(
name="root_agent",
edges=[(
"START",
(make_uppercase, count_characters, reverse_string), # 扇出:三个并行
join_node, # 扇入:汇合
aggregate,
)],
)

join_nodeJoinNode(workflow/_join_node.py:41),它重写 _requires_all_predecessors 返回 True(_join_node.py:47),所以调度循环会等三个并行节点全完成、把它们输出打包成 dict(键是节点名)再触发它。aggregate 里就能 node_input['make_uppercase'] 取各分支结果。

2.4.2 条件路由

看样例 workflows/route/agent.py:一个函数节点 yield Event(route=category.category),然后用 routing map 决定走向:

# 真实源码节选 contributing/samples/workflows/route/agent.py
edges=[
("START", process_input, classify_input, route_on_category),
(route_on_category, {
"question": answer_question,
"statement": comment_on_statement,
"other": handle_other,
}),
]

节点设 ctx.route(或 yield Event(route=...)),routeContext 的属性(agents/context.py:367),调度循环据此用 get_next_pending_nodes 选边。

2.4.3 循环

循环 = 一条指回上游的路由边。看 workflows/loop/agent.py:

# 真实源码节选 contributing/samples/workflows/loop/agent.py
edges=[
("START", process_input, generate_headline, evaluate_headline, route_headline),
(route_headline, {"unrelated": generate_headline}), # 不合格 → 回到 generate_headline
]

评估不通过就路由回 generate_headline,直到合格(route 不再命中这条边)才往下走。没有「Loop 节点类型」——循环只是图里的一条回边。(注:老的 LoopAgent/SequentialAgent/ParallelAgent 仍在,但已 @deprecated,见 agents/sequential_agent.py:48,官方建议改用 Workflow。)

2.4.4 并发上限

Workflow.max_concurrency(_workflow.py:156)限制图调度的并行节点数。注意它只管图边触发的节点;通过 ctx.run_node() 动态调起的节点不受限(_at_concurrency_limit,_workflow.py:442),否则会死锁(父节点 inline 等子节点)。

2.4.5 动态节点(ctx.run_node)

除了静态图边,节点在运行中可以动态调起另一个节点:ctx.run_node(some_node, node_input=...)(agents/context.py:422),await 拿到它的输出。文档明确警告:必须直接 await,不要包 asyncio.create_task,否则错误被吞、父节点中断时子任务不会被取消(context.py:441-445)。动态节点由 DynamicNodeScheduler 管理(类定义在 workflow/_dynamic_node_scheduler.py:109;Workflow 侧的接线点是 _make_schedule_dynamic_node,_workflow.py:623,它构造出这个调度器),这是 Task API 委派的底层(第 4 章)。

2.4.6 重试与超时

retry_configtimeoutBaseNode 字段,由 NodeRunner 统一执行:超时用 _run_node_loop_with_timeout(_node_runner.py:275),超时抛 NodeTimeoutError(workflow/_errors.py),可被 retry_config 重试(_attempt_retry,_node_runner.py:172)。样例见 workflows/retry/agent.py

2.5 NodeRunner:跑「一个节点的一次执行」

调度循环只决定「该跑谁」,真正跑一个节点的是 NodeRunner(workflow/_node_runner.py:49)。它的职责:

  • 建子 Context(_create_child_context,_node_runner.py:190):派生 node_path、隔离作用域(isolation_scope)。
  • 执行节点、处理超时/重试(_execute_node,_node_runner.py:242)。
  • 把节点产出的 Event 富化并入队:盖 author、盖 isolation_scope(_enrich_event,_node_runner.py:403),把 ctx.state 增量刷进事件(_flush_deltas,_node_runner.py:381)。
  • 处理 use_as_output(_node_runner.py:101):某节点输出同时代表父节点输出时,抑制重复的输出事件。

节点状态机(NodeStatus,workflow/_node_status.py:22):PENDING → RUNNING → COMPLETED,或中途 WAITING(等输入/等汇合)、FAILEDCANCELLED。调度循环用这些状态决定能否给节点投新触发器(_schedule_ready_nodes 里跳过 RUNNING 和「有未决中断的 WAITING」节点,_workflow.py:424-430)。

2.6 函数节点的参数绑定(为什么函数能直接读 state)

样例里函数节点既能 def f(node_input: str) 又能 def f(ctx) 还能 def f(appended_text: str)(自动从 state 注入)。这由 FunctionNode._bind_parameters(workflow/_function_node.py:323)实现:

  • 参数叫 ctx → 注入 Context;叫 node_input → 注入上游输入。
  • 其它参数:parameter_binding='state'(默认)从 ctx.state 取同名键;='node_input'node_input dict 取(用于把函数当 agent 工具时)。

workflows/state/agent.pyread_state_via_param(appended_text: str)——appended_text 自动从 state 注入。Workflow._validate_state_schema(_workflow.py:213)还会在构造期校验:函数节点参数必须在 state_schema 里声明,否则报错。

2.7 巧妙之处

  • _run_impl 即调度器。 Workflow 不是「调度器对象 + 节点对象」两层,而是「Workflow 本身是个节点,它的 _run_impl 就是调度循环」——于是子工作流天然可嵌套(子图也是个节点,被外层当普通节点跑)。
  • 邮箱(trigger buffer)统一了所有控制流。 顺序、并行、路由、循环、汇合,全是「给收件箱投触发器 + 起就绪节点」的不同投递模式,没有为每种控制流写专门引擎。
  • 确定性回放排序。 多任务同时完成时 asyncio.wait 返回的是无序集合,Workflow 用恢复时记录的完成顺序(ReplaySequenceBarrier + recovered_sequence,_workflow.py:355)重排,保证恢复执行的处理顺序与历史一致——这是「可恢复」的关键细节。
  • 归一化漏斗。 函数随便 return 个值或 yield Event(...),BaseNode.run() 统一收口成 Event(_base_node.py:217),开发者心智负担极低。

2.8 边界与坑

  • task 模式的 LlmAgent 不能当静态图节点(_validate_no_task_mode_graph_nodes,_workflow.py:184),因为调度器目前在重入时会覆盖 node_input,任务简报会丢;只能作为 chat 协调者的子 agent 或经 ctx.run_node 动态调起(第 4 章)。
  • wait_for_output=True 的节点若一直不产出 output/route,会永久 WAITING(死锁)——文档称之为用户配置错误(_base_node.py:71)。
  • 一个 workflow 至多一个终端输出;多个终端节点都产出会报错(_finalize,_workflow.py:808)。

2.9 接着读

图怎么跑懂了,但「谁触发这一切、事件落哪、怎么暂停恢复」在运行时外层——见 03-runner-events-sessions.md