跳到主要内容

调度器:请求怎么找到一台机器

这一章讲 Beam 控制面的心脏:一个 ContainerRequest(「请给我跑一个这样的容器」)是怎么被匹配到一台具体 worker 的,匹配不上又怎么办。代码主要在 pkg/scheduler/

1. 它要解决的小问题

抽象层会源源不断地说「我要再开 3 个容器」。但容器不能凭空出现——得有一台有足够 CPU/内存/对的 GPU 型号、且没被占满的机器来跑它。调度器就是那个「把请求和机器撮合」的中间人;撮合不上时,它还得去开新机器

2. 思路 / 直觉:一条 Redis backlog + 一个批处理循环

Beam 没有用「请求直达 worker」的同步模型,而是经 Redis 解耦

  • 抽象层调 Scheduler.Run(request),请求被推进 Redis 的 backlog(一个队列)。
  • 调度器另有一个独立循环 StartProcessingRequests,不停从 backlog 批量弹出请求来处理。

这样做的好处:生产(要容器)和消费(撮合)速度解耦,且调度状态天然可以多副本共享 backlog。

Scheduler.Run(req) StartProcessingRequests()(独立 goroutine)
│ │ 每 50ms
│ 设置并发配额 ▼
│ 检查重复容器 PopN(512) 一批请求
▼ │
addRequestToBacklog ──RPush──► Redis backlog ──LPop──► 逐个 processRequest

关键常量(scheduler.go:25-31):批大小 requestProcessingBatchSize=512、轮询间隔 requestProcessingInterval=50ms

3. 入口:Scheduler.Run 做了什么

Run 不直接调度,它只做「入队前的准备」(scheduler.go:212 func (s *Scheduler) Run):

  1. 查重:若该 ContainerId 已是 pending/running,返回 ContainerAlreadyScheduledError,避免重复起。
  2. 附检查点:若开了自动检查点且有可用检查点,把 request.Checkpoint 填上(为后面 CRIU 恢复铺路)。
  3. 并发配额getConcurrencyLimit 取 workspace 的并发上限并 SetContainerStateWithConcurrencyLimit 落库。
  4. 入 backlogaddRequestToBacklog

注意一个细节:addRequestToBacklog (scheduler.go:990) 里第一次(RetryCount==0)是立即入队、不延迟;之后每次重试都走指数退避 calculateBackoffDelay(1s 起、5s 封顶,scheduler.go:1100),并在超过 maxScheduleRetryCount=120 次或 maxScheduleRetryDuration=10min放弃

4. 核心:三级调度顺序

请求被弹出后交给 schedulingAttempt.run()reserve.go:28)。这里把调度顺序写得非常明确:

run():
① scheduleOnAvailableWorker() 就绪容量优先:有空闲 worker 直接派
└ 成功 → 返回
② reservePendingWorkerCapacity() 其次:预留一台「正在启动中」的 worker 的容量
└ 成功 → 重新入队等它起来
③ tryPrivatePoolFallback() 私有池兜底
④ provisionWorker() 最后:调云厂商 API 开一台新机器

源码注释原话(reserve.go:31-32):「use ready capacity first, wait on pending capacity second, and only provision when neither path can fit」。这条「能用现成的就别开新机」的优先级,是省钱省时间的核心。

4.1 第一级:选一个空闲 worker(过滤 + 打分)

selectWorkerFromWorkersByStatusscheduler.go:869)是撮合的核心,分两步。

第一步:一串过滤器,逐层筛掉不合格的 worker。 这是并列的几道关卡,用表更清楚:

过滤器函数筛掉谁
池选择器filterWorkersByPoolSelector请求指定了私有池 → 只留该池的 worker;没指定 → 排除「要求选择器」的 worker
私有 agent 存活filterLivePrivateAgentWorkers私有池里排除掉「机器已离线」的 worker
资源filterWorkersByResourcesCPU/内存不够、GPU 型号不匹配、被 cordon 的
标志filterWorkersByFlags非抢占请求排除掉可抢占 worker
状态filterWorkersByStatus只留指定生命周期状态(如 Available)

资源过滤里有个巧妙的内存超额预留capacityMemoryForSchedulingscheduler.go:809)把请求内存乘 1.25 (memory*125+99)/100——给容器留 25% 余量,避免刚好卡死触发 OOM。

第二步:给活下来的 worker 打分,取最高分。

# 示意,非源码:评分逻辑对应 scoreWorkerForRequest
def score(worker, request):
s = worker.priority # 池优先级是基础分
if worker.status == "available":
s += 10 # 现成可用的 +10(scoreAvailableWorker)
if request.requires_gpu():
s -= gpu_priority_modifier(...) # GPU 型号在请求偏好列表里越靠后,扣得越多
return s
# 重点看:分相同时,再比「空闲容量」——GPU 数权重极高(×1_000_000)

真实实现 scoreWorkerForRequestscheduler.go:949)+ 平局打破 workerFreeCapacityScorescheduler.go:975)。后者把空闲 GPU 数乘以 100 万,意思是优先把活塞到 GPU 最空的机器,让 GPU 利用率更均衡。

4.2 第二级:预留「待启动」worker 的容量

如果没有现成空闲 worker,但已经有一台机器正在启动中(status=Pending),调度器不会傻等也不会立刻又开一台,而是在那台未来的机器上先预扣容量——provisioningTracker.reserveCapacityreserve.go:287)。

这块用一个 provisioningTracker 维护两张映射:reservations(worker → 预留)和 requestReservations(请求 → 预留)。预留成功后请求重新入队,等那台机器起来就能直接命中第一级。预留有 TTL 兜底(pendingWorkerReservationTTL=30s)防止泄漏。

4.3 第三级:调云 API 开新机

都不行才 provisionWorkerreserve.go:94):

  1. getControllers 找到能满足请求的 worker 池控制器(按 GPU 型号筛池)。
  2. workerProvisioningController 应用退避(避免对失败的池猛开机,provisioning_backoff.go)。
  3. addReservation 先在调度器内存里建一个「占位 worker」预扣容量,再起一个 goroutine workerProvisioningAttempt.run() 真正调 controller.AddWorker(cpu, mem, gpu)
  4. 请求重新入队等新机器。

controller.AddWorker 的实现因池而异:本地 K8s(pool_local.go)、外部云厂商(pool_external.go,对接 pkg/providers/ 里的 Hetzner/Vast/Shadeform 等)、私有 agent 池(pool_agent.go)。

5. 派发:怎么把请求交给 worker

选到 worker 后 scheduleRequestscheduler.go:435):更新容器分配的 GPU、附上镜像/构建仓库凭证(attachImageCredentials/attachBuildRegistryCredentials,都带缓存),然后 pushWorkerRequestScheduleContainerRequest

派发落到 Redis(pkg/repository/worker_redis.go:941):

  • 先拿该 worker 的分布式锁,校验它仍 Available
  • updateWorkerCapacityLocked(RemoveCapacity) 原子扣减该 worker 的空闲资源;
  • RPush 把请求 JSON 推进 SchedulerWorkerRequests(workerId) 队列;
  • 推送失败会回滚容量(把扣掉的加回去),保证容量账目一致。

worker 那头就是 LPop 这个队列拿活(GetNextContainerRequestworker_redis.go:1022)。调度器和 worker 完全靠这条 Redis 队列通信,不直接 RPC。

6. 巧妙之处

  • 三级调度的显式排序reserve.go):把「就绪 > 待启预留 > 开新机」写成线性的几个 if return,可读性极高,也直接编码了成本优先级。
  • 内存 1.25× 超额预留capacityMemoryForScheduling):用一行整数运算给每个容器留 OOM 余量,朴素但有效。
  • 派发容量扣减带回滚ScheduleContainerRequest):Redis 没有跨 key 事务,作者用「先扣减、push 失败再补回」的补偿模式保证账目不漂。
  • provisioning 预留 + TTL:避免「N 个请求同时到、各自开一台机」的惊群,又用 TTL 防止预留泄漏。

7. 边界与局限

  • 调度状态、worker 状态、backlog 全在 Redis;Redis 抖动会直接放大成调度失败重试。
  • 评分是贪心选最高分,不是全局最优装箱;高并发下可能产生碎片。
  • 重试上限是固定的 120 次 / 10 分钟(maxScheduleRetryCount/maxScheduleRetryDuration),超了请求直接判失败。

8. 横向对比

和同 shelf 的「单机 agent 运行时」(如直接 subprocess 跑工具的项目)相比,Beam 是集群级调度:它要解决的是「跨几十台带 GPU 的机器弹性分配」,所以才有 backlog、评分、provisioning 这套重机制。轻量 agent 框架通常没有这一层,直接在本机起进程。Beam 的 Sandbox 抽象(见 04-abstractions.md)才是它和「agent 沙箱」类项目正面重叠的部分。

9. 代码地图

主题文件符号
调度入口pkg/scheduler/scheduler.goScheduler.Run
批处理循环pkg/scheduler/scheduler.goStartProcessingRequests
三级调度顺序pkg/scheduler/reserve.goschedulingAttempt.runrunWaitingOrProvisioning
worker 过滤pkg/scheduler/scheduler.gofilterWorkersByResourcesfilterWorkersByPoolSelector
worker 打分pkg/scheduler/scheduler.goscoreWorkerForRequestworkerFreeCapacityScore
内存超额预留pkg/scheduler/scheduler.gocapacityMemoryForScheduling
待启容量预留pkg/scheduler/reserve.goprovisioningTracker.reserveCapacity
开新机pkg/scheduler/reserve.goworkerProvisioningAttempt.run
退避pkg/scheduler/scheduler.goaddRequestToBacklogcalculateBackoffDelay
派发落 Redispkg/repository/worker_redis.goScheduleContainerRequestGetNextContainerRequest