三大模块:环境 ↔ LLM 怎么缝合
本章讲什么:RAGEN 把“让 LLM 在环境里多轮交互”这件事拆成三个职责分明的模块。看懂这三个模块怎么 传接数据,是看懂 StarPO 主循环的前提。
3.1 为什么要拆三块
要解决的小问题。 LLM 只会“吞 token、吐 token”;环境只会“收动作、吐状态+奖励”。两边语言不通, 中间需要一层翻译;而且为了训练效率,还得同时跑几百个环境(一个 batch 里有很多并行轨迹)。
思路。 把这三件事分开:
| 模块 | 只管 | 不管 |
|---|---|---|
EnvStateManager | 并行环境池的 reset/step/收集指标 | token、prompt 拼装 |
ContextManager | 状态↔token 双向翻译、loss mask、奖励张量 | 环境怎么 step |
LLMAgentProxy | 调度两者 + 调 LLM 生成 | 具体怎么算 token / 怎么 step 环境 |
三者都在 agent_proxy.py:193 的 LLMAgentProxy.__init__ 里被一次性实例化(train/val 各一套 ctx/es 管理器)。
3.2 Environment State Manager:一批环境怎么管
环境不是一个,是一整阵。 EnvStateManager 的核心是一个 self.envs 列表,每个条目是一个环境实例
加元数据。它按“env_groups 个组 × group_size 个副本”的结构创建(es_manager.py:69 的 _init_env_instances)。
这个组结构很重要:同一组里的所有副本用同一个 seed,也就是面对同一个初始状态,只是采样出不同
轨迹——这正是 GRPO / 奖励方差过滤赖以工作的“组”。看 reset 里怎么撑 seed(es_manager.py:115):
# 示意,非源码(改编自 es_manager.py:_expand_seed)
# env_groups=3, group_size=2:同组同 seed,跨组 seed+1
# 结果:[seed, seed, seed+1, seed+1, seed+2, seed+2]
def _expand_seed(seed):
seeds = [[seed + i] * group_size for i in range(env_groups)]
return sum(seeds, []) # 拍平
一个环境条目里有什么。 每个 entry 是个 dict(es_manager.py:84):tag(如 SimpleSokoban)、group_id、
env_id、env(真环境对象)、status(EnvStatus,记 truncated/terminated/num_actions/rewards)。
step 做什么。 EnvStateManager.step(es_manager.py:172)收一批 {env_id, llm_response, actions},
对每个环境依次执行动作。几个关键细节:
- 动作要过一道“查表”。
_extract_map_valid_actions(es_manager.py:384)把文本动作(如"down")映到 环境的动作码;不在action_lookup里的被丢弃。 - 无效动作有惩罚。若解析出的动作个数和管理器接受的不一致,记一笔
format_penalty(默认 -0.1,es_manager.py:235)。 - 做完的环境不再输出。
step只把没结束的环境放进env_outputs(es_manager.py:285)——这样下一轮 生成只针对还活着的轨迹,省 GPU。
并行加速。 若环境 parallel_friendly 且 max_workers>1,同 tag 的 reset/step 会走一个 ThreadPoolExecutor
(es_manager.py:104)。对 Sokoban 这种生成谜题费时、或 Search 这种要走网络的环境很有用。
收尾算指标。 get_rollout_states(es_manager.py:290)把每条轨迹的 success/num_actions/自定义指标
聚成 cache['metrics'],还会算 pass@k(k=group_size):同组里只要有一条成功就计 1(es_manager.py:356)。
3.3 环境接口:Gym 风格的 BaseEnv
所有环境只需实现两个方法。 ragen/env/base.py:5 的 BaseEnv 是个 ABC,必须实现:
reset(seed, **kwargs)→ 返回渲染后的初始状态(文本或图像)。约定:同 seed 同环境。step(action)→ 返回(observation, reward, done, info)。
info 里的键会被当成指标收集(如 sokoban 的 action_is_effective/success,sokoban/env.py:54)。
两种动作空间。 BaseDiscreteActionEnv(离散,如 Sokoban/FrozenLake)和 BaseLanguageBasedEnv(文本,
如 Countdown)。Sokoban 还同时继承了 gym_sokoban 的 GymSokobanEnv(sokoban/env.py:14)——复用现成的游戏逻辑,
只包一层文本渲染。
注册一个新环境很便宜。 在 ragen/env/__init__.py:23 的 REGISTERED_ENVS 和 REGISTERED_ENV_CONFIGS
两个字典里加一行,再在 config/envs.yaml 的 custom_envs 里写个 tag 条目(指定 env_type、env_instruction、
max_actions_per_traj 等)即可。注意:WebShop / Search / Alfworld 是“可选依赖”,__init__.py:49 用 try/except ImportError 包住——装了才注册。
3.4 Context Manager:状态 ↔ token 的双向翻译
这是工程含量最高的一块(ctx_manager.py 有 1600+ 行)。它两个方向都要管。
方向一:环境输 出 → LLM 输入(get_lm_inputs)
get_lm_inputs(ctx_manager.py:1350)是总入口,按 prepare_for_update 和 context_window_mode 分派:
prepare_for_update=False(推理)→_build_infer_samples:把历史拼成 chat 消息,末尾补上<think>(或<answer>)作为生成提示(ctx_manager.py:1260)。prepare_for_update=True(训练)→ 按模式分派给_build_samples_full/_build_single_turn_samples/_build_limited_multi_turn_samples。
每轮状态怎么拼成 prompt。 看 _build_turn_state_content(ctx_manager.py:616):每轮都拼一段
“Turn N: + State + 还剩几个动作 + 要求严格输出 <think>…</think><answer>…</answer> 格式”。系统提示由
_build_system_content(ctx_manager.py:541)拼,里面嵌了该环境的 instruction(含动作表、grid 词表等,
在 _init_prefix_lookup,ctx_manager.py:115)。
方向二:LLM 输出 → 环境动作(get_env_inputs)
get_env_inputs(ctx_manager.py:1398)把模型回复 decode 回文本,调 _parse_response(ctx_manager.py:196)
正则抽出动作。这里有两个要点:
# 示意,非源码(改编自 ctx_manager.py:_parse_response)
# 开了 think 就要两段都在;只要 answer 则只抽 answer
pattern = r'<think>(.*?)</think>\s*<answer>(.*?)</answer>' # enable_think=True
match = re.search(pattern, response, re.DOTALL)
# 动作用 action_sep(如 "||")切开,超过 max_actions_per_turn 的被截掉
actions = [a.strip() for a in action_content.split(action_sep) if a.strip()]
注意模 型生成时不含 <think> 开头(生成提示里已给了),所以 get_env_inputs 要手动把 <think>
拼回去再解析(ctx_manager.py:1406)。
loss mask:只学“该学的 token”
这是 RAGEN 最巧的点之一。多轮对话的 token 里,只有 assistant 说的那些应该被训练,系统提示、
环境状态都不该。get_masks_and_scores(ctx_manager.py:38)用一个巧劲的手法认出 assistant 轮:
# 示意,非源码(改编自 ctx_manager.py:get_masks_and_scores)
# Qwen 里每个 turn 开头是 <|im_start|>(special_token)
# 累计出现次数的奇偶,就能区分谁是谁说的
turn_starts = (input_ids == special_token).int()
turn_indicators = turn_starts.cumsum(dim=-1)
# 奇数且 >1 是 assistant(>1 跳过系统提示)
response_mask = (turn_indicators % 2 == 1) & (turn_indicators > 1)
这里 loss_mask 和 response_mask 有区别:开了 enable_response_mask 时 loss_mask 也只算 assistant
轮;否则 loss_mask 是“系统提示之后全学”(ctx_manager.py:48)。这个选项 train.py:137 里断言只支持
qwen / llama-3。
奖励怎么贴到 token
默认(use_turn_scores=False)把整条轨迹的奖励求和,贴到最后一个 token 上(ctx_manager.py:67)。随后过
_normalize_score_tensor(ctx_manager.py:223)做组内归一化(grouping 可选 state/inductive/batch,method 可选
mean_std / mean / asym_clip / identity)——这实质上是把 GRPO 的“组内减均值除方差”提前到了这里做。
3.5 三个 context window 模式
RAGEN 支持三种“一条轨迹怎么变成训练样本”的模式(agent_proxy.context_window_mode):
| 模式 | 一条轨迹 → 几个样本 | 只学哪些 assistant 轮 | 构造函数 |
|---|---|---|---|
full(默认) | 1 个(整条多轮对话) | 所有 assistant 轮 | _build_samples_full |
limited_multi_turn | 每轮 1 个(带滑动窗口历史) | 只最后一轮 | _build_limited_multi_turn_samples |
single_turn | 每轮 1 个(可只带当前轮) | 只该轮 | _build_single_turn_samples |
后两种模式把一条轨迹拆成多个样本,所以多了个 episode_ids(ctx_manager.py:971)标记“这些样本原本是同
一条 episode”——后面过滤器和 GRPO 去重都依赖它(见第 2/3 章)。train.py:154 的校验也要求这些模式下
样本数乘上 max_turn 才够 mini-batch。
3.6 LLM 生成的三种后端
generate_sequences(agent_proxy.py:203)根据 actor 类型分派:
RayWorkerGroup:训练时的真身,走 veRL 的分布式 vLLM,需要先pad_dataproto_to_divisor对齐 world_size。VllmWrapperWg(agent_proxy.py:40):评估 / 调试用的本地 vLLM。它还会顺手算每个回复的逐 token 熵 (agent_proxy.py:113),这是 V1 就有的“实例级熵”指标。ApiCallingWrapperWg(agent_proxy.py:139):调外部 API 模型(如评估闭源模型),走ConcurrentLLM。
3.7 巧妙之处
- “做完就出队”省算力。
step只返回未完成的环境(es_manager.py:285),rollout 主循环看到env_outputs为空就提前 break(agent_proxy.py:278),不会白跑剩余轮数。 - 多模态状态统一处理。
_handle_mm_state(es_manager.py:396)把图像状态转成 PIL 图像列表;<images>占位符则在_update_cache_history(es_manager.py:378)里按图片数("<images>" * len(next_state))拼出,文本与多模态走同一条路。 - 超长上下文从最旧的轮次截起。
_apply_max_length(ctx_manager.py:456)保留系统提示和当前轮, 从头按“user-assistant-reward”三条一组地删。
3.8 代码地图
| 主题 | 文件 | 符号 |
|---|---|---|
| 环境池初始化 / seed 扩张 | ragen/llm_agent/es_manager.py | _init_env_instances · reset · _expand_seed |
| 环境 step + 无效动作惩罚 | ragen/llm_agent/es_manager.py | step · _extract_map_valid_actions |
| 指标聚合 + pass@k | ragen/llm_agent/es_manager.py | get_rollout_states |
| 环境抽象接口 | ragen/env/base.py | BaseEnv · BaseDiscreteActionEnv · BaseLanguageBasedEnv |
| Sokoban 范例 | ragen/env/sokoban/env.py | SokobanEnv.reset · .step · ._render_text |
| 状态→token | ragen/llm_agent/ctx_manager.py | get_lm_inputs · _build_turn_state_content · _init_prefix_lookup |
| token→动作 | ragen/llm_agent/ctx_manager.py | get_env_inputs · _parse_response |
| loss mask / 奖励张量 | ragen/llm_agent/ctx_manager.py | get_masks_and_scores · _normalize_score_tensor |
| LLM 生成后端 | ragen/llm_agent/agent_proxy.py | generate_sequences · VllmWrapperWg · ApiCallingWrapperWg |