Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (

// Cmd is the root "funnel kubernetes" command.
var Cmd = &cobra.Command{
Use: "kubernetes",
Short: "Funnel Kubernetes management commands.",
Use: "kubernetes",
Aliases: []string{"k8s"},
Short: "Funnel Kubernetes management commands.",
}

func init() {
Expand Down Expand Up @@ -67,17 +68,27 @@ is decoupled from the server lifecycle and multiple replicas do not race.`,
return fmt.Errorf("opening database: %v", err)
}

// Build the K8s backend (connects to the cluster via in-cluster config).
// We pass a no-op event writer since this command only deletes resources
// and never needs to emit task state events.
backend, err := k8sbackend.NewBackend(ctx, conf, reader, &events.Logger{Log: log}, log)
// Build a cleanup-only K8s backend. This connects to the same cluster as
// the running Funnel Server (via in-cluster config) but, unlike the full
// server backend, does not require a worker job template and does not start
// the reconcile goroutine — cleanup only reads the database and deletes
// orphaned resources. The event writer just logs, since cleanup never emits
// task state events.
backend, err := k8sbackend.NewCleanupBackend(conf, reader, &events.Logger{Log: log}, log)
if err != nil {
return fmt.Errorf("initializing kubernetes backend: %v", err)
}

log.Info("Starting orphaned resource cleanup",
"namespace", conf.Kubernetes.JobsNamespace)

// Delete Funnel-managed resources whose task is gone or terminal.
//
// TODO: Once the modular reconciler lands (calypr/funnel#1438, #1410)
// this should call the full single-pass reconcile (reconcileOnce) so the
// CronJob also reconciles task/job state, not just orphaned resources.
backend.CleanOrphanedResources(ctx)

log.Info("Orphaned resource cleanup complete")
return nil
},
Expand Down
47 changes: 47 additions & 0 deletions compute/kubernetes/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,41 @@ func NewBackend(ctx context.Context, conf *config.Config, reader tes.ReadOnlySer
return b, nil
}

// NewCleanupBackend returns a Backend suitable for one-shot resource cleanup
// (e.g. the "funnel kubernetes cleanup" command run as a CronJob).
//
// Unlike NewBackend, it does not require a worker job template and does not start
// the reconcile goroutine: cleanup only reads the database and deletes orphaned
// Kubernetes resources, so the task-submission configuration is irrelevant. Like
// the server backend, it connects to the same cluster via in-cluster config and
// is intended to run inside the cluster (e.g. as a CronJob).
func NewCleanupBackend(conf *config.Config, reader tes.ReadOnlyServer, writer events.Writer, log *logger.Logger) (*Backend, error) {
// Resolve the namespace cleanup should operate in. Prefer the configured
// namespace; otherwise fall back to the pod's ServiceAccount namespace.
if conf.Kubernetes.JobsNamespace == "" {
conf.Kubernetes.JobsNamespace = conf.Kubernetes.Namespace
}
if conf.Kubernetes.JobsNamespace == "" {
conf.Kubernetes.JobsNamespace = k8sutil.InClusterNamespace()
}
if conf.Kubernetes.JobsNamespace == "" {
return nil, fmt.Errorf("could not determine kubernetes namespace; set Kubernetes.Namespace in config")
}

clientset, err := k8sutil.NewK8sClient(conf)
if err != nil {
return nil, fmt.Errorf("creating kubernetes client: %v", err)
}

return &Backend{
client: clientset,
event: writer,
database: reader,
log: log,
conf: conf,
}, nil
}

func (b Backend) CheckBackendParameterSupport(task *tes.Task) error {
if !task.Resources.GetBackendParametersStrict() {
return nil
Expand Down Expand Up @@ -951,6 +986,18 @@ func (b *Backend) isResourceCleanupNeeded(ctx context.Context, taskID string) (b
}
}

// ReconcileOnce performs a single reconciliation pass over Funnel-managed Kubernetes
// resources. It is intended to be invoked by an external scheduler (e.g. a Kubernetes
// CronJob configured via the Helm chart's ReconcileRate value) so that reconciliation
// is decoupled from the Funnel server lifecycle and multiple server replicas do not
// race to reconcile the same resources.
//
// This is currently a stub that simply logs; the full reconciliation logic is added in
// https://github.com/calypr/funnel/pull/1438.
func (b *Backend) ReconcileOnce() {
b.log.Info("Reconciling!")
}

// CleanOrphanedResources deletes any Funnel-managed Kubernetes resources that are not associated
// with an active task in the database.
//
Expand Down
26 changes: 21 additions & 5 deletions util/k8sutil/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ package k8sutil

import (
"fmt"
"os"
"strings"

"github.com/ohsu-comp-bio/funnel/config"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// NewK8sClient returns a new Kubernetes client.
func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) {
var kubeconfig *rest.Config
var err error
// inClusterNamespaceFile is the path to the namespace file that Kubernetes
// mounts into every pod via its ServiceAccount.
const inClusterNamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

kubeconfig, err = rest.InClusterConfig()
// NewK8sClient returns a new Kubernetes client using in-cluster configuration.
// Funnel's Kubernetes commands (server and cleanup CronJob alike) are designed to
// run inside the cluster, so only in-cluster config is supported.
func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) {
kubeconfig, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("building in-cluster kubeconfig: %v", err)
}
Expand All @@ -25,3 +30,14 @@ func NewK8sClient(conf *config.Config) (*kubernetes.Clientset, error) {

return clientset, nil
}

// InClusterNamespace returns the namespace the current pod's ServiceAccount is
// bound to, read from the file Kubernetes mounts into every pod. It returns an
// empty string when not running inside a cluster.
func InClusterNamespace() string {
data, err := os.ReadFile(inClusterNamespaceFile)
if err != nil {
return ""
}
return strings.TrimSpace(string(data))
}
Loading