From c9c818b5dcfe1b87d8f46003bd466ab22724eaaf Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Wed, 24 Jun 2026 13:27:57 -0700 Subject: [PATCH 1/2] feat: Add initial `funnel k8s cleanup` command Signed-off-by: Liam Beckman --- cmd/kubernetes/kubernetes.go | 5 ++++- compute/kubernetes/backend.go | 12 ++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 9f7e82811..b2b013c1c 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -67,6 +67,7 @@ is decoupled from the server lifecycle and multiple replicas do not race.`, return fmt.Errorf("opening database: %v", err) } + // TODO: Why are we building a new backend here? // Build the K8s backend (connects to the cluster via in-cluster config). // We pass a no-op event writer since this command only deletes resources // and never needs to emit task state events. @@ -77,7 +78,9 @@ is decoupled from the server lifecycle and multiple replicas do not race.`, log.Info("Starting orphaned resource cleanup", "namespace", conf.Kubernetes.JobsNamespace) - backend.CleanOrphanedResources(ctx) + + backend.ReconcileOnce() + // backend.CleanOrphanedResources(ctx) log.Info("Orphaned resource cleanup complete") return nil }, diff --git a/compute/kubernetes/backend.go b/compute/kubernetes/backend.go index c51ee6b6e..c94aff3a5 100644 --- a/compute/kubernetes/backend.go +++ b/compute/kubernetes/backend.go @@ -951,6 +951,18 @@ func (b *Backend) isResourceCleanupNeeded(ctx context.Context, taskID string) (b } } +// ReconcileOnce performs a single reconciliation pass over Funnel-managed Kubernetes +// resources. It is intended to be invoked by an external scheduler (e.g. a Kubernetes +// CronJob configured via the Helm chart's ReconcileRate value) so that reconciliation +// is decoupled from the Funnel server lifecycle and multiple server replicas do not +// race to reconcile the same resources. 
+// +// This is currently a stub that simply logs; the full reconciliation logic is added in +// https://github.com/calypr/funnel/pull/1438. +func (b *Backend) ReconcileOnce() { + b.log.Info("Reconciling!") +} + // CleanOrphanedResources deletes any Funnel-managed Kubernetes resources that are not associated // with an active task in the database. // From bda12c3ad5c0f8a4b54c635affe057fe555694df Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 26 Jun 2026 16:18:18 -0700 Subject: [PATCH 2/2] feat: Add cleanup-only backend for `funnel k8s cleanup` --- cmd/kubernetes/kubernetes.go | 26 +++++++++++++++++--------- compute/kubernetes/backend.go | 35 +++++++++++++++++++++++++++++++++++ util/k8sutil/k8s.go | 26 +++++++++++++++++++++----- 3 files changed, 73 insertions(+), 14 deletions(-) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index b2b013c1c..38589897a 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -27,8 +27,9 @@ import ( // Cmd is the root "funnel kubernetes" command. var Cmd = &cobra.Command{ - Use: "kubernetes", - Short: "Funnel Kubernetes management commands.", + Use: "kubernetes", + Aliases: []string{"k8s"}, + Short: "Funnel Kubernetes management commands.", } func init() { @@ -67,11 +68,13 @@ is decoupled from the server lifecycle and multiple replicas do not race.`, return fmt.Errorf("opening database: %v", err) } - // TODO: Why are we building a new backend here? - // Build the K8s backend (connects to the cluster via in-cluster config). - // We pass a no-op event writer since this command only deletes resources - // and never needs to emit task state events. - backend, err := k8sbackend.NewBackend(ctx, conf, reader, &events.Logger{Log: log}, log) + // Build a cleanup-only K8s backend. 
This connects to the same cluster as + // the running Funnel Server (via in-cluster config) but, unlike the full + // server backend, does not require a worker job template and does not start + // the reconcile goroutine — cleanup only reads the database and deletes + // orphaned resources. The event writer just logs, since cleanup never emits + // task state events. + backend, err := k8sbackend.NewCleanupBackend(conf, reader, &events.Logger{Log: log}, log) if err != nil { return fmt.Errorf("initializing kubernetes backend: %v", err) } @@ -79,8 +82,13 @@ is decoupled from the server lifecycle and multiple replicas do not race.`, log.Info("Starting orphaned resource cleanup", "namespace", conf.Kubernetes.JobsNamespace) - backend.ReconcileOnce() - // backend.CleanOrphanedResources(ctx) + // Delete Funnel-managed resources whose task is gone or terminal. + // + // TODO: Once the modular reconciler lands (calypr/funnel#1438, #1410) + // this should call the full single-pass reconcile (reconcileOnce) so the + // CronJob also reconciles task/job state, not just orphaned resources. + backend.CleanOrphanedResources(ctx) + log.Info("Orphaned resource cleanup complete") return nil }, diff --git a/compute/kubernetes/backend.go b/compute/kubernetes/backend.go index c94aff3a5..2ddee926c 100644 --- a/compute/kubernetes/backend.go +++ b/compute/kubernetes/backend.go @@ -75,6 +75,41 @@ func NewBackend(ctx context.Context, conf *config.Config, reader tes.ReadOnlySer return b, nil } +// NewCleanupBackend returns a Backend suitable for one-shot resource cleanup +// (e.g. the "funnel kubernetes cleanup" command run as a CronJob). +// +// Unlike NewBackend, it does not require a worker job template and does not start +// the reconcile goroutine: cleanup only reads the database and deletes orphaned +// Kubernetes resources, so the task-submission configuration is irrelevant. 
Like +// the server backend, it connects to the same cluster via in-cluster config and +// is intended to run inside the cluster (e.g. as a CronJob). +func NewCleanupBackend(conf *config.Config, reader tes.ReadOnlyServer, writer events.Writer, log *logger.Logger) (*Backend, error) { + // Resolve the namespace cleanup should operate in. Prefer the configured + // namespace; otherwise fall back to the pod's ServiceAccount namespace. + if conf.Kubernetes.JobsNamespace == "" { + conf.Kubernetes.JobsNamespace = conf.Kubernetes.Namespace + } + if conf.Kubernetes.JobsNamespace == "" { + conf.Kubernetes.JobsNamespace = k8sutil.InClusterNamespace() + } + if conf.Kubernetes.JobsNamespace == "" { + return nil, fmt.Errorf("could not determine kubernetes namespace; set Kubernetes.Namespace in config") + } + + clientset, err := k8sutil.NewK8sClient(conf) + if err != nil { + return nil, fmt.Errorf("creating kubernetes client: %v", err) + } + + return &Backend{ + client: clientset, + event: writer, + database: reader, + log: log, + conf: conf, + }, nil +} + func (b Backend) CheckBackendParameterSupport(task *tes.Task) error { if !task.Resources.GetBackendParametersStrict() { return nil diff --git a/util/k8sutil/k8s.go b/util/k8sutil/k8s.go index de8d30c36..116a4bd1f 100644 --- a/util/k8sutil/k8s.go +++ b/util/k8sutil/k8s.go @@ -2,18 +2,23 @@ package k8sutil import ( "fmt" + "os" + "strings" "github.com/ohsu-comp-bio/funnel/config" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) -// NewK8sClient returns a new Kubernetes client. -func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) { - var kubeconfig *rest.Config - var err error +// inClusterNamespaceFile is the path to the namespace file that Kubernetes +// mounts into every pod via its ServiceAccount. 
+const inClusterNamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" -// NewK8sClient returns a new Kubernetes client. -func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) { - var kubeconfig *rest.Config - var err error - kubeconfig, err = rest.InClusterConfig() +// NewK8sClient returns a new Kubernetes client using in-cluster configuration. +// Funnel's Kubernetes commands (server and cleanup CronJob alike) are designed to +// run inside the cluster, so only in-cluster config is supported. +func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) { + kubeconfig, err := rest.InClusterConfig() if err != nil { return nil, fmt.Errorf("building in-cluster kubeconfig: %v", err) } @@ -25,3 +30,14 @@ func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) { return clientset, nil } + +// InClusterNamespace returns the namespace the current pod's ServiceAccount is +// bound to, read from the ServiceAccount namespace file mounted into the pod. It +// returns an empty string if that file cannot be read (e.g. outside a cluster). +func InClusterNamespace() string { + data, err := os.ReadFile(inClusterNamespaceFile) + if err != nil { + return "" + } + return strings.TrimSpace(string(data)) +}