第 02 章 · 状态的真实载体:channel 与 reducer
本章讲什么: 你以为状态是一个 dict,其实引擎里每个状态键 都是一个独立 channel,每个 channel 自带多个写入怎么合并的规则(reducer)。本章讲清 channel 接口、5 种核心 channel 的合并语义,以及
StateGraph的边如何被编译成 channel 拓扑(含 fan-in 屏障)。
2.1 直觉:状态 = 一堆带合并规则的格子
回到第 01 章的 apply_writes:它把写入按 channel 分组,然后 channels[chan].update(vals)。合并逻辑不在引擎里,在每个 channel 自己身上。
所以 LangGraph 的状态模型是:
- 状态 schema(那个
TypedDict)里的每个键 = 一个 channel。 - 键上的
Annotated[类型, reducer]决定它用哪种 channel / 合并函数。 - 一步内对同一键的多个写入,交给那个 channel 的
update()决定怎么合。
这解释了第 01 章那个最小例子:x: Annotated[list, reducer] 让 x 成为一个用 reducer 累加的 channel,所以 {"x": [0.75]} 是追加,不是覆盖。
2.2 channel 的统一接口
所有 channel 实现 BaseChannel(channels/base.py:19-121)。核心四个方法:
| 方法 | 干什么 | 谁来调 |
|---|---|---|
update(values) | 收一步内若干写入,合并进自身;返回是否改变 | apply_writes,每步末 |
get() | 读当前值;空则抛 EmptyChannelError | 节点读状态时 |
consume() | 被订阅的任务跑过了的回调,可借此清空自己 | apply_writes 消费触发 |
finish() | 图要结束了的回调,defer channel 借此放行 | apply_writes 收尾 |
update 的 docstring 把语义说死了(channels/base.py:90-99):每步末尾对所有 channel 调用;更新顺序任意;没更新时用空序列调用。注意顺序任意:所以 reducer 必须是与顺序无关、或顺序已被上层定好的。
2.3 五种核心 channel(逐个看合并语义)
LastValue —— 默认:一步只能收一个值
没加 reducer 的键就是它。一步收到 >1 个值直接报错(channels/last_value.py:56-67):
def update(self, values):
if len(values) == 0:
return False
if len(values) != 1: # 一步多个写入 → 报错
raise InvalidUpdateError("...Can receive only one value per step...")
self.value = values[-1]
return True
坑: 两个并行节点同一步都写同一个无 reducer 的键,会撞 InvalidUpdateError。解决办法就是给键加 reducer。
BinaryOperatorAggregate —— reducer 的归宿
Annotated[list, operator.add] 这类编译成它。它把一步内多个值用二元操作逐个折叠(channels/binop.py:109-130,删减):
def update(self, values):
if self.value is MISSING: # 首个值作为初值
self.value, values = values[0], values[1:]
for value in values:
is_overwrite, ov = _get_overwrite(value)
if is_overwrite:
self.value = ov # Overwrite():绕过 reducer 直接覆盖
else:
self.value = self.operator(self.value, value) # 否则:折叠
return True
两个精妙点:支持 Overwrite(value=...) 原语绕过 reducer 强制覆盖(types.py:937-978);并且 __eq__ 对 lambda reducer 宽容处理(binop.py:40-48,因为所有 lambda 都叫 <lambda>,身份比较不可靠)。
add_messages —— 给消息列表的专用 reducer
agent 对话历史几乎都用它(MessagesState)。它不是简单 append,而是按消息 ID 去重/更新(graph/message.py:60-90 docstring):默认 append-only,除非新消息与已有消息 ID 相同则替换。这让流式更新同一条 AI 消息、删除某条消息成为可能。
Topic —— PubSub 列表,可选跨步累积
用于多个生产者往一个列表里塞的场景(channels/topic.py:23-94)。它把嵌套 list 自动 flatten,accumulate=False 时每步读完即清空(topic.py:77-85):
def update(self, values):
if not self.accumulate:
self.values = [] # 不累积 → 每步先清空
self.values.extend(_flatten(values)) # 嵌套 list 拍平后塞入
return updated
引擎用一个内部 Topic(TASKS)来排队 Send 任务(第 04 章)。
NamedBarrierValue —— fan-in 的栅栏
这是等所有上游都到齐才放行的关键(channels/named_barrier_value.py:13-81)。它记一个应到名单 names 和已到名单 seen,只有 seen == names 才 is_available:
def update(self, values): # 每个上游报到时写自己的名字
for value in values:
if value in self.names:
self.seen.add(value)
return updated
def is_available(self):
return self.seen == self.names # 全到齐才放行
def consume(self): # 放行后清空,准备下一轮
if self.seen == self.names:
self.seen = set()
return True
这正是 add_edge(["a", "b"], "c")(等 a 和 b 都完成才跑 c)的底层实现。
2.4 关键一跳:compile() 怎么把边译成 channel
这是把图翻译成 channel 拓扑的地方,在 CompiledStateGraph.attach_node / attach_edge。三条规则:
规则一:每个节点配一个入口触发 channel branch:to:<节点名>(graph/state.py:1512-1521):
branch_channel = _CHANNEL_BRANCH_TO.format(key) # "branch:to:节点名"
self.channels[branch_channel] = (
LastValueAfterFinish(Any) if node.defer # defer 节点:等 finish 才触发
else EphemeralValue(Any, guard=False) # 普通节点:用完即弃
)
self.nodes[key] = PregelNode(triggers=[branch_channel], ...) # 节点订阅这个 channel
所以激活一个节点 = 往它的 branch:to:X channel 写一下。EphemeralValue(短暂值)用完即弃,正好对应触发一次。
规则二:普通边 A→B = 让 A 写 B 的入口 channel(graph/state.py:1537-1545):
def attach_edge(self, starts, end):
if isinstance(starts, str): # 单源边 A→B
self.nodes[starts].writers.append(
ChannelWrite((ChannelWriteEntry(_CHANNEL_BRANCH_TO.format(end), None),))
) # A 完成时,写 branch:to:B → 触发 B
规则三:多源边(fan-in)[A,B]→C = 用屏障 channel(graph/state.py:1546-1561):
channel_name = f"join:{'+'.join(starts)}:{end}"
self.channels[channel_name] = NamedBarrierValue(str, set(starts)) # 屏障,名单=所有上游
self.nodes[end].triggers.append(channel_name) # C 订阅屏障
for start in starts:
self.nodes[start].writers.append( # 每个上游完成时
ChannelWrite((ChannelWriteEntry(channel_name, start),)) # 往屏障写自己名字
)
把三条串起来看一张图:
add_edge(START,"A") add_edge(["A","B"],"C")
┌──────┐ ┌──────┐ ┌──────┐
│START │ │ A │ │ B │
└──┬───┘ └──┬───┘ └──┬───┘
│ 写 branch:to:A │写 join │写 join
▼ ▼ :A+B:C ▼
┌──────┐ ┌─────────────────────┐
│ A │ 订阅 │ NamedBarrierValue │ 等 A、B 都报到
└──────┘ branch:to:A │ names={A,B} │ 才 is_available
└──────────┬──────────┘
│ 触发
▼
┌──────┐
│ C │ 订阅 join:A+B:C
└──────┘
2.5 defer:把节点推迟到图末尾再跑
add_node(..., defer=True) 的节点,用 LastValueAfterFinish / NamedBarrierValueAfterFinish(state.py:1514-1517、1549-1554)。这类 channel 即使收齐了值,也要等 apply_writes 末尾那个 finish() 信号(第 01 章动作⑤)才 is_available。
用 途: 经典 map-reduce 的 reduce 节点——你想等所有并行 map 分支彻底跑完(而不只是这一步到齐)才聚合,defer 就是干这个的。
2.6 关键细节 / 坑
- channel 按类型相等去重,不是按名字。
_add_schema里同名 channel 若类型不同会报错,但LastValue例外(state.py:354-364)——它被当作默认/可被覆盖。 - managed value 不进 channel 体系:像
is_last_step(managed/is_last_step.py)这类运行时算出来、不持久化的值走另一条路(managed/base.py),输入/输出 schema 里禁止它们(state.py:346-352)。 __root__单 channel 模式:状态 schema 不是 dict 而是单个类型时,整个状态就一个名为__root__的 channel(state.py:1505及compile里多处__root__分支)。