From 35d835973bf1b948576ba229c51e539f1950d7dc Mon Sep 17 00:00:00 2001 From: Benjamin Knofe-Vider Date: Thu, 23 Apr 2026 13:16:10 +0200 Subject: [PATCH] controlplane: cache-locality-aware worker claim + reap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both paths iterated p.workers map in randomized Go order, so claims and reaps were non-deterministic. That churned worker nodes — a new query might claim a worker on a freshly-provisioned node with a cold cache-proxy, and an idle reap might kill the worker on the longest- lived node (best cache hit rate). Track first-seen time per node in the pool (nodeFirstSeen map, populated when a worker pod reports Ready with a Spec.NodeName) and use it to order both paths: - findIdleWorkerLocked: prefer idle worker on the oldest-seen node, so sessions land on the node with the most warmed-up NVMe parquet pages. - reapIdleWorkers: sort reap candidates by nodeFirstSeen desc so excess idle workers are evicted from the newest-seen nodes first, letting Karpenter consolidate those and keeping the old nodes warm. Workers with no known nodeName (informer race on first few ms after spawn) sort as 'new' — get reaped first, claimed last — which is the safe direction. No RBAC changes. Per-CP state, no cross-CP coordination needed (store already prevents over-spawn; the local idleCount > minWorkers gate prevents over-reap). Map is pruned on last-worker-retire per node. Tests cover: stamp idempotency, empty-node guard, prune correctness, oldest-node-wins claim, unknown-node fallback, newest-node-first reap, minWorkers floor, idleTimeout gate. --- controlplane/k8s_pool.go | 134 +++++++++++++++++++---- controlplane/k8s_pool_test.go | 195 +++++++++++++++++++++++++++++++++- controlplane/worker_mgr.go | 1 + 3 files changed, 309 insertions(+), 21 deletions(-) diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 70760c3..6de1b90 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -11,6 +11,7 @@ import ( "fmt" "log/slog" "os" + "sort" "strconv" "strings" "sync" @@ -71,7 +72,14 @@ type K8sWorkerPool struct { stopInform chan struct{} spawnSem chan struct{} // limits concurrent pod creates to avoid overwhelming the K8s API retireSem chan struct{} // limits concurrent pod deletes to avoid overwhelming the K8s API - podReady sync.Map // podName -> chan string (pod IP); signaled by informer + podReady sync.Map // podName -> chan podReadyInfo; signaled by informer + + // nodeFirstSeen tracks the first time this CP observed a worker on each + // node. Used to prefer reaping workers on the newest-seen nodes and + // claiming idle workers on the oldest-seen nodes, so warm parquet pages + // on the local NVMe cache-proxy stay useful for longer. Per-CP state; + // CPs don't coordinate this (see idleReaper/findIdleWorker). 
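+	//
+	// Illustrative contents once two nodes have hosted workers (node names
+	// hypothetical):
+	//   "ip-10-0-1-23": first seen 09:12  (oldest: claim idle workers here first)
+	//   "ip-10-0-4-81": first seen 12:58  (newest: reap excess idle workers here first)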
+ nodeFirstSeen map[string]time.Time spawnWarmWorkerFunc func(ctx context.Context, id int) error spawnWarmWorkerBackgroundFunc func(id int) @@ -158,6 +166,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( runtimeStore: cfg.RuntimeStore, spawnSem: make(chan struct{}, spawnConcurrency), retireSem: make(chan struct{}, retireConcurrency), + nodeFirstSeen: make(map[string]time.Time), } // Resolve CP pod UID for owner references @@ -299,7 +308,7 @@ func (p *K8sWorkerPool) startInformer() { if newPod.Status.PodIP != "" && newPod.Status.Phase == corev1.PodRunning { if ch, ok := p.podReady.LoadAndDelete(newPod.Name); ok { select { - case ch.(chan string) <- newPod.Status.PodIP: + case ch.(chan podReadyInfo) <- podReadyInfo{ip: newPod.Status.PodIP, nodeName: newPod.Spec.NodeName}: default: } } @@ -308,7 +317,7 @@ func (p *K8sWorkerPool) startInformer() { if newPod.Status.Phase == corev1.PodFailed || newPod.Status.Phase == corev1.PodSucceeded { // Unblock any waiter with an error signal if ch, ok := p.podReady.LoadAndDelete(newPod.Name); ok { - close(ch.(chan string)) + close(ch.(chan podReadyInfo)) } p.onPodTerminated(newPod) } @@ -606,7 +615,7 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { } // Wait for pod to get an IP via informer (no polling). - podIP, err := p.waitForPodReady(ctx, podName, 5*time.Minute) + ready, err := p.waitForPodReady(ctx, podName, 5*time.Minute) if err != nil { _ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ GracePeriodSeconds: int64Ptr(0), @@ -616,7 +625,7 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { } // Connect gRPC client - addr := fmt.Sprintf("%s:%d", podIP, p.workerPort) + addr := fmt.Sprintf("%s:%d", ready.ip, p.workerPort) token, serverCertPEM, err := p.readWorkerRPCSecurity(ctx, podName) if err != nil { return fmt.Errorf("read worker RPC security: %w", err) @@ -634,6 +643,7 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { w := &ManagedWorker{ ID: id, podName: podName, + nodeName: ready.nodeName, bearerToken: token, client: client, done: done, @@ -641,6 +651,7 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { p.mu.Lock() p.workers[id] = w + p.stampNodeFirstSeenLocked(ready.nodeName) workerCount := len(p.workers) observeWarmPoolLifecycleGauges(p.workers) p.mu.Unlock() @@ -723,36 +734,44 @@ func (p *K8sWorkerPool) createPodWithBackoff(ctx context.Context, pod *corev1.Po return nil // unreachable } +// podReadyInfo is signaled through podReady when a pod becomes Running with +// an IP. Carrying nodeName alongside the IP lets the spawn path stamp +// nodeName onto the ManagedWorker without a second API round-trip. +type podReadyInfo struct { + ip string + nodeName string +} + // waitForPodReady waits for a pod to become Running with an IP, using the // informer instead of polling the API. Falls back to a single API check // in case the informer event fired before we registered the channel. -func (p *K8sWorkerPool) waitForPodReady(ctx context.Context, podName string, timeout time.Duration) (string, error) { - ch := make(chan string, 1) +func (p *K8sWorkerPool) waitForPodReady(ctx context.Context, podName string, timeout time.Duration) (podReadyInfo, error) { + ch := make(chan podReadyInfo, 1) p.podReady.Store(podName, ch) defer p.podReady.Delete(podName) // Check once in case the pod is already running (informer event already fired). 
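+	// ch was registered in podReady above, before this API check, and is
+	// buffered with capacity 1, so a Running event the informer delivers
+	// between this check and the select below is held rather than dropped.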
pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, podName, metav1.GetOptions{}) if err == nil && pod.Status.PodIP != "" && pod.Status.Phase == corev1.PodRunning { - return pod.Status.PodIP, nil + return podReadyInfo{ip: pod.Status.PodIP, nodeName: pod.Spec.NodeName}, nil } if err == nil && pod.Status.Phase == corev1.PodFailed { - return "", fmt.Errorf("pod %s failed", podName) + return podReadyInfo{}, fmt.Errorf("pod %s failed", podName) } timer := time.NewTimer(timeout) defer timer.Stop() select { - case ip, ok := <-ch: - if !ok || ip == "" { - return "", fmt.Errorf("pod %s failed or was deleted", podName) + case info, ok := <-ch: + if !ok || info.ip == "" { + return podReadyInfo{}, fmt.Errorf("pod %s failed or was deleted", podName) } - return ip, nil + return info, nil case <-timer.C: - return "", fmt.Errorf("timeout waiting for pod %s to become ready", podName) + return podReadyInfo{}, fmt.Errorf("timeout waiting for pod %s to become ready", podName) case <-ctx.Done(): - return "", ctx.Err() + return podReadyInfo{}, ctx.Err() } } @@ -1874,17 +1893,41 @@ func (p *K8sWorkerPool) reapIdleWorkers() { idleCount++ } } + + // Build the list of reap-eligible workers and sort by the first time + // this CP saw their node — newest node first. We prefer evicting workers + // on nodes we only just started using so older nodes keep their warm + // NVMe parquet cache for longer. Workers with no known nodeName sort as + // "new" (they're reaped first) to avoid stalling on stale state. + type candidate struct { + id int + w *ManagedWorker + seenAt time.Time + } + var candidates []candidate for id, w := range p.workers { + if p.isWarmIdleWorkerLocked(w) && !w.lastUsed.IsZero() && now.Sub(w.lastUsed) > p.idleTimeout { + candidates = append(candidates, candidate{id: id, w: w, seenAt: p.nodeSeenAtLocked(w.nodeName, now)}) + } + } + sort.Slice(candidates, func(i, j int) bool { + // Newest-seen nodes reaped first. + return candidates[i].seenAt.After(candidates[j].seenAt) + }) + + for _, c := range candidates { if idleCount <= p.minWorkers { break } - if p.isWarmIdleWorkerLocked(w) && !w.lastUsed.IsZero() && now.Sub(w.lastUsed) > p.idleTimeout { + { + id, w := c.id, c.w p.markWorkerRetiredLocked(w, RetireReasonIdleTimeout) toRetire = append(toRetire, struct { id int w *ManagedWorker }{id, w}) delete(p.workers, id) + p.pruneNodeFirstSeenLocked(w.nodeName) idleCount-- } } @@ -1957,18 +2000,71 @@ func (p *K8sWorkerPool) reapStuckActivatingWorkers() { // --- Shared scheduling helpers (same logic as FlightWorkerPool) --- +// stampNodeFirstSeenLocked records now as the first-seen time for nodeName +// if we haven't seen it before. Caller must hold p.mu. +func (p *K8sWorkerPool) stampNodeFirstSeenLocked(nodeName string) { + if nodeName == "" || p.nodeFirstSeen == nil { + return + } + if _, ok := p.nodeFirstSeen[nodeName]; !ok { + p.nodeFirstSeen[nodeName] = time.Now() + } +} + +// nodeSeenAtLocked returns the first-seen time for nodeName. Missing or +// unknown entries return `fallback` (typically time.Now()) so callers can +// treat them as "new" for ranking — newer sorts later, so unknown nodes +// get reaped first and claimed last, which is the safe direction. +// Caller must hold p.mu. 
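+//
+// Example (times illustrative): with nodeFirstSeen = {"node-a": 09:00} and
+// fallback = now, node-a ranks older than an unrecorded node-b, whose
+// lookup returns now and therefore ranks as brand new.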
+func (p *K8sWorkerPool) nodeSeenAtLocked(nodeName string, fallback time.Time) time.Time { + if nodeName == "" || p.nodeFirstSeen == nil { + return fallback + } + if t, ok := p.nodeFirstSeen[nodeName]; ok { + return t + } + return fallback +} + +// pruneNodeFirstSeenLocked drops nodeName from the first-seen map when no +// remaining worker references it, to keep the map bounded as nodes turn over. +// Caller must hold p.mu. +func (p *K8sWorkerPool) pruneNodeFirstSeenLocked(nodeName string) { + if nodeName == "" || p.nodeFirstSeen == nil { + return + } + for _, w := range p.workers { + if w.nodeName == nodeName { + return + } + } + delete(p.nodeFirstSeen, nodeName) +} + func (p *K8sWorkerPool) findIdleWorkerLocked() *ManagedWorker { + // Prefer the idle worker on the oldest-seen node so sessions land on + // cache-warm NVMe rather than a cold new node. Workers with no known + // nodeName sort as "new" (claimed last) to avoid stalling the claim + // path on stale bookkeeping. + now := time.Now() + var best *ManagedWorker + var bestSeenAt time.Time for _, w := range p.workers { select { case <-w.done: continue default: } - if p.isWarmIdleWorkerLocked(w) { - return w + if !p.isWarmIdleWorkerLocked(w) { + continue + } + seen := p.nodeSeenAtLocked(w.nodeName, now) + if best == nil || seen.Before(bestSeenAt) { + best = w + bestSeenAt = seen } } - return nil + return best } func (p *K8sWorkerPool) leastLoadedWorkerLocked() *ManagedWorker { diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index 653835b..c43e9d1 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -5,6 +5,7 @@ package controlplane import ( "context" "errors" + "fmt" "regexp" "strconv" "sync" @@ -234,8 +235,9 @@ func newTestK8sPool(t *testing.T, maxWorkers int) (*K8sWorkerPool, *fake.Clients workerImage: "duckgres:test", workerPort: 8816, secretName: "test-secret", - spawnSem: make(chan struct{}, 1), - retireSem: make(chan struct{}, 5), + spawnSem: make(chan struct{}, 1), + retireSem: make(chan struct{}, 5), + nodeFirstSeen: make(map[string]time.Time), } return pool, cs @@ -2096,3 +2098,192 @@ func TestRetireOneMismatchedVersionWorker_NoopWhenCPIDHasNoHashSuffix(t *testing t.Fatalf("expected no retirement calls, got %d", store.retireIdleCalls) } } + +// --- Node-age-aware scheduling tests --- +// +// These cover the two places the pool uses `nodeFirstSeen`: picking the next +// idle worker to claim (oldest node preferred) and picking which excess idle +// worker to reap (newest node preferred). Ordering must be deterministic so +// query sessions land on cache-warm nodes and Karpenter can consolidate the +// newest nodes first. + +// addIdleWorker inserts a ready warm-idle worker on nodeName with a matching +// nodeFirstSeen entry. idleFor controls how far in the past lastUsed is — +// must exceed idleTimeout for the reaper to consider it. 
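+//
+// Typical call, as used in the tests below:
+//   addIdleWorker(t, pool, 1, "node-old", now.Add(-1*time.Hour), time.Hour)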
+func addIdleWorker(t *testing.T, p *K8sWorkerPool, id int, nodeName string, nodeSeenAt time.Time, idleFor time.Duration) { + t.Helper() + w := &ManagedWorker{ + ID: id, + podName: fmt.Sprintf("worker-%d", id), + nodeName: nodeName, + done: make(chan struct{}), + lastUsed: time.Now().Add(-idleFor), + } + p.workers[id] = w + if _, ok := p.nodeFirstSeen[nodeName]; !ok { + p.nodeFirstSeen[nodeName] = nodeSeenAt + } +} + +func TestStampNodeFirstSeenLockedOnlySetsOnce(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + + pool.stampNodeFirstSeenLocked("node-a") + first := pool.nodeFirstSeen["node-a"] + + // Sleep long enough that the Now() reading would differ if we overwrote. + time.Sleep(2 * time.Millisecond) + pool.stampNodeFirstSeenLocked("node-a") + if pool.nodeFirstSeen["node-a"] != first { + t.Errorf("nodeFirstSeen overwritten on repeat stamp; want stable %v, got %v", first, pool.nodeFirstSeen["node-a"]) + } +} + +func TestStampNodeFirstSeenLockedIgnoresEmptyNodeName(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + pool.stampNodeFirstSeenLocked("") + if len(pool.nodeFirstSeen) != 0 { + t.Errorf("empty nodeName should not be recorded, got %v", pool.nodeFirstSeen) + } +} + +func TestPruneNodeFirstSeenLockedRemovesOnlyOrphanedEntries(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + now := time.Now() + addIdleWorker(t, pool, 1, "node-a", now.Add(-1*time.Hour), time.Hour) + addIdleWorker(t, pool, 2, "node-a", now.Add(-1*time.Hour), time.Hour) + addIdleWorker(t, pool, 3, "node-b", now.Add(-10*time.Minute), time.Hour) + + // Remove one worker on node-a — node-a should stay because worker 2 remains. + delete(pool.workers, 1) + pool.pruneNodeFirstSeenLocked("node-a") + if _, ok := pool.nodeFirstSeen["node-a"]; !ok { + t.Error("node-a pruned while worker still references it") + } + + // Remove the last worker on node-a — entry should go. + delete(pool.workers, 2) + pool.pruneNodeFirstSeenLocked("node-a") + if _, ok := pool.nodeFirstSeen["node-a"]; ok { + t.Error("node-a not pruned after last worker removed") + } + // node-b untouched. + if _, ok := pool.nodeFirstSeen["node-b"]; !ok { + t.Error("pruning node-a should leave node-b alone") + } +} + +func TestFindIdleWorkerLockedPrefersOldestNode(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + now := time.Now() + addIdleWorker(t, pool, 1, "node-young", now.Add(-1*time.Minute), time.Hour) + addIdleWorker(t, pool, 2, "node-old", now.Add(-1*time.Hour), time.Hour) + addIdleWorker(t, pool, 3, "node-mid", now.Add(-10*time.Minute), time.Hour) + + chosen := pool.findIdleWorkerLocked() + if chosen == nil { + t.Fatal("expected to find an idle worker") + } + if chosen.nodeName != "node-old" { + t.Errorf("claim picked %q, want node-old (longest-lived cache)", chosen.nodeName) + } +} + +func TestFindIdleWorkerLockedUnknownNodeSortsLast(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + now := time.Now() + // Worker on a known old node + worker with no node info (e.g. race between + // spawn and the informer). The known-old node must win. 
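+	// Worker 2 is assembled by hand rather than via addIdleWorker so that no
+	// nodeFirstSeen entry exists for it and findIdleWorkerLocked has to take
+	// the fallback-to-now path in nodeSeenAtLocked.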
+	addIdleWorker(t, pool, 1, "node-old", now.Add(-1*time.Hour), time.Hour)
+	w := &ManagedWorker{ID: 2, podName: "worker-2", done: make(chan struct{}), lastUsed: time.Now().Add(-time.Hour)}
+	pool.workers[2] = w
+
+	chosen := pool.findIdleWorkerLocked()
+	if chosen == nil || chosen.ID != 1 {
+		t.Errorf("expected worker on node-old (id=1), got %+v", chosen)
+	}
+}
+
+func TestReapIdleWorkersEvictsNewestNodeFirst(t *testing.T) {
+	pool, _ := newTestK8sPool(t, 5)
+	pool.minWorkers = 1
+	pool.idleTimeout = 5 * time.Minute
+
+	now := time.Now()
+	// 3 idle workers, 3 different nodes. minWorkers=1 so 2 get reaped.
+	// Expect: node-youngest + node-mid reaped (newest-first), node-old survives.
+	addIdleWorker(t, pool, 1, "node-old", now.Add(-1*time.Hour), time.Hour)
+	addIdleWorker(t, pool, 2, "node-youngest", now.Add(-1*time.Minute), time.Hour)
+	addIdleWorker(t, pool, 3, "node-mid", now.Add(-10*time.Minute), time.Hour)
+
+	// No stubbing needed: pod deletes from the retire path hit the fake
+	// clientset, and every assertion below is on map state that
+	// reapIdleWorkers mutates synchronously under the lock.
+	pool.reapIdleWorkers()
+
+	if _, ok := pool.workers[1]; !ok {
+		t.Error("worker on node-old was reaped; expected to survive (oldest node)")
+	}
+	if _, ok := pool.workers[2]; ok {
+		t.Error("worker on node-youngest not reaped; expected to be evicted first")
+	}
+	if _, ok := pool.workers[3]; ok {
+		t.Error("worker on node-mid not reaped; should have been second eviction")
+	}
+	if _, ok := pool.nodeFirstSeen["node-youngest"]; ok {
+		t.Error("nodeFirstSeen entry for node-youngest not pruned after last worker reaped")
+	}
+	if _, ok := pool.nodeFirstSeen["node-mid"]; ok {
+		t.Error("nodeFirstSeen entry for node-mid not pruned after last worker reaped")
+	}
+	if _, ok := pool.nodeFirstSeen["node-old"]; !ok {
+		t.Error("nodeFirstSeen entry for node-old incorrectly pruned (worker still alive)")
+	}
+}
+
+func TestReapIdleWorkersStopsAtMinWorkers(t *testing.T) {
+	pool, _ := newTestK8sPool(t, 5)
+	pool.minWorkers = 2
+	pool.idleTimeout = 5 * time.Minute
+
+	now := time.Now()
+	addIdleWorker(t, pool, 1, "node-a", now.Add(-1*time.Hour), time.Hour)
+	addIdleWorker(t, pool, 2, "node-b", now.Add(-45*time.Minute), time.Hour)
+	addIdleWorker(t, pool, 3, "node-c", now.Add(-30*time.Minute), time.Hour)
+
+	pool.reapIdleWorkers()
+
+	// 3 idle - 2 minWorkers = 1 should be reaped (the newest, node-c).
+	if len(pool.workers) != 2 {
+		t.Fatalf("expected 2 workers after reap, got %d", len(pool.workers))
+	}
+	if _, ok := pool.workers[3]; ok {
+		t.Error("expected worker on node-c (newest) to be reaped")
+	}
+}
+
+func TestReapIdleWorkersSkipsWorkersWithinIdleTimeout(t *testing.T) {
+	pool, _ := newTestK8sPool(t, 5)
+	pool.minWorkers = 0
+	pool.idleTimeout = 5 * time.Minute
+
+	now := time.Now()
+	// Idle for only 1 minute — well under idleTimeout.
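+	// Node age must not matter here; the idleTimeout gate alone keeps both
+	// workers out of the reap candidate list.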
+ addIdleWorker(t, pool, 1, "node-a", now.Add(-1*time.Hour), 1*time.Minute) + addIdleWorker(t, pool, 2, "node-b", now.Add(-30*time.Minute), 1*time.Minute) + + pool.reapIdleWorkers() + + if len(pool.workers) != 2 { + t.Errorf("expected both workers to survive (under idleTimeout); got %d remaining", len(pool.workers)) + } +} diff --git a/controlplane/worker_mgr.go b/controlplane/worker_mgr.go index cbf8b31..6234972 100644 --- a/controlplane/worker_mgr.go +++ b/controlplane/worker_mgr.go @@ -27,6 +27,7 @@ import ( type ManagedWorker struct { ID int podName string + nodeName string //nolint:unused // only set in kubernetes warm-pool; drives cache-locality-aware scheduling cmd *exec.Cmd socketPath string bearerToken string
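
-- 
Not part of the patch: a minimal, self-contained sketch of the ordering the
pool now applies. The worker type, seenAt helper, and node names below are
illustrative stand-ins for ManagedWorker and nodeSeenAtLocked, not the real
API.

	package main

	import (
		"fmt"
		"sort"
		"time"
	)

	type worker struct {
		id       int
		nodeName string
	}

	func main() {
		now := time.Now()
		nodeFirstSeen := map[string]time.Time{
			"node-old": now.Add(-time.Hour),
			"node-mid": now.Add(-10 * time.Minute),
			// "node-new" is deliberately absent: unknown nodes fall back
			// to now and therefore rank as newest.
		}
		seenAt := func(n string) time.Time {
			if t, ok := nodeFirstSeen[n]; ok {
				return t
			}
			return now // mirrors nodeSeenAtLocked(nodeName, now)
		}
		workers := []worker{{1, "node-mid"}, {2, "node-old"}, {3, "node-new"}}

		// Claim order: oldest-seen node first (warmest cache).
		claim := append([]worker(nil), workers...)
		sort.Slice(claim, func(i, j int) bool {
			return seenAt(claim[i].nodeName).Before(seenAt(claim[j].nodeName))
		})
		fmt.Println("claim:", claim) // [{2 node-old} {1 node-mid} {3 node-new}]

		// Reap order: newest-seen node first, so Karpenter can consolidate it.
		reap := append([]worker(nil), workers...)
		sort.Slice(reap, func(i, j int) bool {
			return seenAt(reap[i].nodeName).After(seenAt(reap[j].nodeName))
		})
		fmt.Println("reap:", reap) // [{3 node-new} {1 node-mid} {2 node-old}]
	}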