跳到主要内容

第 04 章 · 动态控制流:Send 与 Command

本章讲什么: 静态边只能画固定的流向。但 agent 经常需要运行时才知道要跑什么——比如对这 N 个子任务各跑一遍(数量运行时才定),或根据结果跳到某节点并顺手改状态。LangGraph 用两个原语解决:SendCommand

4.1 Send —— 运行时 fan-out(map-reduce)

它要解决的小问题: 你有一个 subjects 列表,长度运行时才知道,想对每个 subject 并行跑同一个节点,各自带不同输入。静态边做不到(边在编译期就定死了)。

用起来什么样(types.py:683-708 官方示例,删减):

from langgraph.types import Send

def continue_to_jokes(state):
# 运行时为每个 subject 造一个 Send:跑 generate_joke,带不同 arg
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

builder.add_conditional_edges(START, continue_to_jokes)
# 两个 subject → generate_joke 被并行跑两次,结果聚合回主状态

Send 本身就是个轻量数据类:目标节点名 + 要发给它的参数(types.py:711-736):

class Send:
__slots__ = ("node", "arg", "timeout")
def __init__(self, node, arg, *, timeout=None):
self.node = node # 发给哪个节点
self.arg = arg # 发什么(可与主状态不同!)
self.timeout = TimeoutPolicy.coerce(timeout)

底层怎么跑? Send 不走 channel 触发那条路,而是排成 PUSH 任务。引擎有个内部 Topic 叫 TASKS 专门排队 Send;prepare_next_tasks 开头先消费它(_algo.py:442-466):

tasks_channel = channels.get(TASKS) # 内部 Topic,装本步要 PUSH 的 Send
if tasks_channel and tasks_channel.is_available():
for idx, _ in enumerate(tasks_channel.get()):
if task := prepare_single_task((PUSH, idx), ...): # PUSH = Send 任务
tasks.append(task)
# 之后才处理 PULL(被 channel 触发的常规节点)

所以一步的任务分两类:PUSH(Send 推来的)+ PULL(channel 触发的)。两者在同一超步并行执行、统一在栅栏处合并——map-reduce 的 map 是并行 PUSH,reduce 通常用一个带 reducer 的 channel(或 defer 节点)聚合。

精妙处: Send.arg 可以和主状态完全不同——map 出去的子任务带的是切片输入,不是整个状态。这让 fan-out 干净利落,子任务互不干扰。

4.2 Command —— 节点的多合一返回值

它要解决的小问题: 节点经常想既更新状态、又决定下一步去哪。传统做法要拆成节点写状态 + 条件边决定流向两处。Command 让节点在一个返回值里同时表达。

Command 的字段(types.py:758-784):

字段作用
update要写进状态的增量(和普通节点返回的 dict 同义)
goto下一步去哪:节点名 / 节点名序列 / Send / Send 序列
resumeinterrupt() 喂恢复值(第 03 章)
graph发给当前图(None)还是父图(Command.PARENT)

用起来什么样(示意,非源码,基于字段语义):

from langgraph.types import Command

def router(state) -> Command:
if state["score"] > 0.8:
return Command(update={"status": "ok"}, goto="finish") # 改状态 + 跳转
return Command(update={"retries": state["retries"] + 1}, goto="retry")

底层怎么处理? attach_node 里的 _get_updates 专门拆 Command(state.py:1450-1471,删减):

elif isinstance(input, Command):
if input.graph == Command.PARENT: # 发给父图的,本图不处理
return None
return [(k, v) for k, v in input._update_as_tuples() if k in output_keys]

goto_control_branch 机制(写 branch:to:目标 或排 Send)转成下一步的触发。Command.PARENT 是子图向父图上报的通道——子图节点可以用它跳到父图的节点,这是嵌套图(subgraph)协作的关键。

update 的灵活解析(types.py:793-806):它接受 dict、(key, value) 元组列表、带注解的 pydantic/dataclass,甚至单值(包成 __root__)。这让 Command 能适配各种状态 schema。

4.3 两者怎么配合

节点返回什么?
┌───────────────┬──────────────────┬─────────────────────┐
│ dict 增量 │ [Send(...), ...] │ Command(update,goto) │
▼ ▼ ▼
写状态 channel 排进 TASKS Topic 既写状态、又触发 goto
(PULL 触发后续) (下一步 PUSH 并行) (可跨图:Command.PARENT)
  • 要并行铺开 N 个同类子任务Send(map-reduce 的 map)。
  • 要在一个节点里改状态 + 有条件跳转Command(尤其适合无显式边的 agent 图)。
  • 要从子图跳回父图Command(graph=Command.PARENT, goto=...)
  • 要恢复一个 interruptCommand(resume=...)(第 03 章)。

4.4 关键细节 / 坑

  • Command(goto=...) 不需要预先 add_edge:这正是 edgeless graph(无边图)的玩法——所有流向都由节点返回的 Command 动态决定。add_node(..., destinations=...) 只用于画图渲染,不影响执行(state.py:410-412 的 warning)。
  • Send 的并行度受 channel 合并约束:多个 Send 落到同一无 reducer 的键会撞 LastValue 的一步一值限制(第 02 章),所以 map-reduce 的聚合键几乎总要带 reducer。
  • Command 是 ToolOutputMixin(types.py:759):意味着工具节点可以直接返回 Command,让工具调用结果顺带驱动控制流。