From 5b9a7256d8531b459ce5dc9a57d6d8bf714820bc Mon Sep 17 00:00:00 2001
From: Benjamin Knofe-Vider
Date: Thu, 23 Apr 2026 17:04:46 +0200
Subject: [PATCH 1/2] controlplane: reconcile stranded worker pods + fix
 shutdown ordering
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two prod workers on an older deploy (198b09d) survived two successive
rollouts with their DB rows already in state=retired, still serving
queries, and still running the unpatched httpfs extension that #411 had
replaced to avoid the stoi overflow crash on large parquet files.

The root cause is a gap between ShutdownAll and the janitor's orphan
sweep:

1. ShutdownAll marked each worker retired in the DB *before* issuing the
   K8s pod delete. The delete was fire-and-forget (`_ = ...Delete`), so
   on an API hiccup or a CP SIGKILL mid-shutdown the pod survived while
   the DB row was already terminal.

2. ListOrphanedWorkers explicitly excludes retired/lost rows (they would
   otherwise loop the janitor on 404s), and bare worker pods carry no
   owner reference, so nothing in the cluster reaped them.

configstore/store.go:847 already referenced a
`K8sWorkerPool.cleanupOrphanedWorkerPods` that "handles [this] leak case
authoritatively". That function did not exist in the codebase.

Fix 1 — ShutdownAll runs a 3-step CAS chain per worker:

  MarkWorkerDraining (idle/hot_idle/... → draining)
  → pods.Delete()
  → RetireDrainingWorker (draining → retired)

The intermediate `draining` state fences the worker against claims from
other CPs — their claim queries match state=idle/hot_idle. On pod-delete
failure the row stays in draining; ListOrphanedWorkers picks up draining
rows whose owner CP has expired, so orphan cleanup retires the worker
and deletes the pod automatically (~50s after the CP heartbeat times
out).

Fix 2 — cleanupOrphanedWorkerPods reconciler:

Lists worker pods by the `app=duckgres-worker` label every janitor tick
(leader-only) and deletes those whose DB row is retired/lost or missing.
An age grace window protects newborn pods whose DB row is still being
upserted. This catches the existing stranded pods and any future leaks,
even when the draining chain is interrupted (e.g. kubelet hangs after
accepting the Delete).

Tests (14 new, all passing under -tags kubernetes):

- 7 reconciler cases: retired/lost/missing/live/young/NotFound/non-worker
- 5 ShutdownAll cases: happy chain, ordering, CAS-miss skip,
  delete-failure-leaves-draining, NotFound-counts-as-success
- 2 janitor cases: reconciler is invoked, runs after the version reaper
---
 controlplane/configstore/store.go |  59 +++-
 controlplane/janitor.go           |   9 +
 controlplane/janitor_test.go      |  55 ++++
 controlplane/k8s_pool.go          | 157 +++++++++-
 controlplane/k8s_pool_test.go     | 471 ++++++++++++++++++++++++++++++
 controlplane/multitenant.go       |   7 +
 controlplane/worker_pool.go       |   2 +
 7 files changed, 747 insertions(+), 13 deletions(-)

diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go
index 45e11ec..6493de1 100644
--- a/controlplane/configstore/store.go
+++ b/controlplane/configstore/store.go
@@ -666,6 +666,58 @@ func (cs *ConfigStore) RetireIdleWorker(workerID int, reason string) (bool, erro
 	return result.RowsAffected > 0, nil
 }
 
+// MarkWorkerDraining atomically transitions a worker into the draining state
+// if and only if it is still owned by the caller and not already terminal. It
+// returns true when the transition happened.
+// +// Used by ShutdownAll to fence a worker before issuing its K8s pod delete: no +// other CP can claim the worker once it's draining (ClaimIdleWorker and +// ClaimHotIdleWorker filter on state=idle and state=hot_idle respectively), +// so the pod-delete/DB-retire chain can proceed without a claim race. If the +// CP then crashes before the final retired transition, ListOrphanedWorkers +// includes draining rows whose owner CP has expired, so orphan cleanup +// retires the worker and deletes the pod. +// +// The ownerCPInstanceID guard prevents a stale CP from moving a worker that +// has already been taken over by a successor. +func (cs *ConfigStore) MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) { + drainableStates := []WorkerState{ + WorkerStateSpawning, + WorkerStateIdle, + WorkerStateReserved, + WorkerStateActivating, + WorkerStateHot, + WorkerStateHotIdle, + } + result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())). + Where("worker_id = ? AND owner_cp_instance_id = ? AND state IN ?", workerID, ownerCPInstanceID, drainableStates). + Updates(map[string]any{ + "state": WorkerStateDraining, + "updated_at": time.Now(), + }) + if result.Error != nil { + return false, fmt.Errorf("mark worker %d draining: %w", workerID, result.Error) + } + return result.RowsAffected > 0, nil +} + +// RetireDrainingWorker atomically transitions a draining worker to retired. +// Returns true if the transition happened, false if the worker was no longer +// in draining (e.g. already retired by an orphan sweep after a CP restart). +func (cs *ConfigStore) RetireDrainingWorker(workerID int, reason string) (bool, error) { + result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())). + Where("worker_id = ? AND state = ?", workerID, WorkerStateDraining). + Updates(map[string]any{ + "state": WorkerStateRetired, + "retire_reason": reason, + "updated_at": time.Now(), + }) + if result.Error != nil { + return false, fmt.Errorf("retire draining worker %d: %w", workerID, result.Error) + } + return result.RowsAffected > 0, nil +} + // TakeOverWorker transfers durable worker ownership to a new control-plane // instance when the caller still has the expected prior owner_epoch. func (cs *ConfigStore) TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*WorkerRecord, error) { @@ -842,9 +894,10 @@ func (cs *ConfigStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePre // already been marked expired long enough ago to pass the orphan grace cutoff. // Retired/lost rows are deliberately excluded: their pods are either already // gone (in which case re-listing the row would loop the janitor on a 404 from -// the K8s delete forever) or were leaked when the previous CP died mid-delete, -// and that leak case is handled authoritatively by the K8s label-based startup -// scan in K8sWorkerPool.cleanupOrphanedWorkerPods. +// the K8s delete forever) or were leaked when the previous CP died mid-delete. +// That leak case is handled by the K8s label-based reconciler in +// K8sWorkerPool.cleanupOrphanedWorkerPods, which runs every janitor tick on +// the leader and deletes pods whose DB row is retired/lost or missing. 
 func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
 	var workers []WorkerRecord
 	cleanupStates := []WorkerState{
diff --git a/controlplane/janitor.go b/controlplane/janitor.go
index ac9fc7f..99325a8 100644
--- a/controlplane/janitor.go
+++ b/controlplane/janitor.go
@@ -38,6 +38,7 @@ type ControlPlaneJanitor struct {
 	retireLocalWorker             func(workerID int, reason string) bool // retires from in-memory pool + pod, returns false if not local
 	reconcileWarmCapacity         func()
 	retireMismatchedVersionWorker func() // reaps one warm idle worker whose Deployment version differs from this CP's (leader-only)
+	cleanupOrphanedWorkerPods     func() // deletes K8s worker pods whose DB row is terminal (retired/lost) or missing (leader-only)
 }
 
 func NewControlPlaneJanitor(store controlPlaneExpiryStore, interval, expiryTimeout time.Duration) *ControlPlaneJanitor {
@@ -158,6 +159,14 @@ func (j *ControlPlaneJanitor) runOnce() {
 		j.retireMismatchedVersionWorker()
 	}
 
+	// Reconcile K8s pods against the DB state store: delete any worker pod
+	// whose DB row is terminal (retired/lost) or missing entirely. Catches
+	// pods leaked by a previous CP that died mid-shutdown (ShutdownAll marked
+	// the row retired before the K8s delete completed).
+	if j.cleanupOrphanedWorkerPods != nil {
+		j.cleanupOrphanedWorkerPods()
+	}
+
 	if j.reconcileWarmCapacity != nil {
 		j.reconcileWarmCapacity()
 	}
diff --git a/controlplane/janitor_test.go b/controlplane/janitor_test.go
index 8547c48..b593b1a 100644
--- a/controlplane/janitor_test.go
+++ b/controlplane/janitor_test.go
@@ -195,6 +195,61 @@ func TestControlPlaneJanitorRunReconcilesWarmCapacity(t *testing.T) {
 	}
 }
 
+func TestControlPlaneJanitorRunInvokesStrandedPodReconciler(t *testing.T) {
+	// Every tick the janitor must invoke the stranded-pod reconciler so that
+	// pods leaked by a previous CP (which marked its workers retired but
+	// failed to delete the K8s pod) get cleaned up automatically.
+	store := &captureControlPlaneExpiryStore{}
+	janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second)
+
+	var mu sync.Mutex
+	calls := 0
+	janitor.cleanupOrphanedWorkerPods = func() {
+		mu.Lock()
+		defer mu.Unlock()
+		calls++
+	}
+
+	janitor.runOnce()
+
+	mu.Lock()
+	defer mu.Unlock()
+	if calls != 1 {
+		t.Fatalf("expected janitor to invoke stranded-pod reconciler exactly once, got %d", calls)
+	}
+}
+
+func TestControlPlaneJanitorRunInvokesReconcilerAfterVersionReaper(t *testing.T) {
+	// Ordering matters: retireMismatchedVersionWorker retires an idle worker
+	// (setting its row to retired) and then deletes the pod. If that pod
+	// delete fails, the stranded-pod reconciler must catch it. Running the
+	// reaper before the reconciler within the same tick means a failed
+	// delete is retried in the same pass instead of one full janitor
+	// interval (~5s) later.
+ store := &captureControlPlaneExpiryStore{} + janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second) + + var mu sync.Mutex + var order []string + janitor.retireMismatchedVersionWorker = func() { + mu.Lock() + defer mu.Unlock() + order = append(order, "version_reap") + } + janitor.cleanupOrphanedWorkerPods = func() { + mu.Lock() + defer mu.Unlock() + order = append(order, "reconcile_stranded") + } + + janitor.runOnce() + + mu.Lock() + defer mu.Unlock() + if len(order) != 2 || order[0] != "version_reap" || order[1] != "reconcile_stranded" { + t.Fatalf("expected version_reap → reconcile_stranded ordering, got %v", order) + } +} + func TestControlPlaneJanitorRunInvokesVersionReaperBeforeReconcile(t *testing.T) { // The version-aware reaper must run before reconcileWarmCapacity in the // same tick so that a retired-this-tick worker's warm slot is replenished diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 6de1b90..9df01f4 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -266,6 +266,76 @@ func (p *K8sWorkerPool) RetireOneMismatchedVersionWorker(ctx context.Context) bo return false } +// cleanupOrphanedWorkerPods deletes worker pods whose DB row is in a terminal +// state (retired/lost) or has no DB row at all, reconciling K8s against the +// state store. Runs from the janitor loop (leader-only). +// +// This closes a gap between ShutdownAll and the janitor's orphan sweep. +// ShutdownAll marks the worker row retired in the DB before issuing the K8s +// pod delete, and the delete is fire-and-forget — so if the delete fails (API +// hiccup) or the CP is SIGKILL'd mid-shutdown, the pod survives while the DB +// row is already terminal. ListOrphanedWorkers explicitly excludes +// terminal-state rows, so without this reconciler those pods live forever. +// Bare worker pods have no owner reference, so nothing else in the cluster +// reaps them either. +// +// minAge protects newly-spawned pods: the spawn path creates the pod BEFORE +// upserting the DB row, so there's a brief window where a live pod has no DB +// record. Without the age gate the reconciler would race the spawner. +// +// Returns the number of pods deleted this call. 
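+//
+// Illustrative wiring (this mirrors the janitor hookup in multitenant.go
+// later in this patch; the 30s timeout and 2-minute grace are the values
+// used there):
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+//	defer cancel()
+//	n := pool.cleanupOrphanedWorkerPods(ctx, 2*time.Minute)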
+func (p *K8sWorkerPool) cleanupOrphanedWorkerPods(ctx context.Context, minAge time.Duration) int { + if p.runtimeStore == nil || p.clientset == nil { + return 0 + } + pods, err := p.clientset.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: "app=duckgres-worker", + }) + if err != nil { + slog.Warn("Stranded-pod reconciler failed to list worker pods.", "error", err) + return 0 + } + cutoff := time.Now().Add(-minAge) + deleted := 0 + for i := range pods.Items { + pod := &pods.Items[i] + if pod.CreationTimestamp.Time.After(cutoff) { + continue + } + idStr := pod.Labels["duckgres/worker-id"] + if idStr == "" { + continue + } + workerID, err := strconv.Atoi(idStr) + if err != nil { + continue + } + rec, err := p.runtimeStore.GetWorkerRecord(workerID) + if err != nil { + slog.Warn("Stranded-pod reconciler failed to load worker record.", "worker_id", workerID, "error", err) + continue + } + dbState := "missing" + if rec != nil { + if rec.State != configstore.WorkerStateRetired && rec.State != configstore.WorkerStateLost { + continue + } + dbState = string(rec.State) + } + gracePeriod := int64(10) + if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, + }); err != nil && !errors.IsNotFound(err) { + slog.Warn("Stranded-pod reconciler failed to delete pod.", "pod", pod.Name, "worker_id", workerID, "error", err) + continue + } + _ = p.deleteWorkerRPCSecret(ctx, pod.Name) + slog.Info("Stranded worker pod reconciled.", "pod", pod.Name, "worker_id", workerID, "db_state", dbState) + deleted++ + } + return deleted +} + func (p *K8sWorkerPool) resolveCPUID(ctx context.Context) error { pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.cpID, metav1.GetOptions{}) if err != nil { @@ -1800,7 +1870,25 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat } } -// ShutdownAll stops all workers by deleting their pods. +// ShutdownAll stops all workers by deleting their pods. Per worker it runs a +// 3-step CAS chain against the runtime store and K8s API: +// +// 1. MarkWorkerDraining — atomic SQL CAS from a non-terminal state to +// draining. Fences the worker against claims by other CPs: their claim +// queries match state=idle/hot_idle, which no longer apply. If the CAS +// misses (row already terminal or owned by another CP) the worker is +// skipped entirely. +// 2. K8s pod delete. Only reached after the CAS succeeds. NotFound is +// treated as success (the pod is gone by some other path). +// 3. RetireDrainingWorker — atomic SQL CAS draining → retired. Only reached +// on successful pod delete. On delete failure the row stays in +// draining, which lets ListOrphanedWorkers pick it up once the CP's +// heartbeat expires, and lets cleanupOrphanedWorkerPods delete the pod +// by label on the next janitor tick. +// +// This ordering closes the old race where the DB row was flipped to retired +// before the pod delete: if the delete failed, the pod survived forever +// because terminal-state rows are excluded from ListOrphanedWorkers. 
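+//
+// Sketch of the per-worker state machine this implements:
+//
+//	idle/hot_idle/... --MarkWorkerDraining--> draining
+//	draining --pod delete ok or NotFound--> RetireDrainingWorker --> retired
+//	draining --pod delete fails--> stays draining; the orphan sweep retires
+//	it and cleanupOrphanedWorkerPods removes the pod on a later janitor tick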
func (p *K8sWorkerPool) ShutdownAll() { p.mu.Lock() if p.shuttingDown { @@ -1810,7 +1898,6 @@ func (p *K8sWorkerPool) ShutdownAll() { p.shuttingDown = true workers := make([]*ManagedWorker, 0, len(p.workers)) for _, w := range p.workers { - p.markWorkerRetiredLocked(w, RetireReasonShutdown) workers = append(workers, w) } p.mu.Unlock() @@ -1821,15 +1908,55 @@ func (p *K8sWorkerPool) ShutdownAll() { ctx := context.Background() for _, w := range workers { podName := p.workerPodName(w) - gracePeriod := int64(10) slog.Info("Shutting down K8s worker.", "id", w.ID, "pod", podName) - _ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ + + // Step 1: CAS to draining. Skip the worker on CAS miss or error — + // there's no safe way to proceed if we don't own the row. + if p.runtimeStore != nil { + transitioned, err := p.runtimeStore.MarkWorkerDraining(w.ID, p.cpInstanceID) + if err != nil { + slog.Warn("ShutdownAll: CAS to draining failed; orphan sweep will reconcile.", + "worker_id", w.ID, "error", err) + continue + } + if !transitioned { + slog.Debug("ShutdownAll: worker not owned by us or already terminal; skipping.", + "worker_id", w.ID) + continue + } + } + + // Step 2: delete pod. Leave the row in draining on any error other + // than NotFound so recovery paths pick it up. + gracePeriod := int64(10) + if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ GracePeriodSeconds: &gracePeriod, - }) + }); err != nil && !errors.IsNotFound(err) { + slog.Warn("ShutdownAll: pod delete failed; worker left in draining for orphan sweep/reconciler.", + "id", w.ID, "pod", podName, "error", err) + continue + } _ = p.deleteWorkerRPCSecret(ctx, podName) if w.client != nil { _ = w.client.Close() } + + // Step 3: final CAS to retired. If this fails (network blip during + // shutdown) the row stays in draining and the orphan sweep handles + // it once this CP's heartbeat expires. + if p.runtimeStore != nil { + if _, err := p.runtimeStore.RetireDrainingWorker(w.ID, RetireReasonShutdown); err != nil { + slog.Warn("ShutdownAll: CAS to retired failed; orphan sweep will reconcile.", + "worker_id", w.ID, "error", err) + continue + } + } + + // In-memory lifecycle + metrics. Intentionally skips persistence + // (we've already persisted the retired state via the CAS chain). + p.mu.Lock() + p.markWorkerRetiredInMemoryLocked(w, RetireReasonShutdown) + p.mu.Unlock() } p.mu.Lock() @@ -2276,6 +2403,21 @@ func (p *K8sWorkerPool) spawnWarmWorkerBackground(id int) { } func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string) { + p.markWorkerRetiredInMemoryLocked(w, reason) + workerState := configstore.WorkerStateRetired + if reason == RetireReasonCrash { + workerState = configstore.WorkerStateLost + } + p.persistWorkerRecord(p.workerRecordFor(w.ID, w, w.OwnerEpoch(), workerState, reason, nil)) +} + +// markWorkerRetiredInMemoryLocked performs only the in-memory lifecycle +// transition and metrics bookkeeping for a worker retirement, without +// persisting to the runtime store. Used by callers that have already +// advanced the DB state via a scoped CAS (e.g. ShutdownAll's draining +// chain) and don't want an unconditional UpsertWorkerRecord to overwrite +// fields set by that CAS. 
+func (p *K8sWorkerPool) markWorkerRetiredInMemoryLocked(w *ManagedWorker, reason string) { lifecycle := w.SharedState().NormalizedLifecycle() if lifecycle == WorkerLifecycleHot || lifecycle == WorkerLifecycleHotIdle { observeHotWorkerSessions(w.peakSessions) @@ -2285,11 +2427,6 @@ func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string) return } _ = w.SetSharedState(nextState) - workerState := configstore.WorkerStateRetired - if reason == RetireReasonCrash { - workerState = configstore.WorkerStateLost - } - p.persistWorkerRecord(p.workerRecordFor(w.ID, w, w.OwnerEpoch(), workerState, reason, nil)) observeWorkerRetirement(reason) observeWarmPoolLifecycleGauges(p.workers) } diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index c43e9d1..678536e 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -62,6 +62,26 @@ type captureRuntimeWorkerStore struct { retireIdleCalledReasons []string retireIdleErr error retireIdleMisses map[int]bool + preloadedRecords map[int]*configstore.WorkerRecord + getRecordErrIDs map[int]error + markDrainingCalls int + markDrainingCalledIDs []int + markDrainingCalledCPs []string + markDrainingMisses map[int]bool + markDrainingErr error + retireDrainingCalls int + retireDrainingCalledIDs []int + retireDrainingReasons []string + retireDrainingMisses map[int]bool + retireDrainingErr error + // events records a unified, ordered timeline of state transitions on + // this store so tests can assert happens-before relationships (e.g. + // that pod-delete occurs between markDraining and retireDraining). + events []string +} + +func (s *captureRuntimeWorkerStore) recordEvent(evt string) { + s.events = append(s.events, evt) } func (s *captureRuntimeWorkerStore) UpsertWorkerRecord(record *configstore.WorkerRecord) error { @@ -157,6 +177,16 @@ func (s *captureRuntimeWorkerStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceI func (s *captureRuntimeWorkerStore) GetWorkerRecord(workerID int) (*configstore.WorkerRecord, error) { s.mu.Lock() defer s.mu.Unlock() + if err, ok := s.getRecordErrIDs[workerID]; ok { + return nil, err + } + if rec, ok := s.preloadedRecords[workerID]; ok { + if rec == nil { + return nil, nil + } + record := *rec + return &record, nil + } if s.claimed != nil && s.claimed.WorkerID == workerID { record := *s.claimed return &record, nil @@ -204,6 +234,38 @@ func (s *captureRuntimeWorkerStore) RetireIdleWorker(workerID int, reason string return true, nil } +func (s *captureRuntimeWorkerStore) MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.markDrainingCalls++ + s.markDrainingCalledIDs = append(s.markDrainingCalledIDs, workerID) + s.markDrainingCalledCPs = append(s.markDrainingCalledCPs, ownerCPInstanceID) + s.recordEvent(fmt.Sprintf("draining:%d", workerID)) + if s.markDrainingErr != nil { + return false, s.markDrainingErr + } + if s.markDrainingMisses[workerID] { + return false, nil + } + return true, nil +} + +func (s *captureRuntimeWorkerStore) RetireDrainingWorker(workerID int, reason string) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.retireDrainingCalls++ + s.retireDrainingCalledIDs = append(s.retireDrainingCalledIDs, workerID) + s.retireDrainingReasons = append(s.retireDrainingReasons, reason) + s.recordEvent(fmt.Sprintf("retired:%d", workerID)) + if s.retireDrainingErr != nil { + return false, s.retireDrainingErr + } + if s.retireDrainingMisses[workerID] { + return false, nil + } + return true, nil +} + 
func newTestK8sPool(t *testing.T, maxWorkers int) (*K8sWorkerPool, *fake.Clientset) { t.Helper() cs := fake.NewSimpleClientset() @@ -2099,6 +2161,415 @@ func TestRetireOneMismatchedVersionWorker_NoopWhenCPIDHasNoHashSuffix(t *testing } } +// --- Stranded-pod reconciler tests --- +// +// cleanupOrphanedWorkerPods closes a gap left by ShutdownAll: the CP marks the +// worker row terminal (retired/lost) in the DB before issuing the K8s pod +// delete, and the delete is fire-and-forget. If the delete fails (API hiccup, +// CP SIGKILL'd mid-shutdown), the pod survives forever because: +// - ListOrphanedWorkers excludes terminal states, so orphan cleanup ignores it +// - Bare worker pods have no owner reference, so nothing else reaps them +// These tests pin the expected behavior of the K8s-label-based reconciler. + +// strandedReconcilerPool wires a K8sWorkerPool with a fake clientset and store +// for reconciler tests. Ownership labels aren't checked by the reconciler, so +// we keep the setup minimal. +func strandedReconcilerPool(t *testing.T, store RuntimeWorkerStore) (*K8sWorkerPool, *fake.Clientset) { + t.Helper() + cs := fake.NewClientset() + pool := &K8sWorkerPool{ + workers: make(map[int]*ManagedWorker), + shutdownCh: make(chan struct{}), + stopInform: make(chan struct{}), + clientset: cs, + namespace: "default", + cpID: "duckgres-new-aaaaa", + cpInstanceID: "duckgres-new-aaaaa-boot", + runtimeStore: store, + retireSem: make(chan struct{}, 5), + } + return pool, cs +} + +func createStrandedWorkerPod(t *testing.T, cs *fake.Clientset, name, workerIDLabel string, age time.Duration) { + t.Helper() + _, err := cs.CoreV1().Pods("default").Create(context.Background(), &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + CreationTimestamp: metav1.NewTime(time.Now().Add(-age)), + Labels: map[string]string{ + "app": "duckgres-worker", + "duckgres/worker-id": workerIDLabel, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create pod %q: %v", name, err) + } +} + +func podExists(t *testing.T, cs *fake.Clientset, name string) bool { + t.Helper() + _, err := cs.CoreV1().Pods("default").Get(context.Background(), name, metav1.GetOptions{}) + if err == nil { + return true + } + if k8serrors.IsNotFound(err) { + return false + } + t.Fatalf("unexpected error fetching pod %q: %v", name, err) + return false +} + +func TestCleanupOrphanedWorkerPods_DeletesPodWhenDBStateRetired(t *testing.T) { + // This is the exact prod scenario: worker's DB row is state=retired (a + // previous CP marked it during ShutdownAll) but the K8s pod survived + // because the delete failed or was interrupted. The reconciler must catch + // this and delete the pod. + store := &captureRuntimeWorkerStore{ + preloadedRecords: map[int]*configstore.WorkerRecord{ + 31758: {WorkerID: 31758, State: configstore.WorkerStateRetired}, + }, + } + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-old-worker-31758", "31758", 10*time.Minute) + + deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute) + if deleted != 1 { + t.Fatalf("expected 1 pod deleted, got %d", deleted) + } + if podExists(t, cs, "duckgres-old-worker-31758") { + t.Fatal("expected stranded pod to be deleted") + } +} + +func TestCleanupOrphanedWorkerPods_DeletesPodWhenDBStateLost(t *testing.T) { + // lost is the DB state assigned when a worker is retired with reason=crash + // (see markWorkerRetiredLocked). These pods are also terminal-in-DB and + // must be reconciled. 
+	store := &captureRuntimeWorkerStore{
+		preloadedRecords: map[int]*configstore.WorkerRecord{
+			42: {WorkerID: 42, State: configstore.WorkerStateLost},
+		},
+	}
+	pool, cs := strandedReconcilerPool(t, store)
+	createStrandedWorkerPod(t, cs, "duckgres-lost-worker-42", "42", 10*time.Minute)
+
+	if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 1 {
+		t.Fatalf("expected 1 pod deleted, got %d", deleted)
+	}
+	if podExists(t, cs, "duckgres-lost-worker-42") {
+		t.Fatal("expected lost-state pod to be deleted")
+	}
+}
+
+func TestCleanupOrphanedWorkerPods_DeletesPodWhenDBRecordMissing(t *testing.T) {
+	// No DB row exists at all for this worker-id: fully orphaned pod, likely
+	// from a worker row that was purged while the pod kept running. Treat it
+	// the same as a terminal-state pod.
+	store := &captureRuntimeWorkerStore{}
+	pool, cs := strandedReconcilerPool(t, store)
+	createStrandedWorkerPod(t, cs, "duckgres-ghost-worker-99", "99", 10*time.Minute)
+
+	if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 1 {
+		t.Fatalf("expected 1 pod deleted, got %d", deleted)
+	}
+	if podExists(t, cs, "duckgres-ghost-worker-99") {
+		t.Fatal("expected ghost pod with no DB row to be deleted")
+	}
+}
+
+func TestCleanupOrphanedWorkerPods_LeavesLivePodAlone(t *testing.T) {
+	// Workers in any non-terminal state (idle, reserved, activating, hot,
+	// hot_idle, spawning, draining) are part of the normal lifecycle — the
+	// reconciler must not disturb them. This test covers the common live
+	// state (idle). Other states follow the same code path.
+	store := &captureRuntimeWorkerStore{
+		preloadedRecords: map[int]*configstore.WorkerRecord{
+			7: {WorkerID: 7, State: configstore.WorkerStateIdle},
+		},
+	}
+	pool, cs := strandedReconcilerPool(t, store)
+	createStrandedWorkerPod(t, cs, "duckgres-live-worker-7", "7", 10*time.Minute)
+
+	if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 0 {
+		t.Fatalf("expected no pods deleted for live worker, got %d", deleted)
+	}
+	if !podExists(t, cs, "duckgres-live-worker-7") {
+		t.Fatal("expected idle worker pod to survive reconciliation")
+	}
+}
+
+func TestCleanupOrphanedWorkerPods_SkipsYoungPod(t *testing.T) {
+	// Spawning workers create the pod BEFORE inserting the DB row. Without a
+	// grace window on pod age, the reconciler would delete freshly-spawned
+	// pods in the ~100ms race window between pod creation and DB upsert.
+	store := &captureRuntimeWorkerStore{} // no record yet — newborn
+	pool, cs := strandedReconcilerPool(t, store)
+	createStrandedWorkerPod(t, cs, "duckgres-newborn-worker-11", "11", 30*time.Second)
+
+	if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 0 {
+		t.Fatalf("expected young pod to be skipped, got deleted=%d", deleted)
+	}
+	if !podExists(t, cs, "duckgres-newborn-worker-11") {
+		t.Fatal("expected newborn pod to survive (under grace window)")
+	}
+}
+
+func TestCleanupOrphanedWorkerPods_TreatsNotFoundAsSuccess(t *testing.T) {
+	// If two CPs both become leader for a moment during a split-brain, or
+	// another actor (a drain eviction, a manual kubectl delete) removed the
+	// pod between our List and Delete, the delete will return NotFound. The
+	// reconciler must treat that as success.
+ store := &captureRuntimeWorkerStore{ + preloadedRecords: map[int]*configstore.WorkerRecord{ + 50: {WorkerID: 50, State: configstore.WorkerStateRetired}, + 51: {WorkerID: 51, State: configstore.WorkerStateRetired}, + }, + } + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-gone-worker-50", "50", 10*time.Minute) + createStrandedWorkerPod(t, cs, "duckgres-stale-worker-51", "51", 10*time.Minute) + + // Make the DELETE for worker 50 return NotFound (simulating race). + cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + if da.GetName() == "duckgres-gone-worker-50" { + return true, nil, k8serrors.NewNotFound(corev1.Resource("pods"), da.GetName()) + } + return false, nil, nil + }) + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 2 { + t.Fatalf("expected 2 pods deleted (NotFound counts as success), got %d", deleted) + } + if podExists(t, cs, "duckgres-stale-worker-51") { + t.Fatal("expected worker 51's pod to be deleted") + } +} + +func TestCleanupOrphanedWorkerPods_IgnoresNonWorkerPods(t *testing.T) { + // Only pods carrying the duckgres-worker app label should be considered. + // This guards against accidentally reaping CP pods or other workloads + // that happened to be scheduled into the duckgres namespace. + store := &captureRuntimeWorkerStore{} + pool, cs := strandedReconcilerPool(t, store) + // Non-worker pod (missing app=duckgres-worker label) — must be ignored. + _, err := cs.CoreV1().Pods("default").Create(context.Background(), &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "some-other-pod", + Namespace: "default", + CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour)), + Labels: map[string]string{"app": "something-else"}, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create other pod: %v", err) + } + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 0 { + t.Fatalf("expected no deletions, got %d", deleted) + } + if !podExists(t, cs, "some-other-pod") { + t.Fatal("non-worker pod must survive reconciliation") + } +} + +// --- ShutdownAll draining-chain tests --- +// +// ShutdownAll is called when the CP pod receives SIGTERM from Kubernetes. The +// old implementation marked each worker retired in the DB and then fire-and- +// forget deleted the pod — on delete failure the DB row moved on but the pod +// survived forever (ListOrphanedWorkers excludes terminal states). These +// tests pin the new 3-step chain: +// +// 1. MarkWorkerDraining: atomic CAS idle/hot_idle/... → draining. Fences +// the worker against claims by other CPs (their claim queries match +// state=idle/hot_idle, which no longer applies). +// 2. K8s pod delete. +// 3. RetireDrainingWorker: atomic CAS draining → retired. Only reached on +// successful pod-delete — so on delete failure the row stays in +// draining, where ListOrphanedWorkers picks it up once the CP's +// heartbeat expires, or cleanupOrphanedWorkerPods handles it by pod +// label regardless of DB state. + +func shutdownTestPool(t *testing.T, store *captureRuntimeWorkerStore) (*K8sWorkerPool, *fake.Clientset) { + t.Helper() + pool, cs := newTestK8sPool(t, 5) + pool.runtimeStore = store + // Intercept pod deletions so the test can assert that Delete is invoked + // strictly between MarkWorkerDraining and RetireDrainingWorker. 
+	cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
+		da := action.(k8stesting.DeleteAction)
+		store.mu.Lock()
+		store.recordEvent(fmt.Sprintf("delete:%s", da.GetName()))
+		store.mu.Unlock()
+		return false, nil, nil // fall through so the fake actually removes the pod
+	})
+	return pool, cs
+}
+
+func addShutdownWorker(t *testing.T, p *K8sWorkerPool, cs *fake.Clientset, id int) *ManagedWorker {
+	t.Helper()
+	w := &ManagedWorker{
+		ID:      id,
+		podName: fmt.Sprintf("worker-%d", id),
+		done:    make(chan struct{}),
+	}
+	p.workers[id] = w
+	_, err := cs.CoreV1().Pods(p.namespace).Create(context.Background(), &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      w.podName,
+			Namespace: p.namespace,
+			Labels:    map[string]string{"app": "duckgres-worker", "duckgres/worker-id": strconv.Itoa(id)},
+		},
+	}, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("create pod %q: %v", w.podName, err)
+	}
+	return w
+}
+
+func TestShutdownAll_UsesDrainingChainPerWorker(t *testing.T) {
+	// Per worker: MarkWorkerDraining → Delete pod → RetireDrainingWorker.
+	store := &captureRuntimeWorkerStore{}
+	pool, cs := shutdownTestPool(t, store)
+	addShutdownWorker(t, pool, cs, 1)
+	addShutdownWorker(t, pool, cs, 2)
+
+	pool.ShutdownAll()
+
+	if store.markDrainingCalls != 2 {
+		t.Fatalf("expected 2 MarkWorkerDraining calls, got %d", store.markDrainingCalls)
+	}
+	if store.retireDrainingCalls != 2 {
+		t.Fatalf("expected 2 RetireDrainingWorker calls, got %d", store.retireDrainingCalls)
+	}
+	for _, reason := range store.retireDrainingReasons {
+		if reason != RetireReasonShutdown {
+			t.Fatalf("expected retire reason=%q, got %q", RetireReasonShutdown, reason)
+		}
+	}
+	for _, name := range []string{"worker-1", "worker-2"} {
+		if podExists(t, cs, name) {
+			t.Fatalf("expected pod %q to be deleted", name)
+		}
+	}
+}
+
+func TestShutdownAll_DrainBeforeDeleteBeforeRetire(t *testing.T) {
+	// Enforces the happens-before chain for a single worker: the SQL CAS to
+	// draining must complete before the K8s delete, and the K8s delete must
+	// complete before the SQL CAS to retired. If the delete ran before the
+	// draining CAS, another CP could claim the worker mid-delete (delete →
+	// claim → fail); if the retire CAS ran before the delete, a crash in
+	// between would leave a stranded pod that the orphan sweep can't see
+	// (it excludes terminal states).
+	store := &captureRuntimeWorkerStore{}
+	pool, cs := shutdownTestPool(t, store)
+	addShutdownWorker(t, pool, cs, 42)
+
+	pool.ShutdownAll()
+
+	wantEvents := []string{"draining:42", "delete:worker-42", "retired:42"}
+	if len(store.events) < len(wantEvents) {
+		t.Fatalf("expected at least %d events, got %d: %v", len(wantEvents), len(store.events), store.events)
+	}
+	for i, want := range wantEvents {
+		if store.events[i] != want {
+			t.Fatalf("event[%d] = %q, want %q (full events: %v)", i, store.events[i], want, store.events)
+		}
+	}
+}
+
+func TestShutdownAll_SkipsWorkerWhenMarkDrainingCASMisses(t *testing.T) {
+	// MarkWorkerDraining returns false when the row is already terminal (e.g.
+	// the worker was retired on another path between list and CAS) or owned
+	// by a different CP. In that case there's nothing to drain, so we must
+	// neither delete the pod nor call RetireDrainingWorker (its CAS requires
+	// state=draining and would never match).
+ store := &captureRuntimeWorkerStore{ + markDrainingMisses: map[int]bool{99: true}, + } + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 99) + addShutdownWorker(t, pool, cs, 100) + + pool.ShutdownAll() + + // Worker 99 should be skipped entirely after the CAS miss: no pod delete, + // no RetireDrainingWorker call. Worker 100 should proceed normally. + for _, event := range store.events { + if event == "delete:worker-99" { + t.Fatal("expected no pod delete for worker 99 (MarkDraining CAS missed)") + } + } + for _, id := range store.retireDrainingCalledIDs { + if id == 99 { + t.Fatal("expected no RetireDrainingWorker for worker 99 after CAS miss") + } + } + if !podExists(t, cs, "worker-99") { + t.Fatal("expected pod worker-99 to survive — its DB row wasn't owned by us") + } + if podExists(t, cs, "worker-100") { + t.Fatal("expected worker-100 pod to be deleted") + } +} + +func TestShutdownAll_LeavesInDrainingWhenPodDeleteFails(t *testing.T) { + // On pod-delete failure the worker row stays in draining. That's the + // signal for recovery paths: + // - Once this CP's heartbeat expires, ListOrphanedWorkers picks up + // draining rows owned by expired CPs and retires them. + // - cleanupOrphanedWorkerPods sees the pod by label and deletes it + // regardless of DB state. + // What we must NOT do is call RetireDrainingWorker, since that would + // clear the signal and let a stranded pod linger indefinitely. + store := &captureRuntimeWorkerStore{} + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 7) + // Make the Delete fail with a non-NotFound error. + cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + if da.GetName() == "worker-7" { + return true, nil, errors.New("api server timeout") + } + return false, nil, nil + }) + + pool.ShutdownAll() + + if store.markDrainingCalls != 1 { + t.Fatalf("expected 1 MarkDraining call, got %d", store.markDrainingCalls) + } + if store.retireDrainingCalls != 0 { + t.Fatalf("expected no RetireDrainingWorker call after delete failure, got %d", store.retireDrainingCalls) + } +} + +func TestShutdownAll_TreatsPodNotFoundAsDeleteSuccess(t *testing.T) { + // NotFound means another actor already removed the pod (node eviction, + // a racing CP during split-brain, manual kubectl delete). The state + // machine effectively reached "pod gone", so we should proceed to the + // final retire CAS rather than leaving the worker pinned in draining. 
+ store := &captureRuntimeWorkerStore{} + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 8) + cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + if da.GetName() == "worker-8" { + return true, nil, k8serrors.NewNotFound(corev1.Resource("pods"), da.GetName()) + } + return false, nil, nil + }) + + pool.ShutdownAll() + + if store.retireDrainingCalls != 1 { + t.Fatalf("expected RetireDrainingWorker even when delete returned NotFound, got calls=%d", store.retireDrainingCalls) + } +} + // --- Node-age-aware scheduling tests --- // // These cover the two places the pool uses `nodeFirstSeen`: picking the next diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index bbf31e5..32a5802 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -245,6 +245,13 @@ func SetupMultiTenant( defer cancel() router.sharedPool.RetireOneMismatchedVersionWorker(ctx) } + janitor.cleanupOrphanedWorkerPods = func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if n := router.sharedPool.cleanupOrphanedWorkerPods(ctx, 2*time.Minute); n > 0 { + slog.Info("Stranded worker pods reconciled.", "count", n) + } + } janitorLeader, err := NewJanitorLeaderManager(namespace, cpInstanceID, janitor) if err != nil { return nil, nil, nil, nil, nil, err diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index 7f249f0..8afe6d4 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -82,6 +82,8 @@ type RuntimeWorkerStore interface { GetWorkerRecord(workerID int) (*configstore.WorkerRecord, error) TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*configstore.WorkerRecord, error) RetireIdleWorker(workerID int, reason string) (bool, error) + MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) + RetireDrainingWorker(workerID int, reason string) (bool, error) } // K8sPoolFactory creates a K8sWorkerPool. Registered at init time by the From 7e232097f61076aa646218837d89a4e9005b4c91 Mon Sep 17 00:00:00 2001 From: Benjamin Knofe-Vider Date: Thu, 23 Apr 2026 18:38:33 +0200 Subject: [PATCH 2/2] configstore: GetWorkerRecord returns (nil, nil) for missing rows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old implementation wrapped gorm.ErrRecordNotFound as a plain error, which made the "no DB row at all" branch of cleanupOrphanedWorkerPods unreachable in production — the reconciler would log and skip on every tick, so stranded pods with no matching worker_records row would never be cleaned up. The captureRuntimeWorkerStore fake returned (nil, nil) for absent rows, so the unit tests papered over the mismatch. Fix: treat ErrRecordNotFound as a normal state (common Go idiom, cf. sql.ErrNoRows), matching the fake and the reconciler's contract. Other DB errors still propagate. Added a postgres-backed test pinning the contract so future changes can't silently regress the reconciler's missing-row recovery path. GetWorkerRecord has exactly one non-test caller today, so this contract change has no other blast radius. 
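
To make the contract concrete: the reconciler's branching, condensed
into a standalone helper (the name is illustrative only; the real code
inlines this in cleanupOrphanedWorkerPods):

    // classifyWorkerRow maps a GetWorkerRecord result onto the three
    // reconciler outcomes: retry on DB error, delete when the row is
    // missing or terminal (retired/lost), keep everything else.
    func classifyWorkerRow(rec *configstore.WorkerRecord, err error) string {
        switch {
        case err != nil:
            return "retry" // transient DB error: log, retry next tick
        case rec == nil:
            return "delete" // no row at all: fully orphaned pod
        case rec.State == configstore.WorkerStateRetired,
            rec.State == configstore.WorkerStateLost:
            return "delete" // terminal row: the pod should already be gone
        default:
            return "keep" // live worker, leave the pod alone
        }
    }

With the old error-wrapping behavior the rec == nil arm was dead code:
every missing row took the retry arm, on every tick, forever.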
--- controlplane/configstore/store.go | 12 ++++++++++-- .../configstore/runtime_store_postgres_test.go | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index 6493de1..f9560ca 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -505,10 +505,18 @@ func (cs *ConfigStore) ListWorkerRecordsByStatesBefore(states []WorkerState, upd return workers, nil } -// GetWorkerRecord returns a runtime worker row by worker id. +// GetWorkerRecord returns a runtime worker row by worker id. Returns +// (nil, nil) when no row matches — "not found" is a normal state for +// callers like cleanupOrphanedWorkerPods that need to distinguish between +// a known terminal row and no row at all. Any other DB error is wrapped +// and returned so callers can log and retry on the next tick. func (cs *ConfigStore) GetWorkerRecord(workerID int) (*WorkerRecord, error) { var record WorkerRecord - if err := cs.db.Table(cs.runtimeTable(record.TableName())).First(&record, "worker_id = ?", workerID).Error; err != nil { + err := cs.db.Table(cs.runtimeTable(record.TableName())).First(&record, "worker_id = ?", workerID).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } return nil, fmt.Errorf("get worker record: %w", err) } return &record, nil diff --git a/tests/configstore/runtime_store_postgres_test.go b/tests/configstore/runtime_store_postgres_test.go index e5bc80d..d502dff 100644 --- a/tests/configstore/runtime_store_postgres_test.go +++ b/tests/configstore/runtime_store_postgres_test.go @@ -218,6 +218,24 @@ func TestClaimIdleWorkerRespectsOrgCapPostgres(t *testing.T) { } } +func TestGetWorkerRecordReturnsNilNilForMissingRow(t *testing.T) { + // cleanupOrphanedWorkerPods in k8s_pool.go treats (nil, nil) as "no DB + // row — this pod is fully orphaned and safe to delete" and a non-nil + // error as "skip this tick and retry." If GetWorkerRecord wrapped + // gorm.ErrRecordNotFound as an error, the missing-row branch of the + // reconciler would be unreachable in production and stranded pods with + // no DB row would never be cleaned up. This test pins the contract. + store := newIsolatedConfigStore(t) + + record, err := store.GetWorkerRecord(99999) + if err != nil { + t.Fatalf("expected (nil, nil) for missing row, got err=%v", err) + } + if record != nil { + t.Fatalf("expected nil record for missing row, got %#v", record) + } +} + func TestExpireControlPlaneInstancesPostgres(t *testing.T) { store := newIsolatedConfigStore(t)