跳到主要内容

Graphiti 写入主线 — 一条 episode 如何变成图谱

本章讲清 add_episode 端到端怎么走。读完你能说出:一段原文进来,经过哪几步、每步交给谁、最后图里多了什么。去重和时序的细节留给第 2 章

1. 先认识「episode」这个词

Graphiti 不直接处理「文档」,而是处理 episode——一段带时间戳的原始数据。它有三种来源(EpisodeType,nodes.py:54-77):

来源内容长这样典型场景
message"user: 我换 Nike 了"(actor: content)对话
text一段纯文本文档、笔记
json结构化 JSON 字符串业务数据

每个 episode 落库成一个 EpisodicNode,它是所有派生事实的溯源源头(provenance):后面抽出的实体、事实都通过边指回它。它的关键字段是 content(原文)和 valid_at(这段数据「何时发生」,注意不是「何时被写入」)——见 nodes.py:318-332

2. 顶层:add_episode 的六步

入口是 Graphiti.add_episode(graphiti.py:980)。剥掉 tracing 和异常处理,它的主体就是顺序六步:

add_episode(name, episode_body, reference_time, ...)

├─ 0. 定 group_id(图分区)+ 校验 entity_types

├─ 1. previous_episodes = retrieve_episodes(...) # 拉最近历史当上下文

├─ 2. extracted_nodes = extract_nodes(...) # LLM 抽实体候选

├─ 3. nodes, uuid_map = resolve_extracted_nodes(...) # 实体去重 → 见第2章

├─ 4. resolved/invalidated/new_edges =
│ _extract_and_resolve_edges(...) # 抽边 + 事实去重/失效 → 见第2章

├─ 5. hydrated_nodes = extract_attributes_from_nodes(...) # 抽属性+摘要

└─ 6. _process_episode_data(...) # 建 episodic 边 + 批量落库

对应源码区间:group_id 处理在 graphiti.py:1073-1082,六步主体在 graphiti.py:1086-1179

下面逐步看。

3. 第 0 步:group_id —— 图的分区

group_id 是 Graphiti 的「命名空间 / 分区」:不同用户、不同租户的数据用不同 group_id 隔离。代码里它甚至直接被当成数据库名用:

# 示意,非源码(逻辑见 graphiti.py:1073-1082)
if group_id is None:
group_id = get_default_group_id(self.driver.provider) # 用默认库
else:
validate_group_id(group_id)
if group_id != self.driver._database:
self.driver = self.driver.clone(database=group_id) # 切到该分区

重点看: 提供 group_id 时,driver 会 clone 出一个指向该分区的副本(graphiti.py:1079-1082)。所有后续读写都在这个分区内。

4. 第 1 步:拉历史 episode 当上下文

抽取不是孤立看当前这段话,而是带上最近若干条历史 episode 作为背景,好让 LLM 解析「他」「那家公司」这类指代和相对时间。

# 示意,非源码(逻辑见 graphiti.py:1087-1096)
previous_episodes = (
await self.retrieve_episodes(reference_time, last_n=RELEVANT_SCHEMA_LIMIT, group_ids=[group_id], source=source)
if previous_episode_uuids is None
else await EpisodicNode.get_by_uuids(self.driver, previous_episode_uuids)
)

RELEVANT_SCHEMA_LIMIT = 10(search/search_utils.py:64),即默认带最近 10 条。你也可以显式指定 previous_episode_uuids

5. 第 2 步:抽实体(extract_nodes)

extract_nodes(node_operations.py:70)把原文 + 历史 + 实体类型定义打包成 context,按 episode 来源选不同 prompt(message / text / json,见 _call_extraction_llm,node_operations.py:255),让 LLM 吐出实体列表。

几个值得注意的设计:

  • 类型用 ID 而非名字传给 LLM。 _build_entity_types_context(node_operations.py:152)给每个类型编号,Entity 永远是 0 号,自定义类型从 1 号起。LLM 返回 entity_type_id,代码再映射回类型名(_create_entity_nodes,node_operations.py:301-313)。
  • 同段内精确同名先就地合并。 _collapse_exact_duplicate_extracted_nodes(node_operations.py:336)在出 LLM 后立刻把规范化后同名的节点折叠成一个,冲突时保留「更具体」的那个(Person 优先于裸 Entity)。这是去重前的廉价预处理。
  • 多 episode 抽取带归属。 当传入的是 episode 列表,每个实体会带 episode_indices 标明它来自哪几段(node_operations.py:104-111)。

这一步产出的是候选实体——还没和图里已有实体对齐,那是下一步的事。

6. 第 3 步:实体去重(resolve_extracted_nodes)

这步决定「我刚抽到的『Kendra』是不是图里已经存在的那个 Kendra」。它是 Graphiti 性能上的关键优化点,完整机制见第 2 章。这里只记结论:

  • 它返回 (nodes, uuid_map, duplicates)(node_operations.py:627)。
  • uuid_map 是「候选节点 uuid → 规范节点 uuid」的映射,后面抽出的边要用它把端点指向规范节点(resolve_edge_pointers)。

7. 第 4 步:抽关系(extract_edges)+ 事实去重

现在节点定了,在节点之间抽「事实」。extract_edges(edge_operations.py:117)把已确定的节点名喂给 LLM,让它在这些名字之间抽三元组(source → relation → target)。

两个硬约束值得看:

  • 端点必须是已知实体。 LLM 返回的 source_entity_name / target_entity_name 必须能在传入的节点列表里找到,否则丢弃(edge_operations.py:217-230)。这避免 LLM 凭空造点。
  • 丢自环。 source 和 target 解析到同一个节点的边被丢掉(edge_operations.py:232-240)。

抽出的边会带上 LLM 解析出的 valid_at / invalid_at(事实何时开始/结束为真),并解析成 UTC datetime(edge_operations.py:274-288)。

抽完之后,_extract_and_resolve_edges(graphiti.py:631)先用 resolve_edge_pointers 把边端点按 uuid_map 重指向规范节点,再调 resolve_extracted_edges事实去重 + 矛盾检测 + 时序失效——这块同样留给第 2 章。它返回三组边:

返回含义
resolved_edges去重后的所有边(可能复用了已有边)
invalidated_edges被新信息推翻、需要盖 invalid_at 的旧边
new_edges真正新增、图里原来没有的边

8. 第 5 步:抽节点属性与摘要

extract_attributes_from_nodes(node_operations.py:726)做两件事:

  1. 按类型抽结构化属性。 若节点匹配某个自定义 Pydantic 类型,就调 LLM 填那个类型的字段(_extract_entity_attributes,node_operations.py:783)。
  2. 生成 / 更新摘要。 summary 是「这个实体周边事实的浓缩」。这里有个省钱优化:若摘要 + 边事实拼起来不超过 MAX_SUMMARY_CHARS * 2,就直接拼接、不调 LLM;太长才走 LLM 批量摘要(_extract_entity_summaries_batch,node_operations.py:833-887)。

注意 add_episode 调用时只把 new_edges(而非全部边)传进来做摘要(graphiti.py:1160-1167),注释说明:避免把图里已存在的旧事实重复塞进摘要。

9. 第 6 步:落库

_process_episode_data(graphiti.py:680)收尾:

  • build_episodic_edgesMENTIONS 边,把每个实体连回它来源的 episode(edge_operations.py:52)。带 node_episode_index_map 时,实体只连它实际出现的那几段,而非所有段(edge_operations.py:78-92)。
  • 把 episode 的 entity_edges 字段填上本次所有边的 uuid;若 store_raw_episode_content=False 则清空原文(graphiti.py:721-724)。
  • add_nodes_and_edges_bulk 一次性把 episodes、episodic 边、实体节点、实体边全部批量写入(graphiti.py:726-733)。

如果传了 saga(把多段 episode 串成一条「故事线」),还会建 HAS_EPISODENEXT_EPISODE 边把它们链起来(graphiti.py:737-779)。

10. 回看整条流水线

原文 "Kendra loves Adidas" (valid_at=3月)
│ extract_nodes

候选: [Kendra(Person), Adidas(Org)]
│ resolve_extracted_nodes → uuid_map

规范节点: [Kendra#已有, Adidas#新]
│ extract_edges (在规范节点间抽)

候选边: Kendra --LOVES(valid_at=3月)--> Adidas
│ resolve_extracted_edges (去重 + 看是否推翻旧事实)

resolved + invalidated + new
│ extract_attributes (摘要/属性)

add_nodes_and_edges_bulk → 图里:节点 + RELATES_TO 边 + MENTIONS 边

关于并发与顺序(踩坑提醒): 文档字符串明确建议每条 episode 顺序 await、串行加入,因为后一条的去重要看到前一条的结果(graphiti.py:1056-1059)。要批量请用 add_episode_bulk(graphiti.py:1230),它在内存里先整体去重再落库。

下一章看 ingestion 里最硬的两块:去重与时序失效

代码地图

主题文件符号
写入门面graphiti_core/graphiti.pyGraphiti.add_episode
批量写入graphiti_core/graphiti.pyGraphiti.add_episode_bulk
抽实体graphiti_core/utils/maintenance/node_operations.pyextract_nodes_create_entity_nodes
同段折叠graphiti_core/utils/maintenance/node_operations.py_collapse_exact_duplicate_extracted_nodes
抽关系graphiti_core/utils/maintenance/edge_operations.pyextract_edges
建溯源边graphiti_core/utils/maintenance/edge_operations.pybuild_episodic_edges
抽属性/摘要graphiti_core/utils/maintenance/node_operations.pyextract_attributes_from_nodes
落库收尾graphiti_core/graphiti.pyGraphiti._process_episode_data
episode 数据模型graphiti_core/nodes.pyEpisodicNodeEpisodeType