跳到主要内容

节点基类、节点类型与全局状态

前几章讲「图怎么编译、怎么停续」。本章下到节点层:一个节点执行时经历什么、13 种节点都是谁、节点之间怎么传数据。

1. 节点的统一执行骨架

所有节点继承 BaseNode(nodes/base.py:20)。它把「执行一个节点」固化成一条统一流水线,各节点只需实现自己的 _run:

run(state): # nodes/base.py:189
① 若被用户 stop → 抛 IgnoreException
② 若执行次数 ≥ max_steps → 抛 IgnoreException(防循环跑飞)
③ 回调 on_node_start
④ result = self._run(exec_id) # ← 子类实现的真正逻辑
⑤ 把 result 的每个 key 写进全局变量池
⑥ current_step += 1
⑦ finally: 回调 on_node_end(带日志)

对应源码骨架:

# nodes/base.py:189-223(节选)
def run(self, state):
if self.stop_flag: raise IgnoreException('stop by user')
if self.current_step >= self.max_steps: raise IgnoreException('... max times')
self.callback_manager.on_node_start(...)
result = self._run(exec_id)
if result:
for key, value in result.items():
self.graph_state.set_variable(self.id, key, value) # ← 输出进变量池
self.current_step += 1
...
self.callback_manager.on_node_end(...)
return state

两个要点:

  • max_steps 在节点级强制(nodes/base.py:196)。 配合图级 recursion_limit(第 01 章),双重防护循环失控。
  • 节点输出统一进变量池(nodes/base.py:210-212)。 节点不直接互相调用,而是「我把结果存到 变量池[我的id][key],下游自己来取」——彻底解耦。

注意 run 返回的 state 原样透传:BISHENG 没把业务数据塞进 LangGraph 的 state(那只是个占位的 TempState,graph_engine.py:22),真正的数据流全走自己的 GraphState 变量池。

2. 13 种节点速览

节点干什么文件
start初始化时间/聊天历史/预设问题/用户信息,发开场白nodes/start/start.py
end终点,连到 LangGraph 的 ENDnodes/end/end.py
input等用户输入(对话或表单),解析上传文件nodes/input/input.py
output给用户发消息;可要求用户填写/选择(中断)nodes/output/output.py
llm调用大模型,支持单条/批处理、流式输出nodes/llm/llm.py
agent带工具的 agentnodes/agent/agent.py
condition按条件路由到不同分支nodes/condition/condition.py
code执行用户写的 Python main()nodes/code/code.py
rag检索增强生成nodes/rag/rag.py
knowledge_retriever知识库检索nodes/knowledge_retriever/...
qa_retrieverQA 库检索nodes/qa_retriever/...
tool调用工具nodes/tool/tool.py
report用变量填充 docx 模板生成报告nodes/report/report.py

(另有 note 注释节点和 fake_output 内部节点,不算可执行业务节点。)

3. 全局变量池:节点间怎么传数据

它要解决的小问题: LLM 节点要用「输入节点收到的那句话」,怎么拿?

思路: 一个共享的 GraphState(graph/graph_state.py:8),核心是个二级字典 variables_pool: {node_id: {key: value}}。每个节点把输出存进 自己id 名下,任何节点都能用 node_id.key 取别人的输出。

# graph/graph_state.py:43-56(节选)
def set_variable(self, node_id, key, value):
self.variables_pool.setdefault(node_id, {})[key] = value

def get_variable(self, node_id, key, count=None):
if key == 'chat_history':
return self.get_history_memory(count=count) # 特例:聊天历史
return self.variables_pool[node_id].get(key)

取数支持 node_id.key#index 语法访问数组/字典元素(get_variable_by_str,graph/graph_state.py:58)——比如批处理输出的第 N 项。

聊天历史是变量池里的特殊公民:用 LangChain 的 ConversationBufferWindowMemory 存,在 start 节点初始化(nodes/start/start.py:25),按 chat_history 这个 key 取(graph/graph_state.py:54)。

4. 模板变量:prompt 里的 {\{#node.key#}}

用户在 prompt 里写 请总结:{{#input_1.user_input#}},引擎要把它替换成真实值。这靠 PromptTemplateParser(nodes/prompt_template.py:8):

# 示意,非源码:变量替换两步走
template = PromptTemplateParser("总结:{{#input_1.user_input#}}")
vars = template.extract() # ['input_1.user_input']
# 引擎对每个 var 去变量池取值,组成 map
filled = template.format({"input_1.user_input": "今天天气..."})

节点基类把这套封装成 parse_msg_with_variables(nodes/base.py:143):提取变量名 → 逐个去全局池取值 → 回填。匹配用正则 WITH_VARIABLE_TMPL_REGEX(nodes/prompt_template.py:4),支持 node.key 带点的层级名。

5. 条件节点:路由怎么算

条件节点是「分支」的执行体。它在 _run 里逐个 case 求值,命中就记下目标分支;route_node 把结果交给 LangGraph 的条件边(nodes/condition/condition.py:19-47):

# nodes/condition/condition.py:19-34(节选)
for one in self._condition_cases:
if one.evaluate_conditions(self): # 这个 case 成立?
next_node_ids = self.get_next_node_id(one.id)
break
if next_node_ids is None:
self._next_node_id = self.get_next_node_id('right_handle') # 兜底:else 分支

单个条件支持 13 种比较运算(等于/包含/正则/大小比较…),见 ConditionOne.compare_two_value(nodes/condition/conidition_case.py:31);多条件用 and/or 组合(evaluate_conditions,conidition_case.py:71)。right_handle 是硬编码的 else 出口约定。

6. 代码节点:exec 执行用户代码(注意安全边界)

code 节点让用户写 Python。它用 CodeParser(nodes/code/code_parse.py:7)把代码 AST 解析后,用 exec 编译并执行,再调用其中的 main():

# nodes/code/code_parse.py:87-92(节选)
def parse_functions(self, node):
compiled_func = compile(ast.Module(body=[node], ...), "<string>", "exec")
exec(compiled_func, self.exec_globals, self.exec_locals) # ← 直接 exec

安全边界(重要): 这里是exec,没有沙箱——没有限制 import、没有资源/时间隔离。代码里 parse_imports(code_parse.py:60)甚至会真的 importlib.import_module 用户写的任意模块。所以「代码节点」假定workflow 的作者是可信的;它不是给不可信用户跑任意代码的安全沙箱。这是部署时要清楚的前提。

代码节点的输入/输出靠声明式映射:_parse_code_input 按声明从变量池取参数喂给 main(),_parse_code_output 校验 main() 返回的 dict 含所有声明的输出 key(nodes/code/code.py:45-62)。

7. 巧妙之处与边界

  • 节点零直接耦合(nodes/base.py:210)。 全靠「输出进池、按名取数」,所以前端能随意改连线而不用改节点代码。
  • LangGraph state 只是空壳(graph_engine.py:22)。 真正的数据流在自管的 GraphState,把 LangGraph 当纯调度器用——好处是数据模型完全自控,坏处是没法直接用 LangGraph 的 state 持久化能力。
  • 边界:代码节点无沙箱(code_parse.py:91)。 见上,信任模型是「作者可信」。
  • 边界:聊天历史等现场在内存。 变量池和 memory 都挂在 GraphState 实例上,随 Workflow 对象进内存(见第 03 章),不跨进程持久化。

8. 代码地图

主题文件符号
节点执行骨架nodes/base.pyBaseNode.run, _run, handle_input
变量替换nodes/base.py, nodes/prompt_template.pyparse_msg_with_variables, PromptTemplateParser
全局变量池graph/graph_state.pyGraphState.set_variable, get_variable_by_str
节点工厂表nodes/node_manage.pyNODE_CLASS_MAP
条件求值nodes/condition/conidition_case.pyConditionCases.evaluate_conditions, ConditionOne.compare_two_value
代码执行nodes/code/code_parse.pyCodeParser.parse_functions, exec_method
节点类型枚举common/node.pyNodeType, BaseNodeData