Langfuse — 架构与原理
30 秒导读: Langfuse 是一个开源的「LLM 工程平台」。你的 AI 应用每调用一次大模型,就往 Langfuse 发一条记录(叫 trace/observation);Langfuse 把这些记录可靠地存进列式数据库 ClickHouse,然后在网页上给你看每次调用的输入/输出/耗时/花了多少钱,还能自动用「另一个大模型当评委」给每条记录打分。本文讲清楚:一条记录从 SDK 发出到落库、再到被自动评测打分,中间到底发生了什么。
1. 这是什么(零基础也能懂)
一句话定义: Langfuse 是给 LLM 应用用的「飞行记录仪 + 自动质检台」——记录每次模型调用的全过程,并能在记录上自动跑评测。
它解决谁的什么问题。 假设你写了一个客服机器人,背后串了好几次大模型调用(先检索、再总结、再生成回复)。线上出问题时你想知道:
- 这次对话里,模型到底吃进了什么 prompt、吐出了什么?
- 哪一步慢?这次调用烧了多少 token、花了多少钱?
- 模型的回答质量怎么样——能不能自动给它打个分,而不是人工一条条看?
Langfuse 就是接住这些问题的地方。
它能做什么(核心功能):
| 功能 | 白话 |
|---|---|
| 可观测性(Tracing) | 记录每次 LLM 调用及其上下游步骤,网页上可视化时间线 |
| 评测(Evaluations) | 自动给记录打分:LLM-as-judge、代码评测、人工标注、用户反馈 |
| 数据集 / 实验 | 攒一批测试样例,反复跑你的应用做基准对比 |
| Prompt 管理 | 版本化管理 prompt,带缓存 |
| 成本核算 | 按模型单价 × token 数,自动算每次调用的花费 |
用起来什么样。 应用侧用 SDK 埋点,SDK 在后台把事件批量 POST 到 Langfuse:
# 示意,非源码 —— 应用侧最小用法
from langfuse import observe, get_client
@observe() # 装饰器:自动记一条 trace
def answer(question: str) -> str:
client = get_client()
# ... 调用 OpenAI / Anthropic ...
client.update_current_observation(
input=question, output="...",
model="gpt-4o", usage={"input": 120, "output": 80},
)
return "..."
# SDK 把这些事件攒成一批,后台 POST /api/public/ingestion
服务端收到的不是「一条最终记录」,而是一串增量事件(创建、若干次更新),Langfuse 负责把它们合并成数据库里的一行。这一点是理解整个系统的钥匙。
一句话直觉: 把 Langfuse 想成「LLM 版的 Sentry + Datadog」——只是它记录的不是异常和 CPU,而是每次模型调用的 prompt、输出、token、成本,而且自带一个能用大模型当评委的自动质检流水线。
2. 顶层全景(它大概怎么转)
Langfuse 是一个 pnpm monorepo,真正的运行体是两个 Node 服务,外加三个有状态后端:
| 部件 | 干什么 | 在哪 |
|---|---|---|
web | Next.js 应用:对外的 HTTP API(含 /api/public/ingestion)+ 前端 UI | web/ |
worker | 后台进程:消费各种 BullMQ 队列,做合并落库、评测、导出、清理 | worker/ |
@langfuse/shared | web 与 worker 共享的领域逻辑(ClickHouse 读写、合并、模型匹配、队列定义) | packages/shared/ |
| PostgreSQL | 配置类数据:项目、API key、评测配置、prompt、模型单价(Prisma 管) | — |
| ClickHouse | 海量轨迹数据:traces / observations / scores(列式,扛分析查询) | — |
| Redis | BullMQ 队列底座 + 各种缓存(模型 匹配、已处理去重、no-config) | — |
| S3 / Blob | 事件原始 JSON 的长期存储,也是合并时的「真相来源」 | — |
主线:一条 trace 怎么落库 + 怎么被评测(高层,先不进代码):
应用 SDK 批量 POST 事件
│
▼
[web] /api/public/ingestion ── 鉴权 / 限流 / 校验
│
│ ① 把每个事件的原始 JSON 写进 S3(长期存 + 当缓存)
│ ② 给每个实体往 Redis「分片队列」塞一个轻量任务
▼
[Redis] ingestion-queue(按 projectId-entityId 哈希分片)
│
▼
[worker] IngestionService
├─ 按前缀把该实体的所有事件从 S3 拉回来
├─ 时间排序后逐个合并成「一行」(create 打底,update 覆盖)
├─ 补算:模型匹配 → token 计数 → 成本
├─ 攒进 ClickhouseWriter 的内存批,定时刷盘到 ClickHouse
└─ 若该 project 配了评测 → 往 trace-upsert 队列再塞一个任务
│
▼
[worker] 评测引擎(evalService)
├─ 取该 project 的评测配置,按 filter 判断这条 trace 是否要评
├─ 从 ClickHouse 把 trace 的字段抽成模板变量
├─ 用「评委大模型」跑 LLM-as-judge,拿到结构化分数
└─ 把分数当成一条新 score 事件回写 → 又走一遍 ingestion(自循环要防!)
看懂这张图,你就掌握了 Langfuse 的「大盘」:一切都是异步的、事件驱动的;写路径刻意把「快速接收」和「慢速合并」拆开,中间用 S3 + Redis 解耦。
3. 为什么这样设计(三个一句话直觉)
在钻进各章之前,先建立三个核心直觉,它们贯穿全篇:
-
为什么先写 S3 再排队,而不是直接写库? LLM 流量是突发的、批量的;直接同步写 ClickHouse 会把接收端拖垮。先把原始 JSON 落到 S3(便宜、抗压),HTTP 立刻返回 207;真正的合并落库交给 worker 慢慢做。S3 同时是「这条实体的全部事件」的真相来源,worker 合并时按前缀一次性捞回。
-
为什么一行记录要「合并」出来? SDK 是流式上报的:先发 create,模型还没答完就先发了一半,答完再发 update 补 output/usage。同一个 observation id 会对应多条事件。Langfuse 的不变量是:create 打底、按时间排序后逐条覆盖、不可变字段(id/project/时间戳)永远以最早的为准。
-
为什么评测要「防自循环」? 评测本身要调大模型,这次调用又会被记成一条 trace;如果不拦,这条评测 trace 又触发评测……无限循环。Langfuse 用
langfuse-环境前缀给内部 trace 打标,在评测入口直接跳过它们。