Ragas 执行与实验
本章讲底座:评测和测试集生成都要发成百上千次 LLM 请求,
Executor负责把它们并发跑完且不互相拖垮;以及 Ragas 正在用@experiment取代evaluate()的新范式。
1. 为什么需要一个专门的执行器
评测 100 条样本 × 4 个指标 = 400 个任务,每个任务里还可能套 2~3 次 LLM 调用。串行跑会等到天荒地老;裸 asyncio.gather 又会一口气打满 API 限流、且一个失败可能拖垮全部。
Executor 解决四件事:限制并发数、单任务失败不影响全局、结果按提交顺序排回、可中途取消。它在 evaluate 和 testset 生成里被反复复用。
2. 主线:submit → 包装 → 并发跑 → 排序还原
executor.submit(metric.single_turn_ascore, sample, ...) ← 攒任务
│ 每个任务被 wrap_callable_with_index 包一层:
│ · 记录提交序号 counter
│ · try/except: 失败时(默认)吞掉 → 返回 (counter, np.nan)
▼
executor.aresults() / results()
│ _process_jobs: 把所有任务转协程,as_completed 限并发跑
│ (可选 batch_size 分批,带嵌套进度条)
▼
sorted(results, key=lambda x: x[0]) ← 按 counter 排回提交顺序
│
▼
[r[1] for r in ...] ← 剥掉序号,返回纯结果列表
索引是关键。 并发完成顺序是乱的,但调用方要求结果顺序 == 提交顺序。wrap_callable_with_index 给每个任务绑一个 counter,完成时返回 (counter, result)(src/ragas/executor.py:64-86);aresults 最后按 counter 排序再剥掉序号(src/ragas/executor.py:193-202)。evaluate 就是靠这个顺序把结果切回「第 i 行第 j 个指标」(src/ragas/evaluation.py:292-300)。
容错。 wrap_callable_with_index 里,默认 raise_exceptions=False 时把异常记 log 并返回 np.nan,让这一条样本失败不毁掉 整批(src/ragas/executor.py:71-84)。这就是为什么调试时要开 raise_exceptions=True。
限并发。 _process_coroutines 用 as_completed(coroutines, max_workers, cancel_check=...) 把并发上限钉在 run_config.max_workers(src/ragas/executor.py:180-191),避免打爆 API。
可取消。 cancel() 置一个 threading.Event,批处理循环每批前检查 is_cancelled()(src/ragas/executor.py:56-62、156-158)。evaluate(..., return_executor=True) 把 executor 还给你,就为了能调 executor.cancel()。
幂等与可复用。 _process_jobs 一进来就把 jobs 拷贝一份再清空原列表,防止重复执行(src/ragas/executor.py:115-117);_jobs_processed 跨多次 run 持续累加,保证索引连续(src/ragas/executor.py:98-103)。
3. nest_asyncio:在 Jupyter 里跑异步的妥协
Ragas 大量用户在 Jupyter notebook 里用,而 notebook 本身已经有一个跑着的事件循环——这时再 asyncio.run 会报「event loop already running」。
Ragas 的应对是 nest_asyncio(允许事件循环嵌套):同步入口 results() 先 apply_nest_asyncio() 打补丁再 run(src/ragas/executor.py:204-215)。
但 nest_asyncio 在生产异步服务里会引发问题,所以 evaluate 给了 allow_nest_asyncio=False:此时走纯 asyncio.run 新建事件循环,不打补丁(src/ragas/evaluation.py:475-484)。而 aevaluate(纯异步版)从头就不打补丁(src/ragas/evaluation.py:79-82)。这是「兼容 notebook」与「生产安全」之间的取舍。
4. 回调追踪:把一次评测变成一棵 trace 树
Ragas 复用 LangChain 的回调机制,把每次评测组织成嵌套的 trace 树,便于在 Langfuse / MLflow 等工具里可视化:
ragas evaluation (ChainType.EVALUATION)
└─ row 0 (ChainType.ROW)
├─ faithfulness (ChainType.METRIC)
│ ├─ StatementGenerator… (ChainType.RAGAS_PROMPT)
│ └─ NLIStatement… (ChainType.RAGAS_PROMPT)
└─ answer_relevancy (ChainType.METRIC)
evaluate 先用 new_group 开顶层评测组,再为每行开 row i 子组,把指标任务 submit 到这个 row 组里(src/ragas/evaluation.py:236-276)。每个指标的 single_turn_ascore 又开自己的 METRIC 组,每次 PydanticPrompt.generate 再开 RAGAS_PROMPT 组(src/ragas/prompt/pydantic_prompt.py:231-236)。层层 new_group 嵌套就长出这棵树。RagasTracer 收集这些 trace 存进 EvaluationResult.ragas_traces(src/ragas/evaluation.py:217-218、324)。
成本核算挂在同一机制上。 传 token_usage_parser 时挂一个 CostCallbackHandler,顺着回调收集 token 用量算钱(src/ragas/evaluation.py:221-225)。
5. 新范式:@experiment 取代 evaluate()
evaluate() 和 aevaluate() 都已挂 DeprecationWarning,明确指向 @experiment(src/ragas/evaluation.py:447-453、105-111)。理念变了:
evaluate()(旧) | @experiment(新) | |
|---|---|---|
| 你给什么 | 已经算好的样本 + 指标列表 | 一个函数:对一行输入,跑你的应用 + 算分 |
| 谁跑应用 | 你自己先跑好填进 dataset | 实验框架在循环里替你跑 |
| 产出 | 内存里的 EvaluationResult | 落盘的 Experiment(可换 backend 存储) |
| 复现 | 靠你自己 | 配 version_experiment 做 git 版本化 |
用起来什么样(示意,非源码):
# 示意,非源码:@experiment 的形状
from ragas import experiment
@experiment()
async def my_eval(row):
answer = await my_rag_app(row["question"]) # 跑你自己的应用
score = await faithfulness.ascore(...) # 算分
return {"question": row["question"], "answer": answer, "score": score.value}
result = await my_eval.arun(dataset) # 逐行跑、收集、落盘
实现很轻。 experiment 是个装饰器工厂,把函数包成 ExperimentWrapper(src/ragas/experiment.py:201-232)。arun 给实验起个可记忆的随机名(memorable_names),为数据集每行建一个协程任务,用 asyncio.as_completed 并发跑,逐个收集进 Experiment 视图,单行失败只打印警告继续,最后 save() 落盘(src/ragas/experiment.py:141-198)。
注意它这里用的是裸 asyncio.as_completed,没用 Executor(没有限并发/索引排序那套)——新范式下并发控制更简单直接,结果顺序也不强求(append 到视图即可)。
git 版本化保证可复现。 version_experiment 用 GitPython 把当前代码状态提交、可选开一个 ragas/<实验名> 分支,返回 commit hash(src/ragas/experiment.py:21-100)。把「这次实验跑的是哪版代码」钉死——评测科学的最后一块拼图。
6. 边界与坑
evaluate/aevaluate会消失。 现在能用但有弃用警告,新项目应直接上@experiment。- 默认裁判是 gpt-4o-mini + 要 OpenAI key。 没给 LLM 的指标会 new 一个 OpenAI
gpt-4o-mini(src/ragas/evaluation.py:174-180,该默认装配位于aevaluate,evaluate委托给它)——没配 key 会失败。 - nest_asyncio 副作用。 在生产异步服务里用
evaluate()默认值可能搅乱事件循环;务必allow_nest_asyncio=False或改用aevaluate/@experiment。 @experiment容错更松。 它只打印Warning: Task failed(src/ragas/experiment.py:185-187),失败的行直接不进结果,容易让人没注意到丢了样本。
7. 代码地图
| 主题 | 文件 | 关键符号 |
|---|---|---|
| 异步执行器 | src/ragas/executor.py | Executor, submit, wrap_callable_with_index, aresults, cancel, run_async_batch |
| 评测装配/编排 | src/ragas/evaluation.py | evaluate, aevaluate |
| 必需列校验 | src/ragas/validation.py | validate_required_columns, validate_supported_metrics |
| nest_asyncio 工具 | src/ragas/async_utils.py | apply_nest_asyncio, run, as_completed, process_futures |
| 回调追踪 | src/ragas/callbacks.py | new_group, RagasTracer, ChainType |
| 实验范式 | src/ragas/experiment.py | experiment, ExperimentWrapper, arun, version_experiment |
| 结果对象 | src/ragas/dataset_schema.py | EvaluationResult, SingleTurnSample, MultiTurnSample |