跳到主要内容

Phoenix 摄取管线:一条 span 的旅程

本章讲什么: 跟着一条 span,从它以 OTLP protobuf 字节到达,到变成数据库里的一行,中间每一步发生了什么、为什么这么设计。这是 Phoenix 的写路径,也是整个平台的心脏。

3.1 第一步:OTLP 解码——把扁平 key 还原成嵌套结构

要解决的小问题: OpenTelemetry 的 span 属性是扁平的键值对。一个 LLM 调用会发出这样的 key:

llm.token_count.prompt = 12
llm.token_count.completion = 8
llm.input_messages.0.message.role = "user"
llm.input_messages.0.message.content = "hi"

但 Phoenix 想要的是嵌套的 JSON(llm.input_messages 是个数组,每项是个对象)。所以解码的核心就是"扁平 → 嵌套"的还原。

思路: 接收层对每个 OTLP span 调 decode_otlp_span,它把字节解成扁平 dict,再交给 unflatten 还原。

真实实现在 src/phoenix/trace/otel.py:70 decode_otlp_span,最关键的一行:

# src/phoenix/trace/otel.py:84
attributes = unflatten(load_json_strings(coerce_otlp_span_attributes(raw_attributes.items())))

它做三件事:load_json_strings 把"看起来是 JSON 字符串"的值解析回对象;unflattena.b.0.c 这种点号路径重建成 {"a": {"b": [{"c": ...}]}};最终包成一个 Span 数据类返回(otel.py:96)。

巧妙之处——"数组只用于对象"规则: 还原数组时有个陷阱。tags.0tags.1 这种数字子键,到底是数组下标,还是恰好叫 "0"/"1" 的普通 key?Phoenix 的判断是:只有当这组项里包含 Mapping(对象)时,才当数组还原。原始基本类型数组(如一串字符串 tag)保持原样。

# src/phoenix/trace/attributes.py:149 has_mapping
def has_mapping(sequence: Iterable[Any]) -> bool:
for item in sequence:
if isinstance(item, Mapping):
return True
return False

注释说得很直白(attributes.py:156-161):OTel 语义约定里,数组通常装的是结构化对象(retrieval.documents[0]llm.messages[1]),不是 ["tag1","tag2"] 这种基本类型数组——这个判断保证了"扁平→嵌套→再扁平"的来回转换是无损的。unflatten 本体在 attributes.py:101,用一棵 trie 来重建路径。

另一个细节——OTel gen_ai 语义的桥接: 同一段 decode_otlp_span 里,会把任何 OTel 标准的 gen_ai.* 属性合成出对应的 OpenInference 属性,但用 setdefault,所以已存在的 OpenInference key 优先(otel.py:82-83)。这让 Phoenix 既吃自家 OpenInference 探针,也吃通用 OTel gen_ai 探针。

3.2 第二步:入队即返回——为什么不直接写库

要解决的小问题: span 可能高速涌入(一个 agent 一秒钟几十上百个)。如果每条都同步写一次数据库,接收 RPC 会被磁盘 I/O 拖死,还会产生大量小事务。

思路: 接收层解码完只把 span 塞进内存队列就返回,真正的写由一个后台任务批量做。看接收侧:

# src/phoenix/server/grpc_server.py:46-52 Servicer.Export
for resource_spans in request.resource_spans:
project_name = get_project_name(resource_spans.resource.attributes)
for scope_span in resource_spans.scope_spans:
for otlp_span in scope_span.spans:
span = await run_in_threadpool(decode_otlp_span, otlp_span) # 解码丢到线程池
await self._enqueue_span(span, project_name) # 只是入队

_enqueue_span 的实现朴素到极致——就是往 deque 追加(bulk_inserter.py:121):

# src/phoenix/db/bulk_inserter.py:121
async def _enqueue_span(self, span: Span, project_name: str) -> None:
self._spans.append((span, project_name))

直觉: 接收层像餐厅前台"收单",厨房(BulkInserter)按批出菜。前台从不亲自下厨,所以接单永远快。

3.3 第三步:后台批量写——一个循环、一个事务

机制: BulkInserter__aenter__ 时启动一个 asyncio 后台任务 _bulk_insert(bulk_inserter.py:102)。这个循环是写路径的发动机:

_bulk_insert 循环(每轮):
① 若队列全空 → sleep(0.1s) 继续
② 取至多 max_ops_per_transaction(默认 1000)个"操作" → 一个事务里逐条执行
③ 取至多同样多的 span → _insert_spans 批量落库
④ 把各类标注队列 flush 进库,产出 DmlEvent 放进 event_queue
⑤ sleep(0.1s)

对应代码 bulk_inserter.py:126-155。注意它的退出条件(while 头部,:129-134):只要还在运行,任一队列还有残留,就继续——保证关停时把存量排干,不丢数据。

每条 span 的落库用 begin_nested() 包成 savepoint(bulk_inserter.py:172),所以单条 span 出错只回滚它自己,不连累整批:

# src/phoenix/db/bulk_inserter.py:171-178
try:
async with session.begin_nested():
result = await insert_span(session, span, project_name)
except Exception:
BULK_LOADER_SPAN_EXCEPTIONS.inc()
logger.exception(f"Failed to insert span with span_id={span.context.span_id}")

背压: BulkInserteris_full 属性(:89-91),当内存里堆积的 span 超过 max_spans_queue_size 就返回"满"。接收侧的 gRPC 拦截器 CapacityInterceptor(在 app.py:609 装配)据此拒绝新请求,避免内存爆掉——这是系统的安全阀。

3.4 落库逻辑:insert_span 在干什么

insert_span(src/phoenix/db/insertion/span.py:25)是一条 span 落盘的全部业务逻辑,远不止 INSERT 一行。它依次处理四件事:

(a) Trace upsert。 先按 trace_id 查 Trace 记录;有就更新它的时间窗(end_time/start_time),没有就新建并归到某个 Project(span.py:32-60)。

有个值得记的设计决策——已存在的 trace 忽略传入的 project_name:

# src/phoenix/db/insertion/span.py:37-40
if trace.id is not None:
# 允许用户把 trace 在 project 之间转移,所以已存在的 trace 沿用其 project_rowid,
# 传入的 project_name 被忽略。
project_rowid = trace.project_rowid

(b) Session 关联。 若 span 带 session.id 属性,关联或新建一个 ProjectSession(把多条 trace 归成一次"会话"),并维护其时间窗(span.py:62-99)。

(c) 累计统计沿父链传播。 这是最精妙的一段。每条 span 要记录"以我为根的子树里,总共多少 token、多少错误"。但 span 到达顺序不保证(子可能先于父到)。逻辑分两半:

  • 插入时,先把已经到达的孩子们的累计值汇总进自己(span.py:124-135,对 parent_id == 本 span_id 的行求和)。
  • 然后用一个递归 CTE 沿 parent_id 链往上,把本 span 的增量加到所有祖先身上(span.py:168-189):
# src/phoenix/db/insertion/span.py:168-178 (递归 CTE 找所有祖先)
ancestors = (
select(models.Span.id, models.Span.parent_id)
.where(models.Span.span_id == span.parent_id)
.cte(recursive=True)
)
child = ancestors.alias()
ancestors = ancestors.union_all(
select(models.Span.id, models.Span.parent_id).join(
child, models.Span.span_id == child.c.parent_id
)
)

注释点破了为什么需要向上传播(span.py:164-167):通常父晚于子到达,向上传播是 no-op;但万一子晚到,就得补更新所有祖先的累计值。乱序到达也能得到正确的子树累计——这是它的硬骨头。

(d) 幂等插入。 span 用 insert_on_conflict(... on_conflict=DO_NOTHING, unique_by=("span_id",)) 写入(span.py:136-160)。同一 span 重发(OTel 重试很常见)不会插重复;若 span_rowid is None 说明撞了已存在的,直接返回(span.py:162),跳过累计传播。

3.5 落库之后:成本计算与变更事件

成本计算。 写 span 的同一批里,若 span 该算成本(should_calculate_span_cost,bulk_inserter.py:183),就用 SpanCostCalculator.calculate_cost(start_time, attributes) 按当时的模型定价算出 SpanCost,绑上 span/trace rowid,批量 add_all(bulk_inserter.py:206-212)。定价表的内部细节在 server/daemons/span_cost_calculator.py(本章未深入)。

变更事件。 一批 span 写完,只要有 project 被动过,就发一个事件:

# src/phoenix/db/bulk_inserter.py:204-205
if project_ids:
self._event_queue.put(SpanInsertEvent(tuple(project_ids)))

SpanInsertEvent 携带受影响的 project id——class SpanInsertEvent(dml_event.py:45)本身是个一行 ... 的空壳,这个 id 元组其实是 ids 字段,继承自基类 DmlEvent(dml_event.py:18,经 ProjectDmlEvent → SpanDmlEvent → SpanInsertEvent 一路继承下来)。下游 DmlEventHandler 收到后做两件事:失效这些 project 的 dataloader 缓存(dml_event_handler.py_SpanDmlEventHandler._clear),以及推 GraphQL 订阅让 UI 实时刷新。这把"写"和"读侧缓存一致性"用事件总线解耦了。

3.6 这条管线在哪装配起来的

所有部件在 app.py_lifespan 里用一个 AsyncExitStack 串起来(src/phoenix/server/app.py:595-670):进入 BulkInserter 拿到三个回调(enqueue_annotations / enqueue_span / enqueue_operation),把 enqueue_span 交给 GrpcServer,再依次启动 dml_event_handler、各守护进程。最后 yield 出一个 dict,把这些回调挂到 FastAPI 的 app.state 上,供 HTTP 的 /v1/traces 路由复用同一条入队路径。

3.7 边界与坑

  • 内存队列 = 进程内、非持久。 span 在落盘前只在内存。进程崩溃会丢掉队列里尚未写入的 span。代价换来的是极高的接收吞吐。
  • 累计统计是"最终一致"的。 乱序到达期间,某一瞬间祖先的累计值可能暂时偏小,等子 span 到齐才补齐。
  • 单条 span 失败被吞掉(只计数 + 记日志)。 设计上不让一条坏 span 拖垮整批(bulk_inserter.py:174-178),但意味着坏数据是"丢弃"而非"重试"。

3.8 代码地图

主题文件符号
OTLP 接收src/phoenix/server/grpc_server.pyServicer.Export
OTLP 解码src/phoenix/trace/otel.pydecode_otlp_span_decode_value
扁平↔嵌套src/phoenix/trace/attributes.pyunflattenhas_mappingload_json_strings
批量写循环src/phoenix/db/bulk_inserter.pyBulkInserter._bulk_insert_insert_spansis_full
一条 span 落库src/phoenix/db/insertion/span.pyinsert_spanSpanInsertionEvent
变更事件src/phoenix/server/dml_event.pySpanInsertEventDmlEvent.ids
装配src/phoenix/server/app.py_lifespan