diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index 45e11ec..f9560ca 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -505,10 +505,18 @@ func (cs *ConfigStore) ListWorkerRecordsByStatesBefore(states []WorkerState, upd return workers, nil } -// GetWorkerRecord returns a runtime worker row by worker id. +// GetWorkerRecord returns a runtime worker row by worker id. Returns +// (nil, nil) when no row matches — "not found" is a normal state for +// callers like cleanupOrphanedWorkerPods that need to distinguish between +// a known terminal row and no row at all. Any other DB error is wrapped +// and returned so callers can log and retry on the next tick. func (cs *ConfigStore) GetWorkerRecord(workerID int) (*WorkerRecord, error) { var record WorkerRecord - if err := cs.db.Table(cs.runtimeTable(record.TableName())).First(&record, "worker_id = ?", workerID).Error; err != nil { + err := cs.db.Table(cs.runtimeTable(record.TableName())).First(&record, "worker_id = ?", workerID).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } return nil, fmt.Errorf("get worker record: %w", err) } return &record, nil @@ -666,6 +674,58 @@ func (cs *ConfigStore) RetireIdleWorker(workerID int, reason string) (bool, erro return result.RowsAffected > 0, nil } +// MarkWorkerDraining atomically transitions a worker into the draining state +// if and only if it is still owned by the caller and not already terminal. It +// returns true when the transition happened. +// +// Used by ShutdownAll to fence a worker before issuing its K8s pod delete: no +// other CP can claim the worker once it's draining (ClaimIdleWorker and +// ClaimHotIdleWorker filter on state=idle and state=hot_idle respectively), +// so the pod-delete/DB-retire chain can proceed without a claim race. If the +// CP then crashes before the final retired transition, ListOrphanedWorkers +// includes draining rows whose owner CP has expired, so orphan cleanup +// retires the worker and deletes the pod. +// +// The ownerCPInstanceID guard prevents a stale CP from moving a worker that +// has already been taken over by a successor. +func (cs *ConfigStore) MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) { + drainableStates := []WorkerState{ + WorkerStateSpawning, + WorkerStateIdle, + WorkerStateReserved, + WorkerStateActivating, + WorkerStateHot, + WorkerStateHotIdle, + } + result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())). + Where("worker_id = ? AND owner_cp_instance_id = ? AND state IN ?", workerID, ownerCPInstanceID, drainableStates). + Updates(map[string]any{ + "state": WorkerStateDraining, + "updated_at": time.Now(), + }) + if result.Error != nil { + return false, fmt.Errorf("mark worker %d draining: %w", workerID, result.Error) + } + return result.RowsAffected > 0, nil +} + +// RetireDrainingWorker atomically transitions a draining worker to retired. +// Returns true if the transition happened, false if the worker was no longer +// in draining (e.g. already retired by an orphan sweep after a CP restart). +func (cs *ConfigStore) RetireDrainingWorker(workerID int, reason string) (bool, error) { + result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())). + Where("worker_id = ? AND state = ?", workerID, WorkerStateDraining). 
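+		// The Updates call below completes a single guarded UPDATE; as a rough
+		// sketch of the generated SQL (GORM builds the real statement and binds
+		// every value as a parameter):
+		//
+		//	UPDATE <runtime worker table>
+		//	SET state = 'retired', retire_reason = ?, updated_at = ?
+		//	WHERE worker_id = ? AND state = 'draining'
+		//
+		// The state guard in the WHERE clause is what makes this transition a CAS.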
+ Updates(map[string]any{ + "state": WorkerStateRetired, + "retire_reason": reason, + "updated_at": time.Now(), + }) + if result.Error != nil { + return false, fmt.Errorf("retire draining worker %d: %w", workerID, result.Error) + } + return result.RowsAffected > 0, nil +} + // TakeOverWorker transfers durable worker ownership to a new control-plane // instance when the caller still has the expected prior owner_epoch. func (cs *ConfigStore) TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*WorkerRecord, error) { @@ -842,9 +902,10 @@ func (cs *ConfigStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePre // already been marked expired long enough ago to pass the orphan grace cutoff. // Retired/lost rows are deliberately excluded: their pods are either already // gone (in which case re-listing the row would loop the janitor on a 404 from -// the K8s delete forever) or were leaked when the previous CP died mid-delete, -// and that leak case is handled authoritatively by the K8s label-based startup -// scan in K8sWorkerPool.cleanupOrphanedWorkerPods. +// the K8s delete forever) or were leaked when the previous CP died mid-delete. +// That leak case is handled by the K8s label-based reconciler in +// K8sWorkerPool.cleanupOrphanedWorkerPods, which runs every janitor tick on +// the leader and deletes pods whose DB row is retired/lost or missing. func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) { var workers []WorkerRecord cleanupStates := []WorkerState{ diff --git a/controlplane/janitor.go b/controlplane/janitor.go index ac9fc7f..99325a8 100644 --- a/controlplane/janitor.go +++ b/controlplane/janitor.go @@ -38,6 +38,7 @@ type ControlPlaneJanitor struct { retireLocalWorker func(workerID int, reason string) bool // retires from in-memory pool + pod, returns false if not local reconcileWarmCapacity func() retireMismatchedVersionWorker func() // reaps one warm idle worker whose Deployment version differs from this CP's (leader-only) + cleanupOrphanedWorkerPods func() // deletes K8s worker pods whose DB row is terminal (retired/lost) or missing (leader-only) } func NewControlPlaneJanitor(store controlPlaneExpiryStore, interval, expiryTimeout time.Duration) *ControlPlaneJanitor { @@ -158,6 +159,14 @@ func (j *ControlPlaneJanitor) runOnce() { j.retireMismatchedVersionWorker() } + // Reconcile K8s pods against the DB state store: delete any worker pod + // whose DB row is terminal (retired/lost) or missing entirely. Catches + // pods leaked by a previous CP that died mid-shutdown (ShutdownAll marked + // the row retired before the K8s delete completed). + if j.cleanupOrphanedWorkerPods != nil { + j.cleanupOrphanedWorkerPods() + } + if j.reconcileWarmCapacity != nil { j.reconcileWarmCapacity() } diff --git a/controlplane/janitor_test.go b/controlplane/janitor_test.go index 8547c48..b593b1a 100644 --- a/controlplane/janitor_test.go +++ b/controlplane/janitor_test.go @@ -195,6 +195,61 @@ func TestControlPlaneJanitorRunReconcilesWarmCapacity(t *testing.T) { } } +func TestControlPlaneJanitorRunInvokesStrandedPodReconciler(t *testing.T) { + // Every tick the janitor must invoke the stranded-pod reconciler so that + // pods leaked by a previous CP (which marked its workers retired but + // failed to delete the K8s pod) get cleaned up automatically. 
+ store := &captureControlPlaneExpiryStore{} + janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second) + + var mu sync.Mutex + calls := 0 + janitor.cleanupOrphanedWorkerPods = func() { + mu.Lock() + defer mu.Unlock() + calls++ + } + + janitor.runOnce() + + mu.Lock() + defer mu.Unlock() + if calls != 1 { + t.Fatalf("expected janitor to invoke stranded-pod reconciler exactly once, got %d", calls) + } +} + +func TestControlPlaneJanitorRunInvokesReconcilerAfterVersionReaper(t *testing.T) { + // Ordering matters: retireMismatchedVersionWorker retires an idle worker + // (setting its row to retired) and deletes the pod. If the pod delete + // fails, the stranded-pod reconciler in the next tick's pass must catch + // it. Running them in order within the same tick ensures a delete failure + // gets a retry within ~5s instead of waiting for the next full tick. + store := &captureControlPlaneExpiryStore{} + janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second) + + var mu sync.Mutex + var order []string + janitor.retireMismatchedVersionWorker = func() { + mu.Lock() + defer mu.Unlock() + order = append(order, "version_reap") + } + janitor.cleanupOrphanedWorkerPods = func() { + mu.Lock() + defer mu.Unlock() + order = append(order, "reconcile_stranded") + } + + janitor.runOnce() + + mu.Lock() + defer mu.Unlock() + if len(order) != 2 || order[0] != "version_reap" || order[1] != "reconcile_stranded" { + t.Fatalf("expected version_reap → reconcile_stranded ordering, got %v", order) + } +} + func TestControlPlaneJanitorRunInvokesVersionReaperBeforeReconcile(t *testing.T) { // The version-aware reaper must run before reconcileWarmCapacity in the // same tick so that a retired-this-tick worker's warm slot is replenished diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 6de1b90..9df01f4 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -266,6 +266,76 @@ func (p *K8sWorkerPool) RetireOneMismatchedVersionWorker(ctx context.Context) bo return false } +// cleanupOrphanedWorkerPods deletes worker pods whose DB row is in a terminal +// state (retired/lost) or has no DB row at all, reconciling K8s against the +// state store. Runs from the janitor loop (leader-only). +// +// This closes a gap between ShutdownAll and the janitor's orphan sweep. +// ShutdownAll marks the worker row retired in the DB before issuing the K8s +// pod delete, and the delete is fire-and-forget — so if the delete fails (API +// hiccup) or the CP is SIGKILL'd mid-shutdown, the pod survives while the DB +// row is already terminal. ListOrphanedWorkers explicitly excludes +// terminal-state rows, so without this reconciler those pods live forever. +// Bare worker pods have no owner reference, so nothing else in the cluster +// reaps them either. +// +// minAge protects newly-spawned pods: the spawn path creates the pod BEFORE +// upserting the DB row, so there's a brief window where a live pod has no DB +// record. Without the age gate the reconciler would race the spawner. +// +// Returns the number of pods deleted this call. 
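+//
+// A usage sketch mirroring the janitor wiring in multitenant.go later in this
+// diff (the 30-second timeout and 2-minute grace window are that wiring's
+// choices, not requirements of this method; "pool" stands in for the shared
+// K8sWorkerPool):
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+//	defer cancel()
+//	if n := pool.cleanupOrphanedWorkerPods(ctx, 2*time.Minute); n > 0 {
+//		slog.Info("Stranded worker pods reconciled.", "count", n)
+//	}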
+func (p *K8sWorkerPool) cleanupOrphanedWorkerPods(ctx context.Context, minAge time.Duration) int { + if p.runtimeStore == nil || p.clientset == nil { + return 0 + } + pods, err := p.clientset.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: "app=duckgres-worker", + }) + if err != nil { + slog.Warn("Stranded-pod reconciler failed to list worker pods.", "error", err) + return 0 + } + cutoff := time.Now().Add(-minAge) + deleted := 0 + for i := range pods.Items { + pod := &pods.Items[i] + if pod.CreationTimestamp.Time.After(cutoff) { + continue + } + idStr := pod.Labels["duckgres/worker-id"] + if idStr == "" { + continue + } + workerID, err := strconv.Atoi(idStr) + if err != nil { + continue + } + rec, err := p.runtimeStore.GetWorkerRecord(workerID) + if err != nil { + slog.Warn("Stranded-pod reconciler failed to load worker record.", "worker_id", workerID, "error", err) + continue + } + dbState := "missing" + if rec != nil { + if rec.State != configstore.WorkerStateRetired && rec.State != configstore.WorkerStateLost { + continue + } + dbState = string(rec.State) + } + gracePeriod := int64(10) + if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, + }); err != nil && !errors.IsNotFound(err) { + slog.Warn("Stranded-pod reconciler failed to delete pod.", "pod", pod.Name, "worker_id", workerID, "error", err) + continue + } + _ = p.deleteWorkerRPCSecret(ctx, pod.Name) + slog.Info("Stranded worker pod reconciled.", "pod", pod.Name, "worker_id", workerID, "db_state", dbState) + deleted++ + } + return deleted +} + func (p *K8sWorkerPool) resolveCPUID(ctx context.Context) error { pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.cpID, metav1.GetOptions{}) if err != nil { @@ -1800,7 +1870,25 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat } } -// ShutdownAll stops all workers by deleting their pods. +// ShutdownAll stops all workers by deleting their pods. Per worker it runs a +// 3-step CAS chain against the runtime store and K8s API: +// +// 1. MarkWorkerDraining — atomic SQL CAS from a non-terminal state to +// draining. Fences the worker against claims by other CPs: their claim +// queries match state=idle/hot_idle, which no longer apply. If the CAS +// misses (row already terminal or owned by another CP) the worker is +// skipped entirely. +// 2. K8s pod delete. Only reached after the CAS succeeds. NotFound is +// treated as success (the pod is gone by some other path). +// 3. RetireDrainingWorker — atomic SQL CAS draining → retired. Only reached +// on successful pod delete. On delete failure the row stays in +// draining, which lets ListOrphanedWorkers pick it up once the CP's +// heartbeat expires, and lets cleanupOrphanedWorkerPods delete the pod +// by label on the next janitor tick. +// +// This ordering closes the old race where the DB row was flipped to retired +// before the pod delete: if the delete failed, the pod survived forever +// because terminal-state rows are excluded from ListOrphanedWorkers. 
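+//
+// Per-worker happy path, as a sketch (state names are the WorkerState values
+// referenced above):
+//
+//	idle/hot_idle/...  --MarkWorkerDraining-->      draining
+//	draining           --pod delete / NotFound-->   draining
+//	draining           --RetireDrainingWorker-->    retired
+//
+// Any failure after the first CAS leaves the row in draining, which is
+// exactly the state the recovery paths (orphan sweep, stranded-pod
+// reconciler) key on.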
func (p *K8sWorkerPool) ShutdownAll() { p.mu.Lock() if p.shuttingDown { @@ -1810,7 +1898,6 @@ func (p *K8sWorkerPool) ShutdownAll() { p.shuttingDown = true workers := make([]*ManagedWorker, 0, len(p.workers)) for _, w := range p.workers { - p.markWorkerRetiredLocked(w, RetireReasonShutdown) workers = append(workers, w) } p.mu.Unlock() @@ -1821,15 +1908,55 @@ func (p *K8sWorkerPool) ShutdownAll() { ctx := context.Background() for _, w := range workers { podName := p.workerPodName(w) - gracePeriod := int64(10) slog.Info("Shutting down K8s worker.", "id", w.ID, "pod", podName) - _ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ + + // Step 1: CAS to draining. Skip the worker on CAS miss or error — + // there's no safe way to proceed if we don't own the row. + if p.runtimeStore != nil { + transitioned, err := p.runtimeStore.MarkWorkerDraining(w.ID, p.cpInstanceID) + if err != nil { + slog.Warn("ShutdownAll: CAS to draining failed; orphan sweep will reconcile.", + "worker_id", w.ID, "error", err) + continue + } + if !transitioned { + slog.Debug("ShutdownAll: worker not owned by us or already terminal; skipping.", + "worker_id", w.ID) + continue + } + } + + // Step 2: delete pod. Leave the row in draining on any error other + // than NotFound so recovery paths pick it up. + gracePeriod := int64(10) + if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ GracePeriodSeconds: &gracePeriod, - }) + }); err != nil && !errors.IsNotFound(err) { + slog.Warn("ShutdownAll: pod delete failed; worker left in draining for orphan sweep/reconciler.", + "id", w.ID, "pod", podName, "error", err) + continue + } _ = p.deleteWorkerRPCSecret(ctx, podName) if w.client != nil { _ = w.client.Close() } + + // Step 3: final CAS to retired. If this fails (network blip during + // shutdown) the row stays in draining and the orphan sweep handles + // it once this CP's heartbeat expires. + if p.runtimeStore != nil { + if _, err := p.runtimeStore.RetireDrainingWorker(w.ID, RetireReasonShutdown); err != nil { + slog.Warn("ShutdownAll: CAS to retired failed; orphan sweep will reconcile.", + "worker_id", w.ID, "error", err) + continue + } + } + + // In-memory lifecycle + metrics. Intentionally skips persistence + // (we've already persisted the retired state via the CAS chain). + p.mu.Lock() + p.markWorkerRetiredInMemoryLocked(w, RetireReasonShutdown) + p.mu.Unlock() } p.mu.Lock() @@ -2276,6 +2403,21 @@ func (p *K8sWorkerPool) spawnWarmWorkerBackground(id int) { } func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string) { + p.markWorkerRetiredInMemoryLocked(w, reason) + workerState := configstore.WorkerStateRetired + if reason == RetireReasonCrash { + workerState = configstore.WorkerStateLost + } + p.persistWorkerRecord(p.workerRecordFor(w.ID, w, w.OwnerEpoch(), workerState, reason, nil)) +} + +// markWorkerRetiredInMemoryLocked performs only the in-memory lifecycle +// transition and metrics bookkeeping for a worker retirement, without +// persisting to the runtime store. Used by callers that have already +// advanced the DB state via a scoped CAS (e.g. ShutdownAll's draining +// chain) and don't want an unconditional UpsertWorkerRecord to overwrite +// fields set by that CAS. 
+func (p *K8sWorkerPool) markWorkerRetiredInMemoryLocked(w *ManagedWorker, reason string) { lifecycle := w.SharedState().NormalizedLifecycle() if lifecycle == WorkerLifecycleHot || lifecycle == WorkerLifecycleHotIdle { observeHotWorkerSessions(w.peakSessions) @@ -2285,11 +2427,6 @@ func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string) return } _ = w.SetSharedState(nextState) - workerState := configstore.WorkerStateRetired - if reason == RetireReasonCrash { - workerState = configstore.WorkerStateLost - } - p.persistWorkerRecord(p.workerRecordFor(w.ID, w, w.OwnerEpoch(), workerState, reason, nil)) observeWorkerRetirement(reason) observeWarmPoolLifecycleGauges(p.workers) } diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index c43e9d1..678536e 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -62,6 +62,26 @@ type captureRuntimeWorkerStore struct { retireIdleCalledReasons []string retireIdleErr error retireIdleMisses map[int]bool + preloadedRecords map[int]*configstore.WorkerRecord + getRecordErrIDs map[int]error + markDrainingCalls int + markDrainingCalledIDs []int + markDrainingCalledCPs []string + markDrainingMisses map[int]bool + markDrainingErr error + retireDrainingCalls int + retireDrainingCalledIDs []int + retireDrainingReasons []string + retireDrainingMisses map[int]bool + retireDrainingErr error + // events records a unified, ordered timeline of state transitions on + // this store so tests can assert happens-before relationships (e.g. + // that pod-delete occurs between markDraining and retireDraining). + events []string +} + +func (s *captureRuntimeWorkerStore) recordEvent(evt string) { + s.events = append(s.events, evt) } func (s *captureRuntimeWorkerStore) UpsertWorkerRecord(record *configstore.WorkerRecord) error { @@ -157,6 +177,16 @@ func (s *captureRuntimeWorkerStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceI func (s *captureRuntimeWorkerStore) GetWorkerRecord(workerID int) (*configstore.WorkerRecord, error) { s.mu.Lock() defer s.mu.Unlock() + if err, ok := s.getRecordErrIDs[workerID]; ok { + return nil, err + } + if rec, ok := s.preloadedRecords[workerID]; ok { + if rec == nil { + return nil, nil + } + record := *rec + return &record, nil + } if s.claimed != nil && s.claimed.WorkerID == workerID { record := *s.claimed return &record, nil @@ -204,6 +234,38 @@ func (s *captureRuntimeWorkerStore) RetireIdleWorker(workerID int, reason string return true, nil } +func (s *captureRuntimeWorkerStore) MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.markDrainingCalls++ + s.markDrainingCalledIDs = append(s.markDrainingCalledIDs, workerID) + s.markDrainingCalledCPs = append(s.markDrainingCalledCPs, ownerCPInstanceID) + s.recordEvent(fmt.Sprintf("draining:%d", workerID)) + if s.markDrainingErr != nil { + return false, s.markDrainingErr + } + if s.markDrainingMisses[workerID] { + return false, nil + } + return true, nil +} + +func (s *captureRuntimeWorkerStore) RetireDrainingWorker(workerID int, reason string) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.retireDrainingCalls++ + s.retireDrainingCalledIDs = append(s.retireDrainingCalledIDs, workerID) + s.retireDrainingReasons = append(s.retireDrainingReasons, reason) + s.recordEvent(fmt.Sprintf("retired:%d", workerID)) + if s.retireDrainingErr != nil { + return false, s.retireDrainingErr + } + if s.retireDrainingMisses[workerID] { + return false, nil + } + return true, nil +} + 
func newTestK8sPool(t *testing.T, maxWorkers int) (*K8sWorkerPool, *fake.Clientset) { t.Helper() cs := fake.NewSimpleClientset() @@ -2099,6 +2161,415 @@ func TestRetireOneMismatchedVersionWorker_NoopWhenCPIDHasNoHashSuffix(t *testing } } +// --- Stranded-pod reconciler tests --- +// +// cleanupOrphanedWorkerPods closes a gap left by ShutdownAll: the CP marks the +// worker row terminal (retired/lost) in the DB before issuing the K8s pod +// delete, and the delete is fire-and-forget. If the delete fails (API hiccup, +// CP SIGKILL'd mid-shutdown), the pod survives forever because: +// - ListOrphanedWorkers excludes terminal states, so orphan cleanup ignores it +// - Bare worker pods have no owner reference, so nothing else reaps them +// These tests pin the expected behavior of the K8s-label-based reconciler. + +// strandedReconcilerPool wires a K8sWorkerPool with a fake clientset and store +// for reconciler tests. Ownership labels aren't checked by the reconciler, so +// we keep the setup minimal. +func strandedReconcilerPool(t *testing.T, store RuntimeWorkerStore) (*K8sWorkerPool, *fake.Clientset) { + t.Helper() + cs := fake.NewClientset() + pool := &K8sWorkerPool{ + workers: make(map[int]*ManagedWorker), + shutdownCh: make(chan struct{}), + stopInform: make(chan struct{}), + clientset: cs, + namespace: "default", + cpID: "duckgres-new-aaaaa", + cpInstanceID: "duckgres-new-aaaaa-boot", + runtimeStore: store, + retireSem: make(chan struct{}, 5), + } + return pool, cs +} + +func createStrandedWorkerPod(t *testing.T, cs *fake.Clientset, name, workerIDLabel string, age time.Duration) { + t.Helper() + _, err := cs.CoreV1().Pods("default").Create(context.Background(), &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + CreationTimestamp: metav1.NewTime(time.Now().Add(-age)), + Labels: map[string]string{ + "app": "duckgres-worker", + "duckgres/worker-id": workerIDLabel, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create pod %q: %v", name, err) + } +} + +func podExists(t *testing.T, cs *fake.Clientset, name string) bool { + t.Helper() + _, err := cs.CoreV1().Pods("default").Get(context.Background(), name, metav1.GetOptions{}) + if err == nil { + return true + } + if k8serrors.IsNotFound(err) { + return false + } + t.Fatalf("unexpected error fetching pod %q: %v", name, err) + return false +} + +func TestCleanupOrphanedWorkerPods_DeletesPodWhenDBStateRetired(t *testing.T) { + // This is the exact prod scenario: worker's DB row is state=retired (a + // previous CP marked it during ShutdownAll) but the K8s pod survived + // because the delete failed or was interrupted. The reconciler must catch + // this and delete the pod. + store := &captureRuntimeWorkerStore{ + preloadedRecords: map[int]*configstore.WorkerRecord{ + 31758: {WorkerID: 31758, State: configstore.WorkerStateRetired}, + }, + } + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-old-worker-31758", "31758", 10*time.Minute) + + deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute) + if deleted != 1 { + t.Fatalf("expected 1 pod deleted, got %d", deleted) + } + if podExists(t, cs, "duckgres-old-worker-31758") { + t.Fatal("expected stranded pod to be deleted") + } +} + +func TestCleanupOrphanedWorkerPods_DeletesPodWhenDBStateLost(t *testing.T) { + // lost is the DB state assigned when a worker is retired with reason=crash + // (see markWorkerRetiredLocked). These pods are also terminal-in-DB and + // must be reconciled. 
+ store := &captureRuntimeWorkerStore{ + preloadedRecords: map[int]*configstore.WorkerRecord{ + 42: {WorkerID: 42, State: configstore.WorkerStateLost}, + }, + } + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-lost-worker-42", "42", 10*time.Minute) + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 1 { + t.Fatalf("expected 1 pod deleted, got %d", deleted) + } + if podExists(t, cs, "duckgres-lost-worker-42") { + t.Fatal("expected lost-state pod to be deleted") + } +} + +func TestCleanupOrphanedWorkerPods_DeletesPodWhenDBRecordMissing(t *testing.T) { + // No DB row exists at all for this worker-id: fully orphaned pod, likely + // from a worker row that was purged while the pod kept running. Treat it + // the same as a terminal-state pod. + store := &captureRuntimeWorkerStore{} + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-ghost-worker-99", "99", 10*time.Minute) + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 1 { + t.Fatalf("expected 1 pod deleted, got %d", deleted) + } + if podExists(t, cs, "duckgres-ghost-worker-99") { + t.Fatal("expected ghost pod with no DB row to be deleted") + } +} + +func TestCleanupOrphanedWorkerPods_LeavesLivePodAlone(t *testing.T) { + // Workers in any non-terminal state (idle, reserved, activating, hot, + // hot_idle, spawning, draining) are part of the normal lifecycle — the + // reconciler must not disturb them. This test covers the common live + // state (idle). Other states follow the same code path. + store := &captureRuntimeWorkerStore{ + preloadedRecords: map[int]*configstore.WorkerRecord{ + 7: {WorkerID: 7, State: configstore.WorkerStateIdle}, + }, + } + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-live-worker-7", "7", 10*time.Minute) + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 0 { + t.Fatalf("expected no pods deleted for live worker, got %d", deleted) + } + if !podExists(t, cs, "duckgres-live-worker-7") { + t.Fatal("expected idle worker pod to survive reconciliation") + } +} + +func TestCleanupOrphanedWorkerPods_SkipsYoungPod(t *testing.T) { + // Spawning workers create the pod BEFORE inserting the DB row. Without a + // grace window on pod age, the reconciler would delete freshly-spawned + // pods in the ~100ms race window between pod creation and DB upsert. + store := &captureRuntimeWorkerStore{} // no record yet — newborn + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-newborn-worker-11", "11", 30*time.Second) + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 0 { + t.Fatalf("expected young pod to be skipped, got deleted=%d", deleted) + } + if !podExists(t, cs, "duckgres-newborn-worker-11") { + t.Fatal("expected newborn pod to survive (under grace window)") + } +} + +func TestCleanupOrphanedWorkerPods_TreatsNotFoundAsSuccess(t *testing.T) { + // If two CPs both become leader for a moment during a split-brain, or the + // pod was evicted by kubelet between our List and Delete, the delete will + // return NotFound. The reconciler must treat that as success. 
+ store := &captureRuntimeWorkerStore{ + preloadedRecords: map[int]*configstore.WorkerRecord{ + 50: {WorkerID: 50, State: configstore.WorkerStateRetired}, + 51: {WorkerID: 51, State: configstore.WorkerStateRetired}, + }, + } + pool, cs := strandedReconcilerPool(t, store) + createStrandedWorkerPod(t, cs, "duckgres-gone-worker-50", "50", 10*time.Minute) + createStrandedWorkerPod(t, cs, "duckgres-stale-worker-51", "51", 10*time.Minute) + + // Make the DELETE for worker 50 return NotFound (simulating race). + cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + if da.GetName() == "duckgres-gone-worker-50" { + return true, nil, k8serrors.NewNotFound(corev1.Resource("pods"), da.GetName()) + } + return false, nil, nil + }) + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 2 { + t.Fatalf("expected 2 pods deleted (NotFound counts as success), got %d", deleted) + } + if podExists(t, cs, "duckgres-stale-worker-51") { + t.Fatal("expected worker 51's pod to be deleted") + } +} + +func TestCleanupOrphanedWorkerPods_IgnoresNonWorkerPods(t *testing.T) { + // Only pods carrying the duckgres-worker app label should be considered. + // This guards against accidentally reaping CP pods or other workloads + // that happened to be scheduled into the duckgres namespace. + store := &captureRuntimeWorkerStore{} + pool, cs := strandedReconcilerPool(t, store) + // Non-worker pod (missing app=duckgres-worker label) — must be ignored. + _, err := cs.CoreV1().Pods("default").Create(context.Background(), &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "some-other-pod", + Namespace: "default", + CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour)), + Labels: map[string]string{"app": "something-else"}, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create other pod: %v", err) + } + + if deleted := pool.cleanupOrphanedWorkerPods(context.Background(), 2*time.Minute); deleted != 0 { + t.Fatalf("expected no deletions, got %d", deleted) + } + if !podExists(t, cs, "some-other-pod") { + t.Fatal("non-worker pod must survive reconciliation") + } +} + +// --- ShutdownAll draining-chain tests --- +// +// ShutdownAll is called when the CP pod receives SIGTERM from Kubernetes. The +// old implementation marked each worker retired in the DB and then fire-and- +// forget deleted the pod — on delete failure the DB row moved on but the pod +// survived forever (ListOrphanedWorkers excludes terminal states). These +// tests pin the new 3-step chain: +// +// 1. MarkWorkerDraining: atomic CAS idle/hot_idle/... → draining. Fences +// the worker against claims by other CPs (their claim queries match +// state=idle/hot_idle, which no longer applies). +// 2. K8s pod delete. +// 3. RetireDrainingWorker: atomic CAS draining → retired. Only reached on +// successful pod-delete — so on delete failure the row stays in +// draining, where ListOrphanedWorkers picks it up once the CP's +// heartbeat expires, or cleanupOrphanedWorkerPods handles it by pod +// label regardless of DB state. + +func shutdownTestPool(t *testing.T, store *captureRuntimeWorkerStore) (*K8sWorkerPool, *fake.Clientset) { + t.Helper() + pool, cs := newTestK8sPool(t, 5) + pool.runtimeStore = store + // Intercept pod deletions so the test can assert that Delete is invoked + // strictly between MarkWorkerDraining and RetireDrainingWorker. 
+ cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + store.mu.Lock() + store.recordEvent(fmt.Sprintf("delete:%s", da.GetName())) + store.mu.Unlock() + return false, nil, nil // fall through so the fake actually removes the pod + }) + return pool, cs +} + +func addShutdownWorker(t *testing.T, p *K8sWorkerPool, cs *fake.Clientset, id int) *ManagedWorker { + t.Helper() + w := &ManagedWorker{ + ID: id, + podName: fmt.Sprintf("worker-%d", id), + done: make(chan struct{}), + } + p.workers[id] = w + _, err := cs.CoreV1().Pods(p.namespace).Create(context.Background(), &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: w.podName, + Namespace: p.namespace, + Labels: map[string]string{"app": "duckgres-worker", "duckgres/worker-id": strconv.Itoa(id)}, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create pod %q: %v", w.podName, err) + } + return w +} + +func TestShutdownAll_UsesDrainingChainPerWorker(t *testing.T) { + // Per worker: MarkWorkerDraining → Delete pod → RetireDrainingWorker. + store := &captureRuntimeWorkerStore{} + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 1) + addShutdownWorker(t, pool, cs, 2) + + pool.ShutdownAll() + + if store.markDrainingCalls != 2 { + t.Fatalf("expected 2 MarkWorkerDraining calls, got %d", store.markDrainingCalls) + } + if store.retireDrainingCalls != 2 { + t.Fatalf("expected 2 RetireDrainingWorker calls, got %d", store.retireDrainingCalls) + } + for _, reason := range store.retireDrainingReasons { + if reason != RetireReasonShutdown { + t.Fatalf("expected retire reason=%q, got %q", RetireReasonShutdown, reason) + } + } + for _, name := range []string{"worker-1", "worker-2"} { + if podExists(t, cs, name) { + t.Fatalf("expected pod %q to be deleted", name) + } + } +} + +func TestShutdownAll_DrainBeforeDeleteBeforeRetire(t *testing.T) { + // Enforces the happens-before chain for a single worker: the SQL CAS to + // draining must complete before the K8s delete, and the K8s delete must + // complete before the SQL CAS to retired. If the order were swapped, + // another CP could claim the worker mid-delete (delete → claim → fail), + // or a crash between delete and retire would leave a stranded pod that + // the orphan sweep can't see (excludes terminal states). + store := &captureRuntimeWorkerStore{} + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 42) + + pool.ShutdownAll() + + wantSuffix := []string{"draining:42", "delete:worker-42", "retired:42"} + if len(store.events) < len(wantSuffix) { + t.Fatalf("expected at least %d events, got %d: %v", len(wantSuffix), len(store.events), store.events) + } + for i, want := range wantSuffix { + if store.events[i] != want { + t.Fatalf("event[%d] = %q, want %q (full events: %v)", i, store.events[i], want, store.events) + } + } +} + +func TestShutdownAll_SkipsWorkerWhenMarkDrainingCASMisses(t *testing.T) { + // MarkWorkerDraining returns false when the row is already terminal (e.g. + // the worker was retired on another path between list and CAS) or owned + // by a different CP. In that case there's nothing to drain, so we must + // neither delete the pod nor call RetireDrainingWorker (which would + // transition from a state that isn't draining, never matching). 
+ store := &captureRuntimeWorkerStore{ + markDrainingMisses: map[int]bool{99: true}, + } + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 99) + addShutdownWorker(t, pool, cs, 100) + + pool.ShutdownAll() + + // Worker 99 should be skipped entirely after the CAS miss: no pod delete, + // no RetireDrainingWorker call. Worker 100 should proceed normally. + for _, event := range store.events { + if event == "delete:worker-99" { + t.Fatal("expected no pod delete for worker 99 (MarkDraining CAS missed)") + } + } + for _, id := range store.retireDrainingCalledIDs { + if id == 99 { + t.Fatal("expected no RetireDrainingWorker for worker 99 after CAS miss") + } + } + if !podExists(t, cs, "worker-99") { + t.Fatal("expected pod worker-99 to survive — its DB row wasn't owned by us") + } + if podExists(t, cs, "worker-100") { + t.Fatal("expected worker-100 pod to be deleted") + } +} + +func TestShutdownAll_LeavesInDrainingWhenPodDeleteFails(t *testing.T) { + // On pod-delete failure the worker row stays in draining. That's the + // signal for recovery paths: + // - Once this CP's heartbeat expires, ListOrphanedWorkers picks up + // draining rows owned by expired CPs and retires them. + // - cleanupOrphanedWorkerPods sees the pod by label and deletes it + // regardless of DB state. + // What we must NOT do is call RetireDrainingWorker, since that would + // clear the signal and let a stranded pod linger indefinitely. + store := &captureRuntimeWorkerStore{} + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 7) + // Make the Delete fail with a non-NotFound error. + cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + if da.GetName() == "worker-7" { + return true, nil, errors.New("api server timeout") + } + return false, nil, nil + }) + + pool.ShutdownAll() + + if store.markDrainingCalls != 1 { + t.Fatalf("expected 1 MarkDraining call, got %d", store.markDrainingCalls) + } + if store.retireDrainingCalls != 0 { + t.Fatalf("expected no RetireDrainingWorker call after delete failure, got %d", store.retireDrainingCalls) + } +} + +func TestShutdownAll_TreatsPodNotFoundAsDeleteSuccess(t *testing.T) { + // NotFound means another actor already removed the pod (node eviction, + // a racing CP during split-brain, manual kubectl delete). The state + // machine effectively reached "pod gone", so we should proceed to the + // final retire CAS rather than leaving the worker pinned in draining. 
+ store := &captureRuntimeWorkerStore{} + pool, cs := shutdownTestPool(t, store) + addShutdownWorker(t, pool, cs, 8) + cs.PrependReactor("delete", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + da := action.(k8stesting.DeleteAction) + if da.GetName() == "worker-8" { + return true, nil, k8serrors.NewNotFound(corev1.Resource("pods"), da.GetName()) + } + return false, nil, nil + }) + + pool.ShutdownAll() + + if store.retireDrainingCalls != 1 { + t.Fatalf("expected RetireDrainingWorker even when delete returned NotFound, got calls=%d", store.retireDrainingCalls) + } +} + // --- Node-age-aware scheduling tests --- // // These cover the two places the pool uses `nodeFirstSeen`: picking the next diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index bbf31e5..32a5802 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -245,6 +245,13 @@ func SetupMultiTenant( defer cancel() router.sharedPool.RetireOneMismatchedVersionWorker(ctx) } + janitor.cleanupOrphanedWorkerPods = func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if n := router.sharedPool.cleanupOrphanedWorkerPods(ctx, 2*time.Minute); n > 0 { + slog.Info("Stranded worker pods reconciled.", "count", n) + } + } janitorLeader, err := NewJanitorLeaderManager(namespace, cpInstanceID, janitor) if err != nil { return nil, nil, nil, nil, nil, err diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index 7f249f0..8afe6d4 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -82,6 +82,8 @@ type RuntimeWorkerStore interface { GetWorkerRecord(workerID int) (*configstore.WorkerRecord, error) TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*configstore.WorkerRecord, error) RetireIdleWorker(workerID int, reason string) (bool, error) + MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) + RetireDrainingWorker(workerID int, reason string) (bool, error) } // K8sPoolFactory creates a K8sWorkerPool. Registered at init time by the diff --git a/tests/configstore/runtime_store_postgres_test.go b/tests/configstore/runtime_store_postgres_test.go index e5bc80d..d502dff 100644 --- a/tests/configstore/runtime_store_postgres_test.go +++ b/tests/configstore/runtime_store_postgres_test.go @@ -218,6 +218,24 @@ func TestClaimIdleWorkerRespectsOrgCapPostgres(t *testing.T) { } } +func TestGetWorkerRecordReturnsNilNilForMissingRow(t *testing.T) { + // cleanupOrphanedWorkerPods in k8s_pool.go treats (nil, nil) as "no DB + // row — this pod is fully orphaned and safe to delete" and a non-nil + // error as "skip this tick and retry." If GetWorkerRecord wrapped + // gorm.ErrRecordNotFound as an error, the missing-row branch of the + // reconciler would be unreachable in production and stranded pods with + // no DB row would never be cleaned up. This test pins the contract. + store := newIsolatedConfigStore(t) + + record, err := store.GetWorkerRecord(99999) + if err != nil { + t.Fatalf("expected (nil, nil) for missing row, got err=%v", err) + } + if record != nil { + t.Fatalf("expected nil record for missing row, got %#v", record) + } +} + func TestExpireControlPlaneInstancesPostgres(t *testing.T) { store := newIsolatedConfigStore(t)