跳到主要内容

第 3 章:Runner、Event、Session 与中断恢复

本章把运行时的「外壳」讲清:谁启动执行(Runner)、流经系统的数据长什么样(Event)、对话和状态存哪(Session/State)、以及 ADK 的杀手锏——暂停后从断点恢复人类介入(HITL)

3.1 Runner:对外入口

Runner(runners.py:132)是你直接调用的对象。主方法 run_async(runners.py:937)。它做的事:

  1. 取/建 Session(_get_or_create_session)。
  2. 决定根节点。若根是 chat 模式的 LlmAgent,可能先用 _find_agent_to_run(runners.py:1622)挑出「上次活跃、该继续接话的那个 agent」,再用 build_node 包成节点(runners.py:1004)。
  3. 驱动节点(_run_node_async),把产出的 Event 逐个 yield 给你。
  4. 落库:每个该持久化的事件经 session_service.append_event(runners.py:744)写入历史并应用状态增量。
# 真实源码节选 runners.py:982-1004(简化)
if isinstance(self.agent, LlmAgent):
if self.agent.mode is None:
self.agent.mode = 'chat' # 根 LlmAgent 默认 chat
if self.agent.mode == 'chat':
session = await self._get_or_create_session(...)
agent_to_run = self._find_agent_to_run(session, self.agent)
agent_to_run = build_node(agent_to_run) # 一切皆节点
else:
raise ValueError("LlmAgent as root agent must have mode='chat' ...")

重点看:LlmAgent 必须是 chat 模式(task/single_turn 只能当被委派方);而且无论根是 agent 还是 workflow,最终都 build_node 成节点走统一的节点运行路径(_run_node_async)。

InMemoryRunner(runners.py:2151)是开箱即用的子类,内置内存版 session/artifact/memory 服务,适合本地跑和测试。

3.2 InvocationContext vs Context

两个 context 容易混:

是什么作用域文件
InvocationContext一次完整调用(一条用户消息触发的整轮)的全局把手:session、agent、run_config、plugin_manager、是否可恢复…整个 invocationagents/invocation_context.py
Context一个节点的一次执行的把手:读写 state、设 output/routerun_node()、请求授权/确认单个节点执行agents/context.py:118

Context 内部持有 InvocationContext(get_invocation_context,context.py:411)。节点代码里你拿到的是 Context

3.3 Event:流经一切的统一数据单元

Event(events/event.py:91)继承 LlmResponse,是模型回复、工具调用、状态变更、节点输出、中断请求的统一载体。关键字段:

字段含义定义处
content模型/工具的内容(继承自 LlmResponse)LlmResponse
actionsEventActions:状态增量、route、transfer、是否结束 agent 等「副作用」events/event.py:112
output节点输出值(workflow 用)events/event.py:115
node_info节点路径/名字(图里定位用)events/event.py:118
long_running_tool_ids触发暂停的长时工具调用 idevents/event.py:121
invocation_id / author / branch属于哪次调用、谁产出的、在哪个分支event.py

便利构造。 Event 有个 model_validator(_accept_convenience_kwargs,events/event.py:159)允许你写 Event(state={...})Event(route="x")Event(node_path=...),它会自动塞进对应的 actions/node_info。这就是样例里 yield Event(state={"topic": x}) 能直接更新状态的原因——state= 被转成 actions.state_delta

is_final_response(event.py:275)决定 agentic 循环停不停(见第 1 章)。

3.4 Session 与 State

Session(sessions/session.py:28)就三样核心:events(历史事件列表)、state(共享键值)、加上 id/app_name/user_id

State 的命名空间。 state 键支持前缀,代表不同存储范围:

前缀范围
(无前缀)当前 session
app:整个 app 共享
user:该用户跨 session
temp:临时,不持久化

这些前缀在 BaseNode.state_schema 校验时会被跳过(_base_node.py:122),并由 session service 落到对应范围。

append_event 是状态更新的唯一通道。 BaseSessionService.append_event(sessions/base_session_service.py:154)既把事件加进历史,又调 _update_session_state(base_session_service.py:204)把事件里的 state_delta 应用到 session.state。也就是说:状态变更必须通过事件落地——这正是「可恢复」的基础(状态是历史事件重放的结果)。

存储后端可换:内存(in_memory_session_service.py)、SQLite(sqlite_session_service.py)、通用数据库(database_session_service.py)、Vertex(vertex_ai_session_service.py),都实现 BaseSessionService 接口。

3.5 核心机制:中断恢复(resume)

它要解决的小问题。 一个长流程跑到一半需要外部输入(人类批准、OAuth、异步工具结果)。我们不想把整个 Python 进程挂在那等。理想是:把执行干净地停下,记录足够信息,等条件满足后从断点继续

思路:状态即事件历史。 ADK 不序列化「Python 调用栈」,而是把执行进度编码进 session 的事件流。恢复时,带上同一个 invocation_id 再调一次 run_async,框架重放历史事件重建出「哪些节点已完成、哪些在 WAITING」,然后只从未完成处继续。

看 Workflow 怎么恢复(_workflow.py:256-275):SETUP 阶段调 _scan_child_events(_workflow.py:734)扫这次 invocation 的历史事件,重建每个子节点的状态(_reconstruct_node_states)和完成顺序;已完成的节点在调度时被「拦截」直接返回缓存结果(check_interception,_workflow.py:558),不重跑,只重跑真正未完成的。

rerun_on_resume(_base_node.py:57)控制单个节点恢复行为:False(默认)= 用用户的恢复输入直接当该节点输出、不重跑;True = 从头重跑该节点。

3.6 核心机制:人类介入(HITL)

HITL 是 resume 的一个直接应用。两条路径:

路径 A:RequestInput(图里显式请求输入)。 节点 yield RequestInput(message=...)(events/request_input.py),BaseNode.run() 把它转成一个中断事件(_base_node.py:224-227)。看样例 workflows/request_input/agent.py:

# 真实源码节选 contributing/samples/workflows/request_input/agent.py
def request_human_review(draft: str):
yield RequestInput(
message="Please review ... 'approve', 'reject', or feedback ..."
)

def handle_human_review(node_input: str): # 恢复时 node_input = 人类的回复
if node_input == "reject":
yield Event(route="rejected")
elif node_input == "approve":
yield Event(route="approved")
else:
yield Event(state={"feedback": node_input}, route="revise") # 改→回到 draft_email

人类回复后,带 invocation_id 重新 run_async,handle_human_review 拿到回复作为 node_input,用 route 决定批准/拒绝/返工。(注意:该样例文件顶部标了 # NOT WORKING YET,说明此提交下该路径可能仍在完善中。)

路径 B:长时工具(long-running tool)。 工具标 is_long_running=True 时,它的调用被收集进 long_running_tool_ids(flows/llm_flows/functions.py:274),is_final_response 因此返回 False、不结束循环,而是触发暂停。授权(adk_request_credential,functions.py:59)和工具确认走的也是这套「把外部依赖表达成一次工具调用 + 一次中断」的模式。Context 提供 request_credential(context.py:755)、request_confirmation(context.py:783)等把手。

可恢复性开关。 要让这一切生效,App 需配 resumability_config(apps/app.py:90);Runner 据此判断 is_resumable(runners.py:1064)。

3.7 巧妙之处

  • 状态 = 事件重放结果,而非快照。 不存调用栈、不存「当前指针」,只存 append-only 的事件流;恢复时重建。这让恢复天然幂等、可跨进程、可换存储后端。
  • 中断与外部依赖统一表达。 人类批准、OAuth、异步工具——全被建模成「一次工具调用 + long_running/中断」,共用一条暂停-恢复通路,而不是三套机制。
  • append_event 是状态的单一真相源。 状态只能经事件改,杜绝了「内存里改了但没记到历史」导致恢复后状态漂移。

3.8 边界与坑

  • 暂停判定有「只看最后 2 个事件」的窗口限制:一个长时工具后若跟多条文本响应,暂停检查可能不触发(base_llm_flow.py:955-957 的 NOTE)。
  • 恢复依赖事件里 node_info.path 等元信息齐全;_derive_node_path(context.py:68)在多轮恢复时会从静态 agent 树兜底重建父路径,深度上限 _MAX_PARENT_DEPTH = 50(context.py:50)。
  • 没开 resumability_config 就没有 resume/HITL 能力。

3.9 接着读

单 agent、图、运行时都齐了,最后一块是多 agent 协作:transfer 与 Task API——见 04-task-api-and-delegation.md