第 04 章 · 动态控制流:Send 与 Command
本章讲什么: 静态边只能画固定的流向。但 agent 经常需要运行时才知道要 跑什么——比如对这 N 个子任务各跑一遍(数量运行时才定),或根据结果跳到某节点并顺手改状态。LangGraph 用两个原语解决:
Send和Command。
4.1 Send —— 运行时 fan-out(map-reduce)
它要解决的小问题: 你有一个 subjects 列表,长度运行时才知道,想对每个 subject 并行跑同一个节点,各自带不同输入。静态边做不到(边在编译期就定死了)。
用起来什么样(types.py:683-708 官方示例,删减):
from langgraph.types import Send
def continue_to_jokes(state):
# 运行时为每个 subject 造一个 Send:跑 generate_joke,带不同 arg
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
builder.add_conditional_edges(START, continue_to_jokes)
# 两个 subject → generate_joke 被并行跑两次,结果聚合回主状态
Send 本身就是个轻量数据类:目标节点名 + 要发给它的参数(types.py:711-736):
class Send:
__slots__ = ("node", "arg", "timeout")
def __init__(self, node, arg, *, timeout=None):
self.node = node # 发给哪个节点
self.arg = arg # 发什么(可与主状态不同!)
self.timeout = TimeoutPolicy.coerce(timeout)
底层怎么跑? Send 不走 channel 触发那条路,而是排成 PUSH 任务。引擎有个内部 Topic 叫 TASKS 专门排队 Send;prepare_next_tasks 开头先消费它(_algo.py:442-466):
tasks_channel = channels.get(TASKS) # 内部 Topic,装本步要 PUSH 的 Send
if tasks_channel and tasks_channel.is_available():
for idx, _ in enumerate(tasks_channel.get()):
if task := prepare_single_task((PUSH, idx), ...): # PUSH = Send 任务
tasks.append(task)
# 之后才处理 PULL(被 channel 触发的常规节点)
所以一步的任务分两类:PUSH(Send 推来的)+ PULL(channel 触发的)。两者在同一超步并行执行、统一在栅栏处合并——map-reduce 的 map 是并行 PUSH,reduce 通常用一个带 reducer 的 channel(或 defer 节点)聚合。
精妙处: Send.arg 可以和主状态完全不同——map 出去的子任务带的是切片输入,不是整个状态。这让 fan-out 干净利落,子任务互不干扰。