controlplane/k8s_pool.go: 115 additions & 19 deletions
@@ -11,6 +11,7 @@ import (
"fmt"
"log/slog"
"os"
"sort"
"strconv"
"strings"
"sync"
@@ -71,7 +72,14 @@ type K8sWorkerPool struct {
stopInform chan struct{}
spawnSem chan struct{} // limits concurrent pod creates to avoid overwhelming the K8s API
retireSem chan struct{} // limits concurrent pod deletes to avoid overwhelming the K8s API
-	podReady sync.Map // podName -> chan string (pod IP); signaled by informer
+	podReady sync.Map // podName -> chan podReadyInfo; signaled by informer
+
+	// nodeFirstSeen tracks the first time this CP observed a worker on each
+	// node. Used to prefer reaping workers on the newest-seen nodes and
+	// claiming idle workers on the oldest-seen nodes, so warm parquet pages
+	// on the local NVMe cache-proxy stay useful for longer. Per-CP state;
+	// CPs don't coordinate this (see idleReaper/findIdleWorker).
+	nodeFirstSeen map[string]time.Time

spawnWarmWorkerFunc func(ctx context.Context, id int) error
spawnWarmWorkerBackgroundFunc func(id int)
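
Reviewer note: the nodeFirstSeen comment above implies a two-ended ranking: newest-seen nodes are reaped first, oldest-seen nodes are claimed first. A minimal standalone sketch of that ordering (node names and timings here are illustrative, not from this PR):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

func main() {
	base := time.Now()
	// Hypothetical first-seen times; node-a has been serving longest,
	// so its NVMe cache is presumed warmest.
	nodeFirstSeen := map[string]time.Time{
		"node-a": base.Add(-3 * time.Hour),
		"node-b": base.Add(-5 * time.Minute),
		"node-c": base,
	}
	nodes := []string{"node-a", "node-b", "node-c"}

	// Reap order: newest-seen first, so long-lived nodes keep warm caches.
	sort.Slice(nodes, func(i, j int) bool {
		return nodeFirstSeen[nodes[i]].After(nodeFirstSeen[nodes[j]])
	})
	fmt.Println("reap order:", nodes) // [node-c node-b node-a]

	// Claim order is the reverse: oldest-seen first.
	sort.Slice(nodes, func(i, j int) bool {
		return nodeFirstSeen[nodes[i]].Before(nodeFirstSeen[nodes[j]])
	})
	fmt.Println("claim order:", nodes) // [node-a node-b node-c]
}
```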
@@ -158,6 +166,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) (
runtimeStore: cfg.RuntimeStore,
spawnSem: make(chan struct{}, spawnConcurrency),
retireSem: make(chan struct{}, retireConcurrency),
+		nodeFirstSeen: make(map[string]time.Time),
}

// Resolve CP pod UID for owner references
@@ -299,7 +308,7 @@ func (p *K8sWorkerPool) startInformer() {
if newPod.Status.PodIP != "" && newPod.Status.Phase == corev1.PodRunning {
if ch, ok := p.podReady.LoadAndDelete(newPod.Name); ok {
select {
-				case ch.(chan string) <- newPod.Status.PodIP:
+				case ch.(chan podReadyInfo) <- podReadyInfo{ip: newPod.Status.PodIP, nodeName: newPod.Spec.NodeName}:
default:
}
}
@@ -308,7 +317,7 @@
if newPod.Status.Phase == corev1.PodFailed || newPod.Status.Phase == corev1.PodSucceeded {
// Unblock any waiter with an error signal
if ch, ok := p.podReady.LoadAndDelete(newPod.Name); ok {
-				close(ch.(chan string))
+				close(ch.(chan podReadyInfo))
}
p.onPodTerminated(newPod)
}
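
Reviewer note: the handlers above signal each waiter at most once: LoadAndDelete claims the channel, a non-blocking send delivers success (safe because the waiter registers a 1-buffered channel), and close signals failure. A self-contained sketch of the same pattern; podReadyInfo is mirrored locally and the helper names are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

type podReadyInfo struct {
	ip       string
	nodeName string
}

// signalReady mirrors the informer-side pattern: LoadAndDelete ensures each
// waiter is signaled at most once; the non-blocking send cannot stall the
// informer because the waiter registers a 1-buffered channel.
func signalReady(m *sync.Map, podName string, info podReadyInfo) {
	if ch, ok := m.LoadAndDelete(podName); ok {
		select {
		case ch.(chan podReadyInfo) <- info:
		default:
		}
	}
}

// signalFailed closes the channel; the waiter observes ok == false.
func signalFailed(m *sync.Map, podName string) {
	if ch, ok := m.LoadAndDelete(podName); ok {
		close(ch.(chan podReadyInfo))
	}
}

func main() {
	var m sync.Map
	ch := make(chan podReadyInfo, 1)
	m.Store("worker-0", ch)

	signalReady(&m, "worker-0", podReadyInfo{ip: "10.0.0.7", nodeName: "node-a"})
	info, ok := <-ch
	fmt.Println(info, ok) // {10.0.0.7 node-a} true
}
```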
@@ -606,7 +615,7 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error {
}

// Wait for pod to get an IP via informer (no polling).
-	podIP, err := p.waitForPodReady(ctx, podName, 5*time.Minute)
+	ready, err := p.waitForPodReady(ctx, podName, 5*time.Minute)
if err != nil {
_ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{
GracePeriodSeconds: int64Ptr(0),
@@ -616,7 +625,7 @@
}

// Connect gRPC client
-	addr := fmt.Sprintf("%s:%d", podIP, p.workerPort)
+	addr := fmt.Sprintf("%s:%d", ready.ip, p.workerPort)
token, serverCertPEM, err := p.readWorkerRPCSecurity(ctx, podName)
if err != nil {
return fmt.Errorf("read worker RPC security: %w", err)
@@ -634,13 +643,15 @@
w := &ManagedWorker{
ID: id,
podName: podName,
+		nodeName:    ready.nodeName,
bearerToken: token,
client: client,
done: done,
}

p.mu.Lock()
p.workers[id] = w
+	p.stampNodeFirstSeenLocked(ready.nodeName)
workerCount := len(p.workers)
observeWarmPoolLifecycleGauges(p.workers)
p.mu.Unlock()
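
Reviewer note: worth highlighting that the worker registration and the first-seen stamp share one p.mu critical section. A toy sketch of why that ordering matters; the type and field names below are stand-ins, not the PR's:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	mu            sync.Mutex
	workers       map[int]string // id -> node name, stand-in for *ManagedWorker
	nodeFirstSeen map[string]time.Time
}

// register mirrors the PR's ordering: the worker insert and the first-seen
// stamp happen under a single lock acquisition, so readers that rank nodes
// never observe a worker whose node lacks a first-seen entry.
func (p *pool) register(id int, node string) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.workers[id] = node
	if _, ok := p.nodeFirstSeen[node]; !ok {
		p.nodeFirstSeen[node] = time.Now()
	}
	return len(p.workers)
}

func main() {
	p := &pool{workers: map[int]string{}, nodeFirstSeen: map[string]time.Time{}}
	fmt.Println("workers:", p.register(1, "node-a"))
}
```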
@@ -723,36 +734,44 @@ func (p *K8sWorkerPool) createPodWithBackoff(ctx context.Context, pod *corev1.Po
return nil // unreachable
}

+// podReadyInfo is signaled through podReady when a pod becomes Running with
+// an IP. Carrying nodeName alongside the IP lets the spawn path stamp
+// nodeName onto the ManagedWorker without a second API round-trip.
+type podReadyInfo struct {
+	ip       string
+	nodeName string
+}
+
// waitForPodReady waits for a pod to become Running with an IP, using the
// informer instead of polling the API. Falls back to a single API check
// in case the informer event fired before we registered the channel.
-func (p *K8sWorkerPool) waitForPodReady(ctx context.Context, podName string, timeout time.Duration) (string, error) {
-	ch := make(chan string, 1)
+func (p *K8sWorkerPool) waitForPodReady(ctx context.Context, podName string, timeout time.Duration) (podReadyInfo, error) {
+	ch := make(chan podReadyInfo, 1)
p.podReady.Store(podName, ch)
defer p.podReady.Delete(podName)

// Check once in case the pod is already running (informer event already fired).
pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, podName, metav1.GetOptions{})
if err == nil && pod.Status.PodIP != "" && pod.Status.Phase == corev1.PodRunning {
-		return pod.Status.PodIP, nil
+		return podReadyInfo{ip: pod.Status.PodIP, nodeName: pod.Spec.NodeName}, nil
}
if err == nil && pod.Status.Phase == corev1.PodFailed {
return "", fmt.Errorf("pod %s failed", podName)
return podReadyInfo{}, fmt.Errorf("pod %s failed", podName)
}

timer := time.NewTimer(timeout)
defer timer.Stop()

select {
-	case ip, ok := <-ch:
-		if !ok || ip == "" {
-			return "", fmt.Errorf("pod %s failed or was deleted", podName)
+	case info, ok := <-ch:
+		if !ok || info.ip == "" {
+			return podReadyInfo{}, fmt.Errorf("pod %s failed or was deleted", podName)
}
-		return ip, nil
+		return info, nil
case <-timer.C:
return "", fmt.Errorf("timeout waiting for pod %s to become ready", podName)
return podReadyInfo{}, fmt.Errorf("timeout waiting for pod %s to become ready", podName)
case <-ctx.Done():
return "", ctx.Err()
return podReadyInfo{}, ctx.Err()
}
}
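
Reviewer note: waitForPodReady follows a register-then-check discipline: the channel is stored before the one-shot Get, so an informer event that fires in between still lands in the buffered channel. A generic sketch of that race-avoidance pattern (the function names and the checkNow hook are illustrative, not this file's API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var waiters sync.Map // key -> chan string

// waitForEvent registers the channel *before* the one-shot check, so an
// event delivered between the two still reaches the buffered channel.
func waitForEvent(ctx context.Context, key string, checkNow func() (string, bool), timeout time.Duration) (string, error) {
	ch := make(chan string, 1)
	waiters.Store(key, ch)
	defer waiters.Delete(key)

	// One-shot check in case the event already happened.
	if v, ok := checkNow(); ok {
		return v, nil
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case v, ok := <-ch:
		if !ok {
			return "", errors.New("event source failed")
		}
		return v, nil
	case <-timer.C:
		return "", errors.New("timeout")
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	go func() {
		time.Sleep(10 * time.Millisecond)
		if ch, ok := waiters.LoadAndDelete("pod-1"); ok {
			ch.(chan string) <- "10.0.0.9" // buffered send, never blocks
		}
	}()
	ip, err := waitForEvent(context.Background(), "pod-1",
		func() (string, bool) { return "", false }, time.Second)
	fmt.Println(ip, err) // 10.0.0.9 <nil>
}
```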

@@ -1874,17 +1893,41 @@ func (p *K8sWorkerPool) reapIdleWorkers() {
idleCount++
}
}

+	// Build the list of reap-eligible workers and sort by the first time
+	// this CP saw their node — newest node first. We prefer evicting workers
+	// on nodes we only just started using so older nodes keep their warm
+	// NVMe parquet cache for longer. Workers with no known nodeName sort as
+	// "new" (they're reaped first) to avoid stalling on stale state.
+	type candidate struct {
+		id     int
+		w      *ManagedWorker
+		seenAt time.Time
+	}
+	var candidates []candidate
	for id, w := range p.workers {
+		if p.isWarmIdleWorkerLocked(w) && !w.lastUsed.IsZero() && now.Sub(w.lastUsed) > p.idleTimeout {
+			candidates = append(candidates, candidate{id: id, w: w, seenAt: p.nodeSeenAtLocked(w.nodeName, now)})
+		}
+	}
+	sort.Slice(candidates, func(i, j int) bool {
+		// Newest-seen nodes reaped first.
+		return candidates[i].seenAt.After(candidates[j].seenAt)
+	})
+
+	for _, c := range candidates {
if idleCount <= p.minWorkers {
break
}
-		if p.isWarmIdleWorkerLocked(w) && !w.lastUsed.IsZero() && now.Sub(w.lastUsed) > p.idleTimeout {
+		{
+			id, w := c.id, c.w
p.markWorkerRetiredLocked(w, RetireReasonIdleTimeout)
toRetire = append(toRetire, struct {
id int
w *ManagedWorker
}{id, w})
delete(p.workers, id)
+			p.pruneNodeFirstSeenLocked(w.nodeName)
idleCount--
}
}
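
Reviewer note: the reap loop above combines two constraints: never dip below minWorkers idle capacity, and evict newest-seen nodes first. A compact sketch of just that selection logic (worker and reapOrder are illustrative stand-ins, not this file's types):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type worker struct {
	id     int
	seenAt time.Time // first time this CP saw the worker's node
}

// reapOrder returns the ids to retire: newest-seen nodes first, stopping
// once only minWorkers idle workers would remain.
func reapOrder(idle []worker, minWorkers int) []int {
	sort.Slice(idle, func(i, j int) bool {
		return idle[i].seenAt.After(idle[j].seenAt)
	})
	var out []int
	remaining := len(idle)
	for _, w := range idle {
		if remaining <= minWorkers {
			break
		}
		out = append(out, w.id)
		remaining--
	}
	return out
}

func main() {
	base := time.Now()
	idle := []worker{
		{1, base.Add(-2 * time.Hour)},
		{2, base.Add(-10 * time.Minute)},
		{3, base},
	}
	// Keep one idle worker; it ends up on the oldest-seen node.
	fmt.Println(reapOrder(idle, 1)) // [3 2]
}
```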
@@ -1957,18 +2000,71 @@ func (p *K8sWorkerPool) reapStuckActivatingWorkers() {

// --- Shared scheduling helpers (same logic as FlightWorkerPool) ---

+// stampNodeFirstSeenLocked records now as the first-seen time for nodeName
+// if we haven't seen it before. Caller must hold p.mu.
+func (p *K8sWorkerPool) stampNodeFirstSeenLocked(nodeName string) {
+	if nodeName == "" || p.nodeFirstSeen == nil {
+		return
+	}
+	if _, ok := p.nodeFirstSeen[nodeName]; !ok {
+		p.nodeFirstSeen[nodeName] = time.Now()
+	}
+}
+
+// nodeSeenAtLocked returns the first-seen time for nodeName. Missing or
+// unknown entries return `fallback` (typically time.Now()) so callers can
+// treat them as "new" for ranking — newer sorts later, so unknown nodes
+// get reaped first and claimed last, which is the safe direction.
+// Caller must hold p.mu.
+func (p *K8sWorkerPool) nodeSeenAtLocked(nodeName string, fallback time.Time) time.Time {
+	if nodeName == "" || p.nodeFirstSeen == nil {
+		return fallback
+	}
+	if t, ok := p.nodeFirstSeen[nodeName]; ok {
+		return t
+	}
+	return fallback
+}
+
+// pruneNodeFirstSeenLocked drops nodeName from the first-seen map when no
+// remaining worker references it, to keep the map bounded as nodes turn over.
+// Caller must hold p.mu.
+func (p *K8sWorkerPool) pruneNodeFirstSeenLocked(nodeName string) {
+	if nodeName == "" || p.nodeFirstSeen == nil {
+		return
+	}
+	for _, w := range p.workers {
+		if w.nodeName == nodeName {
+			return
+		}
+	}
+	delete(p.nodeFirstSeen, nodeName)
+}
+
func (p *K8sWorkerPool) findIdleWorkerLocked() *ManagedWorker {
+	// Prefer the idle worker on the oldest-seen node so sessions land on
+	// cache-warm NVMe rather than a cold new node. Workers with no known
+	// nodeName sort as "new" (claimed last) to avoid stalling the claim
+	// path on stale bookkeeping.
+	now := time.Now()
+	var best *ManagedWorker
+	var bestSeenAt time.Time
for _, w := range p.workers {
select {
case <-w.done:
continue
default:
}
-		if p.isWarmIdleWorkerLocked(w) {
-			return w
+		if !p.isWarmIdleWorkerLocked(w) {
+			continue
}
+		seen := p.nodeSeenAtLocked(w.nodeName, now)
+		if best == nil || seen.Before(bestSeenAt) {
+			best = w
+			bestSeenAt = seen
+		}
}
-	return nil
+	return best
}

func (p *K8sWorkerPool) leastLoadedWorkerLocked() *ManagedWorker {
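
Reviewer note: stampNodeFirstSeenLocked and pruneNodeFirstSeenLocked above form an acquire/release pair that keeps nodeFirstSeen bounded by live nodes. A toy model of that lifecycle; it swaps the PR's scan over p.workers for an explicit per-node refcount, so the type, field names, and the refcount itself are assumptions for illustration:

```go
package main

import (
	"fmt"
	"time"
)

type pool struct {
	nodeFirstSeen map[string]time.Time
	workersByNode map[string]int // refcount stand-in for scanning p.workers
}

// stamp records the first-seen time once per node and counts the worker.
func (p *pool) stamp(node string) {
	if _, ok := p.nodeFirstSeen[node]; !ok {
		p.nodeFirstSeen[node] = time.Now()
	}
	p.workersByNode[node]++
}

// prune drops the node's entries only when its last worker is gone, so the
// map stays bounded by the set of nodes that still host workers.
func (p *pool) prune(node string) {
	p.workersByNode[node]--
	if p.workersByNode[node] <= 0 {
		delete(p.workersByNode, node)
		delete(p.nodeFirstSeen, node)
	}
}

func main() {
	p := &pool{nodeFirstSeen: map[string]time.Time{}, workersByNode: map[string]int{}}
	p.stamp("node-a")
	p.stamp("node-a")
	p.prune("node-a")
	fmt.Println(len(p.nodeFirstSeen)) // 1: node-a still referenced
	p.prune("node-a")
	fmt.Println(len(p.nodeFirstSeen)) // 0: last worker gone, entry pruned
}
```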