跳到主要内容

Langfuse — 架构与原理

30 秒导读: Langfuse 是一个开源的「LLM 工程平台」。你的 AI 应用每调用一次大模型,就往 Langfuse 发一条记录(叫 trace/observation);Langfuse 把这些记录可靠地存进列式数据库 ClickHouse,然后在网页上给你看每次调用的输入/输出/耗时/花了多少钱,还能自动用「另一个大模型当评委」给每条记录打分。本文讲清楚:一条记录从 SDK 发出到落库、再到被自动评测打分,中间到底发生了什么。

1. 这是什么(零基础也能懂)

一句话定义: Langfuse 是给 LLM 应用用的「飞行记录仪 + 自动质检台」——记录每次模型调用的全过程,并能在记录上自动跑评测。

它解决谁的什么问题。 假设你写了一个客服机器人,背后串了好几次大模型调用(先检索、再总结、再生成回复)。线上出问题时你想知道:

  • 这次对话里,模型到底吃进了什么 prompt、吐出了什么?
  • 哪一步慢?这次调用烧了多少 token、花了多少钱?
  • 模型的回答质量怎么样——能不能自动给它打个分,而不是人工一条条看?

Langfuse 就是接住这些问题的地方。

它能做什么(核心功能):

功能白话
可观测性(Tracing)记录每次 LLM 调用及其上下游步骤,网页上可视化时间线
评测(Evaluations)自动给记录打分:LLM-as-judge、代码评测、人工标注、用户反馈
数据集 / 实验攒一批测试样例,反复跑你的应用做基准对比
Prompt 管理版本化管理 prompt,带缓存
成本核算按模型单价 × token 数,自动算每次调用的花费

用起来什么样。 应用侧用 SDK 埋点,SDK 在后台把事件批量 POST 到 Langfuse:

# 示意,非源码 —— 应用侧最小用法
from langfuse import observe, get_client

@observe() # 装饰器:自动记一条 trace
def answer(question: str) -> str:
client = get_client()
# ... 调用 OpenAI / Anthropic ...
client.update_current_observation(
input=question, output="...",
model="gpt-4o", usage={"input": 120, "output": 80},
)
return "..."
# SDK 把这些事件攒成一批,后台 POST /api/public/ingestion

服务端收到的不是「一条最终记录」,而是一串增量事件(创建、若干次更新),Langfuse 负责把它们合并成数据库里的一行。这一点是理解整个系统的钥匙。

一句话直觉: 把 Langfuse 想成「LLM 版的 Sentry + Datadog」——只是它记录的不是异常和 CPU,而是每次模型调用的 prompt、输出、token、成本,而且自带一个能用大模型当评委的自动质检流水线。

2. 顶层全景(它大概怎么转)

Langfuse 是一个 pnpm monorepo,真正的运行体是两个 Node 服务,外加三个有状态后端:

部件干什么在哪
webNext.js 应用:对外的 HTTP API(含 /api/public/ingestion)+ 前端 UIweb/
worker后台进程:消费各种 BullMQ 队列,做合并落库、评测、导出、清理worker/
@langfuse/sharedweb 与 worker 共享的领域逻辑(ClickHouse 读写、合并、模型匹配、队列定义)packages/shared/
PostgreSQL配置类数据:项目、API key、评测配置、prompt、模型单价(Prisma 管)
ClickHouse海量轨迹数据:traces / observations / scores(列式,扛分析查询)
RedisBullMQ 队列底座 + 各种缓存(模型匹配、已处理去重、no-config)
S3 / Blob事件原始 JSON 的长期存储,也是合并时的「真相来源」

主线:一条 trace 怎么落库 + 怎么被评测(高层,先不进代码):

应用 SDK 批量 POST 事件


[web] /api/public/ingestion ── 鉴权 / 限流 / 校验

│ ① 把每个事件的原始 JSON 写进 S3(长期存 + 当缓存)
│ ② 给每个实体往 Redis「分片队列」塞一个轻量任务

[Redis] ingestion-queue(按 projectId-entityId 哈希分片)


[worker] IngestionService
├─ 按前缀把该实体的所有事件从 S3 拉回来
├─ 时间排序后逐个合并成「一行」(create 打底,update 覆盖)
├─ 补算:模型匹配 → token 计数 → 成本
├─ 攒进 ClickhouseWriter 的内存批,定时刷盘到 ClickHouse
└─ 若该 project 配了评测 → 往 trace-upsert 队列再塞一个任务


[worker] 评测引擎(evalService)
├─ 取该 project 的评测配置,按 filter 判断这条 trace 是否要评
├─ 从 ClickHouse 把 trace 的字段抽成模板变量
├─ 用「评委大模型」跑 LLM-as-judge,拿到结构化分数
└─ 把分数当成一条新 score 事件回写 → 又走一遍 ingestion(自循环要防!)

看懂这张图,你就掌握了 Langfuse 的「大盘」:一切都是异步的、事件驱动的;写路径刻意把「快速接收」和「慢速合并」拆开,中间用 S3 + Redis 解耦。

3. 为什么这样设计(三个一句话直觉)

在钻进各章之前,先建立三个核心直觉,它们贯穿全篇:

  • 为什么先写 S3 再排队,而不是直接写库? LLM 流量是突发的、批量的;直接同步写 ClickHouse 会把接收端拖垮。先把原始 JSON 落到 S3(便宜、抗压),HTTP 立刻返回 207;真正的合并落库交给 worker 慢慢做。S3 同时是「这条实体的全部事件」的真相来源,worker 合并时按前缀一次性捞回。

  • 为什么一行记录要「合并」出来? SDK 是流式上报的:先发 create,模型还没答完就先发了一半,答完再发 update 补 output/usage。同一个 observation id 会对应多条事件。Langfuse 的不变量是:create 打底、按时间排序后逐条覆盖、不可变字段(id/project/时间戳)永远以最早的为准。

  • 为什么评测要「防自循环」? 评测本身要调大模型,这次调用又会被记成一条 trace;如果不拦,这条评测 trace 又触发评测……无限循环。Langfuse 用 langfuse- 环境前缀给内部 trace 打标,在评测入口直接跳过它们。

4. 阅读地图(建议顺序)

按「写 → 读 → 评 → 精」的顺序读最顺:

  1. 01-ingestion-pipeline.md — 写路径前半段。processEventBatch 怎么把一批事件写 S3、按实体分片塞进 Redis 队列;worker 怎么把它们捞回来。先读这章建立「事件流」心智模型。
  2. 02-merge-and-write.md — 写路径后半段。IngestionService.mergeAndWrite 怎么把多条事件合并成一行、补算 token 与成本;ClickhouseWriter 怎么攒批、容错、防超大行。这是工程含量最高的一章。
  3. 03-evaluation-engine.md — 评测路径。trace 落库后怎么触发评测、配置怎么匹配、变量怎么抽、LLM-as-judge 怎么打分、分数怎么回写。
  4. 04-deep-dive-and-tricks.md — 精华与边界。把散落在各章的巧妙设计(分片队列、分区锁、no-config 缓存、Decimal64 钳制等)集中提炼,并给出局限与横向对比。

全文引用锚定到 commit f3625434。路径相对克隆根 langfuse/(本 shelf 内 id 前缀)。