71 changes: 66 additions & 5 deletions controlplane/configstore/store.go
@@ -505,10 +505,18 @@ func (cs *ConfigStore) ListWorkerRecordsByStatesBefore(states []WorkerState, upd
return workers, nil
}

// GetWorkerRecord returns a runtime worker row by worker id. Returns
// (nil, nil) when no row matches — "not found" is a normal state for
// callers like cleanupOrphanedWorkerPods that need to distinguish between
// a known terminal row and no row at all. Any other DB error is wrapped
// and returned so callers can log and retry on the next tick.
func (cs *ConfigStore) GetWorkerRecord(workerID int) (*WorkerRecord, error) {
var record WorkerRecord
	err := cs.db.Table(cs.runtimeTable(record.TableName())).First(&record, "worker_id = ?", workerID).Error
	if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, fmt.Errorf("get worker record: %w", err)
}
return &record, nil
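
// A hedged caller-side sketch of the (nil, nil) contract; the real consumer,
// cleanupOrphanedWorkerPods, appears in k8s_pool.go below, and the branch
// bodies here are illustrative assumptions:
//
//	rec, err := cs.GetWorkerRecord(workerID)
//	switch {
//	case err != nil:
//		// transient DB failure: log and retry on the next tick
//	case rec == nil:
//		// no row at all: a pod labeled with this worker id is stranded
//	default:
//		// rec.State says whether the row is terminal (retired/lost)
//	}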
@@ -666,6 +674,58 @@ func (cs *ConfigStore) RetireIdleWorker(workerID int, reason string) (bool, erro
return result.RowsAffected > 0, nil
}

// MarkWorkerDraining atomically transitions a worker into the draining state
// if and only if it is still owned by the caller and not already terminal. It
// returns true when the transition happened.
//
// Used by ShutdownAll to fence a worker before issuing its K8s pod delete: no
// other CP can claim the worker once it's draining (ClaimIdleWorker and
// ClaimHotIdleWorker filter on state=idle and state=hot_idle respectively),
// so the pod-delete/DB-retire chain can proceed without a claim race. If the
// CP then crashes before the final retired transition, ListOrphanedWorkers
// includes draining rows whose owner CP has expired, so orphan cleanup
// retires the worker and deletes the pod.
//
// The ownerCPInstanceID guard prevents a stale CP from moving a worker that
// has already been taken over by a successor.
func (cs *ConfigStore) MarkWorkerDraining(workerID int, ownerCPInstanceID string) (bool, error) {
drainableStates := []WorkerState{
WorkerStateSpawning,
WorkerStateIdle,
WorkerStateReserved,
WorkerStateActivating,
WorkerStateHot,
WorkerStateHotIdle,
}
result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
Where("worker_id = ? AND owner_cp_instance_id = ? AND state IN ?", workerID, ownerCPInstanceID, drainableStates).
Updates(map[string]any{
"state": WorkerStateDraining,
"updated_at": time.Now(),
})
if result.Error != nil {
return false, fmt.Errorf("mark worker %d draining: %w", workerID, result.Error)
}
return result.RowsAffected > 0, nil
}
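
// claimIdleWorkerSketch is a hypothetical illustration, not part of this PR:
// it shows the claim-side CAS shape that MarkWorkerDraining fences out. A
// claim keys on a specific claimable state, so a row already moved to
// draining stops matching and cannot be claimed. The column names and the
// reserved target state below are assumptions drawn from the comment above.
func (cs *ConfigStore) claimIdleWorkerSketch(workerID int, newOwnerCPInstanceID string) (bool, error) {
	result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
		Where("worker_id = ? AND state = ?", workerID, WorkerStateIdle).
		Updates(map[string]any{
			"state":                WorkerStateReserved,
			"owner_cp_instance_id": newOwnerCPInstanceID,
			"updated_at":           time.Now(),
		})
	if result.Error != nil {
		return false, fmt.Errorf("claim idle worker %d: %w", workerID, result.Error)
	}
	return result.RowsAffected > 0, nil
}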

// RetireDrainingWorker atomically transitions a draining worker to retired.
// Returns true if the transition happened, false if the worker was no longer
// in draining (e.g. already retired by an orphan sweep after a CP restart).
func (cs *ConfigStore) RetireDrainingWorker(workerID int, reason string) (bool, error) {
result := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
Where("worker_id = ? AND state = ?", workerID, WorkerStateDraining).
Updates(map[string]any{
"state": WorkerStateRetired,
"retire_reason": reason,
"updated_at": time.Now(),
})
if result.Error != nil {
return false, fmt.Errorf("retire draining worker %d: %w", workerID, result.Error)
}
return result.RowsAffected > 0, nil
}
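
// shutdownWorkerSketch is a hypothetical composition of the two CAS helpers
// above, not part of this PR; ShutdownAll in k8s_pool.go below is the real
// implementation. deletePod stands in for the K8s pod delete.
func shutdownWorkerSketch(cs *ConfigStore, workerID int, cpInstanceID string, deletePod func() error) error {
	ok, err := cs.MarkWorkerDraining(workerID, cpInstanceID)
	if err != nil || !ok {
		return err // not ours or already terminal: leave the row alone
	}
	if err := deletePod(); err != nil {
		return err // row stays draining; orphan sweep and reconciler recover it
	}
	// Only after the pod is provably gone does the row go terminal.
	_, err = cs.RetireDrainingWorker(workerID, "shutdown")
	return err
}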

// TakeOverWorker transfers durable worker ownership to a new control-plane
// instance when the caller still has the expected prior owner_epoch.
func (cs *ConfigStore) TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*WorkerRecord, error) {
@@ -842,9 +902,10 @@ func (cs *ConfigStore) CreateNeutralWarmWorkerSlot(ownerCPInstanceID, podNamePre
// already been marked expired long enough ago to pass the orphan grace cutoff.
// Retired/lost rows are deliberately excluded: their pods are either already
// gone (in which case re-listing the row would loop the janitor on a 404 from
// the K8s delete forever) or were leaked when the previous CP died mid-delete.
// That leak case is handled by the K8s label-based reconciler in
// K8sWorkerPool.cleanupOrphanedWorkerPods, which runs every janitor tick on
// the leader and deletes pods whose DB row is retired/lost or missing.
func (cs *ConfigStore) ListOrphanedWorkers(before time.Time) ([]WorkerRecord, error) {
var workers []WorkerRecord
cleanupStates := []WorkerState{
9 changes: 9 additions & 0 deletions controlplane/janitor.go
@@ -38,6 +38,7 @@ type ControlPlaneJanitor struct {
retireLocalWorker func(workerID int, reason string) bool // retires from in-memory pool + pod, returns false if not local
reconcileWarmCapacity func()
retireMismatchedVersionWorker func() // reaps one warm idle worker whose Deployment version differs from this CP's (leader-only)
cleanupOrphanedWorkerPods func() // deletes K8s worker pods whose DB row is terminal (retired/lost) or missing (leader-only)
}
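
// A hedged wiring sketch, since the actual hookup is outside this diff: the
// pool method takes a context and a minimum pod age, so this field is
// presumably set to a closure that supplies both. The 30s timeout and 2m
// minAge below are illustrative assumptions, not values from this PR.
//
//	janitor.cleanupOrphanedWorkerPods = func() {
//		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//		defer cancel()
//		pool.cleanupOrphanedWorkerPods(ctx, 2*time.Minute)
//	}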

func NewControlPlaneJanitor(store controlPlaneExpiryStore, interval, expiryTimeout time.Duration) *ControlPlaneJanitor {
@@ -158,6 +159,14 @@ func (j *ControlPlaneJanitor) runOnce() {
j.retireMismatchedVersionWorker()
}

// Reconcile K8s pods against the DB state store: delete any worker pod
// whose DB row is terminal (retired/lost) or missing entirely. Catches
// pods leaked by a previous CP that died mid-shutdown (ShutdownAll marked
// the row retired before the K8s delete completed).
if j.cleanupOrphanedWorkerPods != nil {
j.cleanupOrphanedWorkerPods()
}

if j.reconcileWarmCapacity != nil {
j.reconcileWarmCapacity()
}
55 changes: 55 additions & 0 deletions controlplane/janitor_test.go
@@ -195,6 +195,61 @@ func TestControlPlaneJanitorRunReconcilesWarmCapacity(t *testing.T) {
}
}

func TestControlPlaneJanitorRunInvokesStrandedPodReconciler(t *testing.T) {
// Every tick the janitor must invoke the stranded-pod reconciler so that
// pods leaked by a previous CP (which marked its workers retired but
// failed to delete the K8s pod) get cleaned up automatically.
store := &captureControlPlaneExpiryStore{}
janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second)

var mu sync.Mutex
calls := 0
janitor.cleanupOrphanedWorkerPods = func() {
mu.Lock()
defer mu.Unlock()
calls++
}

janitor.runOnce()

mu.Lock()
defer mu.Unlock()
if calls != 1 {
t.Fatalf("expected janitor to invoke stranded-pod reconciler exactly once, got %d", calls)
}
}

func TestControlPlaneJanitorRunInvokesReconcilerAfterVersionReaper(t *testing.T) {
	// Ordering matters: retireMismatchedVersionWorker retires an idle worker
	// (setting its row to retired) and deletes the pod. If the pod delete
	// fails, the stranded-pod reconciler must catch it. Running the reconciler
	// after the reaper in the same tick gives a failed delete a same-tick
	// retry instead of one arriving a full tick (~5s) later.
store := &captureControlPlaneExpiryStore{}
janitor := NewControlPlaneJanitor(store, 10*time.Millisecond, 20*time.Second)

var mu sync.Mutex
var order []string
janitor.retireMismatchedVersionWorker = func() {
mu.Lock()
defer mu.Unlock()
order = append(order, "version_reap")
}
janitor.cleanupOrphanedWorkerPods = func() {
mu.Lock()
defer mu.Unlock()
order = append(order, "reconcile_stranded")
}

janitor.runOnce()

mu.Lock()
defer mu.Unlock()
if len(order) != 2 || order[0] != "version_reap" || order[1] != "reconcile_stranded" {
t.Fatalf("expected version_reap → reconcile_stranded ordering, got %v", order)
}
}

func TestControlPlaneJanitorRunInvokesVersionReaperBeforeReconcile(t *testing.T) {
// The version-aware reaper must run before reconcileWarmCapacity in the
// same tick so that a retired-this-tick worker's warm slot is replenished
157 changes: 147 additions & 10 deletions controlplane/k8s_pool.go
@@ -266,6 +266,76 @@ func (p *K8sWorkerPool) RetireOneMismatchedVersionWorker(ctx context.Context) bo
return false
}

// cleanupOrphanedWorkerPods deletes worker pods whose DB row is in a terminal
// state (retired/lost) or has no DB row at all, reconciling K8s against the
// state store. Runs from the janitor loop (leader-only).
//
// This closes a gap between ShutdownAll and the janitor's orphan sweep.
// ShutdownAll previously marked the worker row retired in the DB before
// issuing the K8s pod delete, and the delete was fire-and-forget: if the
// delete failed (API hiccup) or the CP was SIGKILL'd mid-shutdown, the pod
// survived while the DB row was already terminal. ListOrphanedWorkers
// explicitly excludes terminal-state rows, so without this reconciler
// those pods would live forever.
// Bare worker pods have no owner reference, so nothing else in the cluster
// reaps them either.
//
// minAge protects newly-spawned pods: the spawn path creates the pod BEFORE
// upserting the DB row, so there's a brief window where a live pod has no DB
// record. Without the age gate the reconciler would race the spawner.
//
// Returns the number of pods deleted this call.
func (p *K8sWorkerPool) cleanupOrphanedWorkerPods(ctx context.Context, minAge time.Duration) int {
if p.runtimeStore == nil || p.clientset == nil {
return 0
}
pods, err := p.clientset.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{
LabelSelector: "app=duckgres-worker",
})
if err != nil {
slog.Warn("Stranded-pod reconciler failed to list worker pods.", "error", err)
return 0
}
cutoff := time.Now().Add(-minAge)
deleted := 0
for i := range pods.Items {
pod := &pods.Items[i]
if pod.CreationTimestamp.Time.After(cutoff) {
continue
}
idStr := pod.Labels["duckgres/worker-id"]
if idStr == "" {
continue
}
workerID, err := strconv.Atoi(idStr)
if err != nil {
continue
}
rec, err := p.runtimeStore.GetWorkerRecord(workerID)
if err != nil {
slog.Warn("Stranded-pod reconciler failed to load worker record.", "worker_id", workerID, "error", err)
continue
}
dbState := "missing"
if rec != nil {
if rec.State != configstore.WorkerStateRetired && rec.State != configstore.WorkerStateLost {
continue
}
dbState = string(rec.State)
}
gracePeriod := int64(10)
if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
}); err != nil && !errors.IsNotFound(err) {
slog.Warn("Stranded-pod reconciler failed to delete pod.", "pod", pod.Name, "worker_id", workerID, "error", err)
continue
}
_ = p.deleteWorkerRPCSecret(ctx, pod.Name)
slog.Info("Stranded worker pod reconciled.", "pod", pod.Name, "worker_id", workerID, "db_state", dbState)
deleted++
}
return deleted
}
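
// workerPodLabelsSketch is a hypothetical illustration of the label contract
// the reconciler relies on (the real spawn path is outside this diff): the
// list selector requires app=duckgres-worker and the per-pod lookup parses
// duckgres/worker-id, so both labels must be set when the pod is created.
func workerPodLabelsSketch(workerID int) map[string]string {
	return map[string]string{
		"app":                "duckgres-worker",
		"duckgres/worker-id": strconv.Itoa(workerID),
	}
}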

func (p *K8sWorkerPool) resolveCPUID(ctx context.Context) error {
pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.cpID, metav1.GetOptions{})
if err != nil {
@@ -1800,7 +1870,25 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat
}
}

// ShutdownAll stops all workers by deleting their pods.
// ShutdownAll stops all workers by deleting their pods. Per worker it runs a
// 3-step CAS chain against the runtime store and K8s API:
//
// 1. MarkWorkerDraining — atomic SQL CAS from a non-terminal state to
// draining. Fences the worker against claims by other CPs: their claim
//     queries match state=idle/hot_idle, states that a draining row no
//     longer satisfies. If the CAS misses (row already terminal or owned by
//     another CP) the worker is skipped entirely.
// 2. K8s pod delete. Only reached after the CAS succeeds. NotFound is
// treated as success (the pod is gone by some other path).
// 3. RetireDrainingWorker — atomic SQL CAS draining → retired. Only reached
// on successful pod delete. On delete failure the row stays in
// draining, which lets ListOrphanedWorkers pick it up once the CP's
// heartbeat expires, and lets cleanupOrphanedWorkerPods delete the pod
// by label on the next janitor tick.
//
// This ordering closes the old race where the DB row was flipped to retired
// before the pod delete: if the delete failed, the pod survived forever
// because terminal-state rows are excluded from ListOrphanedWorkers.
func (p *K8sWorkerPool) ShutdownAll() {
p.mu.Lock()
if p.shuttingDown {
@@ -1810,7 +1898,6 @@
p.shuttingDown = true
workers := make([]*ManagedWorker, 0, len(p.workers))
for _, w := range p.workers {
workers = append(workers, w)
}
p.mu.Unlock()
@@ -1821,15 +1908,55 @@
ctx := context.Background()
for _, w := range workers {
podName := p.workerPodName(w)
slog.Info("Shutting down K8s worker.", "id", w.ID, "pod", podName)

// Step 1: CAS to draining. Skip the worker on CAS miss or error —
// there's no safe way to proceed if we don't own the row.
if p.runtimeStore != nil {
transitioned, err := p.runtimeStore.MarkWorkerDraining(w.ID, p.cpInstanceID)
if err != nil {
slog.Warn("ShutdownAll: CAS to draining failed; orphan sweep will reconcile.",
"worker_id", w.ID, "error", err)
continue
}
if !transitioned {
slog.Debug("ShutdownAll: worker not owned by us or already terminal; skipping.",
"worker_id", w.ID)
continue
}
}

// Step 2: delete pod. Leave the row in draining on any error other
// than NotFound so recovery paths pick it up.
gracePeriod := int64(10)
if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
}); err != nil && !errors.IsNotFound(err) {
slog.Warn("ShutdownAll: pod delete failed; worker left in draining for orphan sweep/reconciler.",
"id", w.ID, "pod", podName, "error", err)
continue
}
_ = p.deleteWorkerRPCSecret(ctx, podName)
if w.client != nil {
_ = w.client.Close()
}

// Step 3: final CAS to retired. If this fails (network blip during
// shutdown) the row stays in draining and the orphan sweep handles
// it once this CP's heartbeat expires.
if p.runtimeStore != nil {
if _, err := p.runtimeStore.RetireDrainingWorker(w.ID, RetireReasonShutdown); err != nil {
slog.Warn("ShutdownAll: CAS to retired failed; orphan sweep will reconcile.",
"worker_id", w.ID, "error", err)
continue
}
}

// In-memory lifecycle + metrics. Intentionally skips persistence
// (we've already persisted the retired state via the CAS chain).
p.mu.Lock()
p.markWorkerRetiredInMemoryLocked(w, RetireReasonShutdown)
p.mu.Unlock()
}

p.mu.Lock()
@@ -2276,6 +2403,21 @@ func (p *K8sWorkerPool) spawnWarmWorkerBackground(id int) {
}

func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string) {
p.markWorkerRetiredInMemoryLocked(w, reason)
workerState := configstore.WorkerStateRetired
if reason == RetireReasonCrash {
workerState = configstore.WorkerStateLost
}
p.persistWorkerRecord(p.workerRecordFor(w.ID, w, w.OwnerEpoch(), workerState, reason, nil))
}

// markWorkerRetiredInMemoryLocked performs only the in-memory lifecycle
// transition and metrics bookkeeping for a worker retirement, without
// persisting to the runtime store. Used by callers that have already
// advanced the DB state via a scoped CAS (e.g. ShutdownAll's draining
// chain) and don't want an unconditional UpsertWorkerRecord to overwrite
// fields set by that CAS.
func (p *K8sWorkerPool) markWorkerRetiredInMemoryLocked(w *ManagedWorker, reason string) {
lifecycle := w.SharedState().NormalizedLifecycle()
if lifecycle == WorkerLifecycleHot || lifecycle == WorkerLifecycleHotIdle {
observeHotWorkerSessions(w.peakSessions)
@@ -2285,11 +2427,6 @@ func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string)
return
}
_ = w.SetSharedState(nextState)
observeWorkerRetirement(reason)
observeWarmPoolLifecycleGauges(p.workers)
}