From 7d7152b1cfebca21bbea1de7c2ba6ccd13ab527a Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:23:35 -0700 Subject: [PATCH 01/11] add: dep Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- actions/k8s/app_scheme.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 actions/k8s/app_scheme.go diff --git a/actions/k8s/app_scheme.go b/actions/k8s/app_scheme.go new file mode 100644 index 0000000000..9f3d450d49 --- /dev/null +++ b/actions/k8s/app_scheme.go @@ -0,0 +1,13 @@ +package k8s + +import ( + "k8s.io/client-go/kubernetes/scheme" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" +) + +// InitAppScheme registers Knative Serving types (Service, Route, Configuration, Revision) +// into the client-go scheme so that the K8s client can manage KService CRDs. +// Must be called before creating any K8s clients that interact with apps. +func InitAppScheme() error { + return servingv1.AddToScheme(scheme.Scheme) +} From 6fe72ff603a3925e8d2c1791acaf05439bd9cb7d Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Mon, 6 Apr 2026 21:09:02 -0700 Subject: [PATCH 02/11] restructure Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- actions/k8s/app_scheme.go | 13 ------------- flytestdlib/app/db.go | 0 2 files changed, 13 deletions(-) delete mode 100644 actions/k8s/app_scheme.go create mode 100644 flytestdlib/app/db.go diff --git a/actions/k8s/app_scheme.go b/actions/k8s/app_scheme.go deleted file mode 100644 index 9f3d450d49..0000000000 --- a/actions/k8s/app_scheme.go +++ /dev/null @@ -1,13 +0,0 @@ -package k8s - -import ( - "k8s.io/client-go/kubernetes/scheme" - servingv1 "knative.dev/serving/pkg/apis/serving/v1" -) - -// InitAppScheme registers Knative Serving types (Service, Route, Configuration, Revision) -// into the client-go scheme so that the K8s client can manage KService CRDs. -// Must be called before creating any K8s clients that interact with apps. -func InitAppScheme() error { - return servingv1.AddToScheme(scheme.Scheme) -} diff --git a/flytestdlib/app/db.go b/flytestdlib/app/db.go new file mode 100644 index 0000000000..e69de29bb2 From 928ef216fe907ec8413b443a657fb22d76d3367d Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:17:47 -0700 Subject: [PATCH 03/11] wip Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/app.go | 398 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 398 insertions(+) create mode 100644 app/app.go diff --git a/app/app.go b/app/app.go new file mode 100644 index 0000000000..44cd9c49c7 --- /dev/null +++ b/app/app.go @@ -0,0 +1,398 @@ +package app + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + "google.golang.org/protobuf/proto" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/flyteorg/flyte/v2/actions/config" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" +) + +const ( + labelAppManaged = "flyte.org/app-managed" + labelProject = "flyte.org/project" + labelDomain = "flyte.org/domain" + labelAppName = "flyte.org/app-name" + + annotationSpecSHA = "flyte.org/spec-sha" + annotationAppID = "flyte.org/app-id" + + maxScaleZero = "0" + + // maxKServiceNameLen is the Kubernetes DNS label limit. + maxKServiceNameLen = 63 +) + +// AppK8sClientInterface defines the KService lifecycle operations for the App service. +type AppK8sClientInterface interface { + // Deploy creates or updates the KService for the given app. Idempotent — skips + // the update if the spec SHA annotation is unchanged. + Deploy(ctx context.Context, app *flyteapp.App) error + + // Stop scales the KService to zero by setting max-scale=0. The KService CRD + // is kept so the app can be restarted later. + Stop(ctx context.Context, appID *flyteapp.Identifier) error + + // GetStatus reads the KService and maps its conditions to a DeploymentStatus. + // Returns a Status with STOPPED if the KService does not exist. + GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) + + // List returns all apps (spec + live status) for the given project/domain scope. + List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) +} + +// AppK8sClient implements AppK8sClientInterface using controller-runtime. +type AppK8sClient struct { + k8sClient client.WithWatch + cache ctrlcache.Cache + namespace string + cfg *config.AppConfig +} + +// NewAppK8sClient creates a new AppK8sClient. +func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.AppConfig) *AppK8sClient { + return &AppK8sClient{ + k8sClient: k8sClient, + cache: cache, + namespace: cfg.Namespace, + cfg: cfg, + } +} + +// Deploy creates or updates the KService for the given app. +func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { + appID := app.GetMetadata().GetId() + name := kserviceName(appID) + + ksvc, err := c.buildKService(app) + if err != nil { + return fmt.Errorf("failed to build KService for app %s: %w", name, err) + } + + existing := &servingv1.Service{} + err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, existing) + if k8serrors.IsNotFound(err) { + if err := c.k8sClient.Create(ctx, ksvc); err != nil { + return fmt.Errorf("failed to create KService %s: %w", name, err) + } + logger.Infof(ctx, "Created KService %s/%s", c.namespace, name) + return nil + } + if err != nil { + return fmt.Errorf("failed to get KService %s: %w", name, err) + } + + // Skip update if spec has not changed. + if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { + logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", c.namespace, name) + return nil + } + + existing.Spec = ksvc.Spec + existing.Labels = ksvc.Labels + existing.Annotations = ksvc.Annotations + if err := c.k8sClient.Update(ctx, existing); err != nil { + return fmt.Errorf("failed to update KService %s: %w", name, err) + } + logger.Infof(ctx, "Updated KService %s/%s", c.namespace, name) + return nil +} + +// Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. +func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { + name := kserviceName(appID) + patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"autoscaling.knative.dev/max-scale":"0"}}}}}`) + ksvc := &servingv1.Service{} + ksvc.Name = name + ksvc.Namespace = c.namespace + if err := c.k8sClient.Patch(ctx, ksvc, client.RawPatch(types.MergePatchType, patch)); err != nil { + if k8serrors.IsNotFound(err) { + // Already stopped/deleted — treat as success. + return nil + } + return fmt.Errorf("failed to patch KService %s to stop: %w", name, err) + } + logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", c.namespace, name) + return nil +} + +// GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. +func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { + name := kserviceName(appID) + ksvc := &servingv1.Service{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, ksvc); err != nil { + if k8serrors.IsNotFound(err) { + return statusWithPhase(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, "KService not found"), nil + } + return nil, fmt.Errorf("failed to get KService %s: %w", name, err) + } + return kserviceToStatus(ksvc), nil +} + +// List returns all apps for the given project/domain by listing KServices with label selectors. +func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { + list := &servingv1.ServiceList{} + if err := c.k8sClient.List(ctx, list, + client.InNamespace(c.namespace), + client.MatchingLabels{ + labelProject: project, + labelDomain: domain, + }, + ); err != nil { + return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) + } + + apps := make([]*flyteapp.App, 0, len(list.Items)) + for i := range list.Items { + a, err := kserviceToApp(&list.Items[i]) + if err != nil { + logger.Warnf(ctx, "Skipping KService %s: failed to convert to app: %v", list.Items[i].Name, err) + continue + } + apps = append(apps, a) + } + return apps, nil +} + +// --- Helpers --- + +// kserviceName builds the KService name from an app identifier. +// Format: "{project}-{domain}-{name}", truncated to 63 chars. +func kserviceName(id *flyteapp.Identifier) string { + name := fmt.Sprintf("%s-%s-%s", id.GetProject(), id.GetDomain(), id.GetName()) + if len(name) > maxKServiceNameLen { + name = name[:maxKServiceNameLen] + } + return strings.ToLower(name) +} + +// specSHA computes a SHA256 digest of the serialized App Spec proto. +func specSHA(spec *flyteapp.Spec) (string, error) { + b, err := proto.Marshal(spec) + if err != nil { + return "", fmt.Errorf("failed to marshal spec: %w", err) + } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:8]), nil // 8 bytes = 16 hex chars, enough for change detection +} + +// buildKService constructs a Knative Service manifest from an App proto. +func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, error) { + appID := app.GetMetadata().GetId() + spec := app.GetSpec() + name := kserviceName(appID) + + sha, err := specSHA(spec) + if err != nil { + return nil, err + } + + podSpec, err := buildPodSpec(spec) + if err != nil { + return nil, err + } + + templateAnnotations := buildAutoscalingAnnotations(spec, c.cfg) + + timeoutSecs := c.cfg.DefaultRequestTimeout.Seconds() + if t := spec.GetTimeouts().GetRequestTimeout(); t != nil { + timeoutSecs = t.AsDuration().Seconds() + if timeoutSecs > c.cfg.MaxRequestTimeout.Seconds() { + timeoutSecs = c.cfg.MaxRequestTimeout.Seconds() + } + } + timeoutSecsInt := int64(timeoutSecs) + + ksvc := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: c.namespace, + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: appID.GetProject(), + labelDomain: appID.GetDomain(), + labelAppName: appID.GetName(), + }, + Annotations: map[string]string{ + annotationSpecSHA: sha, + annotationAppID: fmt.Sprintf("%s/%s/%s", appID.GetProject(), appID.GetDomain(), appID.GetName()), + }, + }, + Spec: servingv1.ServiceSpec{ + ConfigurationSpec: servingv1.ConfigurationSpec{ + Template: servingv1.RevisionTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: templateAnnotations, + }, + Spec: servingv1.RevisionSpec{ + PodSpec: podSpec, + TimeoutSeconds: &timeoutSecsInt, + }, + }, + }, + }, + } + return ksvc, nil +} + +// buildPodSpec constructs a corev1.PodSpec from an App Spec. +// Supports Container payload only for now; K8sPod support can be added in a follow-up. +func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { + switch p := spec.GetAppPayload().(type) { + case *flyteapp.Spec_Container: + c := p.Container + container := corev1.Container{ + Name: "app", + Image: c.GetImage(), + Args: c.GetArgs(), + } + for _, e := range c.GetEnv() { + container.Env = append(container.Env, corev1.EnvVar{ + Name: e.GetKey(), + Value: e.GetValue(), + }) + } + return corev1.PodSpec{Containers: []corev1.Container{container}}, nil + + case *flyteapp.Spec_Pod: + // K8sPod payloads are not yet supported — the pod spec serialization + // from flyteplugins is needed for a complete implementation. + return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload is not yet supported") + + default: + return corev1.PodSpec{}, fmt.Errorf("app spec has no payload (container or pod required)") + } +} + +// buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template. +func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.AppConfig) map[string]string { + annotations := map[string]string{} + autoscaling := spec.GetAutoscaling() + if autoscaling == nil { + return annotations + } + + if r := autoscaling.GetReplicas(); r != nil { + annotations["autoscaling.knative.dev/min-scale"] = fmt.Sprintf("%d", r.GetMin()) + annotations["autoscaling.knative.dev/max-scale"] = fmt.Sprintf("%d", r.GetMax()) + } + + if m := autoscaling.GetScalingMetric(); m != nil { + switch metric := m.GetMetric().(type) { + case *flyteapp.ScalingMetric_RequestRate: + annotations["autoscaling.knative.dev/metric"] = "rps" + annotations["autoscaling.knative.dev/target"] = fmt.Sprintf("%d", metric.RequestRate.GetTargetValue()) + case *flyteapp.ScalingMetric_Concurrency: + annotations["autoscaling.knative.dev/metric"] = "concurrency" + annotations["autoscaling.knative.dev/target"] = fmt.Sprintf("%d", metric.Concurrency.GetTargetValue()) + } + } + + if p := autoscaling.GetScaledownPeriod(); p != nil { + annotations["autoscaling.knative.dev/window"] = p.AsDuration().String() + } + + return annotations +} + +// statusWithPhase builds a flyteapp.Status with a single Condition set to the given phase. +func statusWithPhase(phase flyteapp.Status_DeploymentStatus, message string) *flyteapp.Status { + return &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + { + DeploymentStatus: phase, + Message: message, + LastTransitionTime: timestamppb.Now(), + }, + }, + } +} + +// kserviceToStatus maps a KService's conditions to a flyteapp.Status proto. +func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { + var phase flyteapp.Status_DeploymentStatus + var message string + + // Check if max-scale=0 is set — explicitly stopped by the control plane. + if ann := ksvc.Spec.Template.Annotations; ann != nil { + if ann["autoscaling.knative.dev/max-scale"] == maxScaleZero { + phase = flyteapp.Status_DEPLOYMENT_STATUS_STOPPED + message = "App scaled to zero" + } + } + + if phase == flyteapp.Status_DEPLOYMENT_STATUS_UNSPECIFIED { + switch { + case ksvc.IsReady(): + phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE + case ksvc.IsFailed(): + phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED + if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { + message = c.Message + } + case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: + phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING + default: + phase = flyteapp.Status_DEPLOYMENT_STATUS_PENDING + } + } + + status := statusWithPhase(phase, message) + + // Populate ingress URL from KService route status. + if url := ksvc.Status.URL; url != nil { + status.Ingress = &flyteapp.Ingress{ + PublicUrl: url.String(), + } + } + + // Populate current replica count and K8s namespace metadata. + status.CurrentReplicas = uint32(len(ksvc.Status.Traffic)) + status.K8SMetadata = &flyteapp.K8SMetadata{ + Namespace: ksvc.Namespace, + } + + return status +} + +// kserviceToApp reconstructs a flyteapp.App from a KService by reading the +// app identifier from annotations and the live status from KService conditions. +func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { + appIDStr, ok := ksvc.Annotations[annotationAppID] + if !ok { + return nil, fmt.Errorf("KService %s missing %s annotation", ksvc.Name, annotationAppID) + } + + // annotation format: "{project}/{domain}/{name}" + parts := strings.SplitN(appIDStr, "/", 3) + if len(parts) != 3 { + return nil, fmt.Errorf("KService %s has malformed %s annotation: %q", ksvc.Name, annotationAppID, appIDStr) + } + + appID := &flyteapp.Identifier{ + Project: parts[0], + Domain: parts[1], + Name: parts[2], + } + + return &flyteapp.App{ + Metadata: &flyteapp.Meta{ + Id: appID, + }, + Status: kserviceToStatus(ksvc), + }, nil +} From 871f58dfd41c8116e8592176cbc5b4d98672ba0b Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:23:07 -0700 Subject: [PATCH 04/11] restruct Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- {app => actions/k8s}/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename {app => actions/k8s}/app.go (99%) diff --git a/app/app.go b/actions/k8s/app.go similarity index 99% rename from app/app.go rename to actions/k8s/app.go index 44cd9c49c7..b47a5bcafa 100644 --- a/app/app.go +++ b/actions/k8s/app.go @@ -1,4 +1,4 @@ -package app +package k8s import ( "context" From c27e32bb2868abb207ed09973e136d90b63e173f Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 12:44:05 -0700 Subject: [PATCH 05/11] fix Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- actions/k8s/app.go | 398 ---------------------------- app/internal/k8s/app_client.go | 145 +++++----- app/internal/k8s/app_client_test.go | 169 ++++-------- 3 files changed, 121 insertions(+), 591 deletions(-) delete mode 100644 actions/k8s/app.go diff --git a/actions/k8s/app.go b/actions/k8s/app.go deleted file mode 100644 index b47a5bcafa..0000000000 --- a/actions/k8s/app.go +++ /dev/null @@ -1,398 +0,0 @@ -package k8s - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "fmt" - "strings" - - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - servingv1 "knative.dev/serving/pkg/apis/serving/v1" - "google.golang.org/protobuf/proto" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" - ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/flyteorg/flyte/v2/actions/config" - "github.com/flyteorg/flyte/v2/flytestdlib/logger" - flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" -) - -const ( - labelAppManaged = "flyte.org/app-managed" - labelProject = "flyte.org/project" - labelDomain = "flyte.org/domain" - labelAppName = "flyte.org/app-name" - - annotationSpecSHA = "flyte.org/spec-sha" - annotationAppID = "flyte.org/app-id" - - maxScaleZero = "0" - - // maxKServiceNameLen is the Kubernetes DNS label limit. - maxKServiceNameLen = 63 -) - -// AppK8sClientInterface defines the KService lifecycle operations for the App service. -type AppK8sClientInterface interface { - // Deploy creates or updates the KService for the given app. Idempotent — skips - // the update if the spec SHA annotation is unchanged. - Deploy(ctx context.Context, app *flyteapp.App) error - - // Stop scales the KService to zero by setting max-scale=0. The KService CRD - // is kept so the app can be restarted later. - Stop(ctx context.Context, appID *flyteapp.Identifier) error - - // GetStatus reads the KService and maps its conditions to a DeploymentStatus. - // Returns a Status with STOPPED if the KService does not exist. - GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) - - // List returns all apps (spec + live status) for the given project/domain scope. - List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) -} - -// AppK8sClient implements AppK8sClientInterface using controller-runtime. -type AppK8sClient struct { - k8sClient client.WithWatch - cache ctrlcache.Cache - namespace string - cfg *config.AppConfig -} - -// NewAppK8sClient creates a new AppK8sClient. -func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.AppConfig) *AppK8sClient { - return &AppK8sClient{ - k8sClient: k8sClient, - cache: cache, - namespace: cfg.Namespace, - cfg: cfg, - } -} - -// Deploy creates or updates the KService for the given app. -func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { - appID := app.GetMetadata().GetId() - name := kserviceName(appID) - - ksvc, err := c.buildKService(app) - if err != nil { - return fmt.Errorf("failed to build KService for app %s: %w", name, err) - } - - existing := &servingv1.Service{} - err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, existing) - if k8serrors.IsNotFound(err) { - if err := c.k8sClient.Create(ctx, ksvc); err != nil { - return fmt.Errorf("failed to create KService %s: %w", name, err) - } - logger.Infof(ctx, "Created KService %s/%s", c.namespace, name) - return nil - } - if err != nil { - return fmt.Errorf("failed to get KService %s: %w", name, err) - } - - // Skip update if spec has not changed. - if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { - logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", c.namespace, name) - return nil - } - - existing.Spec = ksvc.Spec - existing.Labels = ksvc.Labels - existing.Annotations = ksvc.Annotations - if err := c.k8sClient.Update(ctx, existing); err != nil { - return fmt.Errorf("failed to update KService %s: %w", name, err) - } - logger.Infof(ctx, "Updated KService %s/%s", c.namespace, name) - return nil -} - -// Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. -func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { - name := kserviceName(appID) - patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"autoscaling.knative.dev/max-scale":"0"}}}}}`) - ksvc := &servingv1.Service{} - ksvc.Name = name - ksvc.Namespace = c.namespace - if err := c.k8sClient.Patch(ctx, ksvc, client.RawPatch(types.MergePatchType, patch)); err != nil { - if k8serrors.IsNotFound(err) { - // Already stopped/deleted — treat as success. - return nil - } - return fmt.Errorf("failed to patch KService %s to stop: %w", name, err) - } - logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", c.namespace, name) - return nil -} - -// GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. -func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { - name := kserviceName(appID) - ksvc := &servingv1.Service{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, ksvc); err != nil { - if k8serrors.IsNotFound(err) { - return statusWithPhase(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, "KService not found"), nil - } - return nil, fmt.Errorf("failed to get KService %s: %w", name, err) - } - return kserviceToStatus(ksvc), nil -} - -// List returns all apps for the given project/domain by listing KServices with label selectors. -func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { - list := &servingv1.ServiceList{} - if err := c.k8sClient.List(ctx, list, - client.InNamespace(c.namespace), - client.MatchingLabels{ - labelProject: project, - labelDomain: domain, - }, - ); err != nil { - return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) - } - - apps := make([]*flyteapp.App, 0, len(list.Items)) - for i := range list.Items { - a, err := kserviceToApp(&list.Items[i]) - if err != nil { - logger.Warnf(ctx, "Skipping KService %s: failed to convert to app: %v", list.Items[i].Name, err) - continue - } - apps = append(apps, a) - } - return apps, nil -} - -// --- Helpers --- - -// kserviceName builds the KService name from an app identifier. -// Format: "{project}-{domain}-{name}", truncated to 63 chars. -func kserviceName(id *flyteapp.Identifier) string { - name := fmt.Sprintf("%s-%s-%s", id.GetProject(), id.GetDomain(), id.GetName()) - if len(name) > maxKServiceNameLen { - name = name[:maxKServiceNameLen] - } - return strings.ToLower(name) -} - -// specSHA computes a SHA256 digest of the serialized App Spec proto. -func specSHA(spec *flyteapp.Spec) (string, error) { - b, err := proto.Marshal(spec) - if err != nil { - return "", fmt.Errorf("failed to marshal spec: %w", err) - } - sum := sha256.Sum256(b) - return hex.EncodeToString(sum[:8]), nil // 8 bytes = 16 hex chars, enough for change detection -} - -// buildKService constructs a Knative Service manifest from an App proto. -func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, error) { - appID := app.GetMetadata().GetId() - spec := app.GetSpec() - name := kserviceName(appID) - - sha, err := specSHA(spec) - if err != nil { - return nil, err - } - - podSpec, err := buildPodSpec(spec) - if err != nil { - return nil, err - } - - templateAnnotations := buildAutoscalingAnnotations(spec, c.cfg) - - timeoutSecs := c.cfg.DefaultRequestTimeout.Seconds() - if t := spec.GetTimeouts().GetRequestTimeout(); t != nil { - timeoutSecs = t.AsDuration().Seconds() - if timeoutSecs > c.cfg.MaxRequestTimeout.Seconds() { - timeoutSecs = c.cfg.MaxRequestTimeout.Seconds() - } - } - timeoutSecsInt := int64(timeoutSecs) - - ksvc := &servingv1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: c.namespace, - Labels: map[string]string{ - labelAppManaged: "true", - labelProject: appID.GetProject(), - labelDomain: appID.GetDomain(), - labelAppName: appID.GetName(), - }, - Annotations: map[string]string{ - annotationSpecSHA: sha, - annotationAppID: fmt.Sprintf("%s/%s/%s", appID.GetProject(), appID.GetDomain(), appID.GetName()), - }, - }, - Spec: servingv1.ServiceSpec{ - ConfigurationSpec: servingv1.ConfigurationSpec{ - Template: servingv1.RevisionTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: templateAnnotations, - }, - Spec: servingv1.RevisionSpec{ - PodSpec: podSpec, - TimeoutSeconds: &timeoutSecsInt, - }, - }, - }, - }, - } - return ksvc, nil -} - -// buildPodSpec constructs a corev1.PodSpec from an App Spec. -// Supports Container payload only for now; K8sPod support can be added in a follow-up. -func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { - switch p := spec.GetAppPayload().(type) { - case *flyteapp.Spec_Container: - c := p.Container - container := corev1.Container{ - Name: "app", - Image: c.GetImage(), - Args: c.GetArgs(), - } - for _, e := range c.GetEnv() { - container.Env = append(container.Env, corev1.EnvVar{ - Name: e.GetKey(), - Value: e.GetValue(), - }) - } - return corev1.PodSpec{Containers: []corev1.Container{container}}, nil - - case *flyteapp.Spec_Pod: - // K8sPod payloads are not yet supported — the pod spec serialization - // from flyteplugins is needed for a complete implementation. - return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload is not yet supported") - - default: - return corev1.PodSpec{}, fmt.Errorf("app spec has no payload (container or pod required)") - } -} - -// buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template. -func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.AppConfig) map[string]string { - annotations := map[string]string{} - autoscaling := spec.GetAutoscaling() - if autoscaling == nil { - return annotations - } - - if r := autoscaling.GetReplicas(); r != nil { - annotations["autoscaling.knative.dev/min-scale"] = fmt.Sprintf("%d", r.GetMin()) - annotations["autoscaling.knative.dev/max-scale"] = fmt.Sprintf("%d", r.GetMax()) - } - - if m := autoscaling.GetScalingMetric(); m != nil { - switch metric := m.GetMetric().(type) { - case *flyteapp.ScalingMetric_RequestRate: - annotations["autoscaling.knative.dev/metric"] = "rps" - annotations["autoscaling.knative.dev/target"] = fmt.Sprintf("%d", metric.RequestRate.GetTargetValue()) - case *flyteapp.ScalingMetric_Concurrency: - annotations["autoscaling.knative.dev/metric"] = "concurrency" - annotations["autoscaling.knative.dev/target"] = fmt.Sprintf("%d", metric.Concurrency.GetTargetValue()) - } - } - - if p := autoscaling.GetScaledownPeriod(); p != nil { - annotations["autoscaling.knative.dev/window"] = p.AsDuration().String() - } - - return annotations -} - -// statusWithPhase builds a flyteapp.Status with a single Condition set to the given phase. -func statusWithPhase(phase flyteapp.Status_DeploymentStatus, message string) *flyteapp.Status { - return &flyteapp.Status{ - Conditions: []*flyteapp.Condition{ - { - DeploymentStatus: phase, - Message: message, - LastTransitionTime: timestamppb.Now(), - }, - }, - } -} - -// kserviceToStatus maps a KService's conditions to a flyteapp.Status proto. -func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { - var phase flyteapp.Status_DeploymentStatus - var message string - - // Check if max-scale=0 is set — explicitly stopped by the control plane. - if ann := ksvc.Spec.Template.Annotations; ann != nil { - if ann["autoscaling.knative.dev/max-scale"] == maxScaleZero { - phase = flyteapp.Status_DEPLOYMENT_STATUS_STOPPED - message = "App scaled to zero" - } - } - - if phase == flyteapp.Status_DEPLOYMENT_STATUS_UNSPECIFIED { - switch { - case ksvc.IsReady(): - phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE - case ksvc.IsFailed(): - phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED - if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { - message = c.Message - } - case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: - phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING - default: - phase = flyteapp.Status_DEPLOYMENT_STATUS_PENDING - } - } - - status := statusWithPhase(phase, message) - - // Populate ingress URL from KService route status. - if url := ksvc.Status.URL; url != nil { - status.Ingress = &flyteapp.Ingress{ - PublicUrl: url.String(), - } - } - - // Populate current replica count and K8s namespace metadata. - status.CurrentReplicas = uint32(len(ksvc.Status.Traffic)) - status.K8SMetadata = &flyteapp.K8SMetadata{ - Namespace: ksvc.Namespace, - } - - return status -} - -// kserviceToApp reconstructs a flyteapp.App from a KService by reading the -// app identifier from annotations and the live status from KService conditions. -func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { - appIDStr, ok := ksvc.Annotations[annotationAppID] - if !ok { - return nil, fmt.Errorf("KService %s missing %s annotation", ksvc.Name, annotationAppID) - } - - // annotation format: "{project}/{domain}/{name}" - parts := strings.SplitN(appIDStr, "/", 3) - if len(parts) != 3 { - return nil, fmt.Errorf("KService %s has malformed %s annotation: %q", ksvc.Name, annotationAppID, appIDStr) - } - - appID := &flyteapp.Identifier{ - Project: parts[0], - Domain: parts[1], - Name: parts[2], - } - - return &flyteapp.App{ - Metadata: &flyteapp.Meta{ - Id: appID, - }, - Status: kserviceToStatus(ksvc), - }, nil -} diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index c12b9b1278..bd6b189b7d 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -18,7 +18,7 @@ import ( ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/flyteorg/flyte/v2/app/config" + "github.com/flyteorg/flyte/v2/actions/config" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" ) @@ -49,7 +49,7 @@ type AppK8sClientInterface interface { Stop(ctx context.Context, appID *flyteapp.Identifier) error // GetStatus reads the KService and maps its conditions to a DeploymentStatus. - // Returns a not-found error (checkable with k8serrors.IsNotFound) if the KService does not exist. + // Returns a Status with STOPPED if the KService does not exist. GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) // List returns all apps (spec + live status) for the given project/domain scope. @@ -74,6 +74,7 @@ type AppK8sClientInterface interface { type AppK8sClient struct { k8sClient client.WithWatch cache ctrlcache.Cache + namespace string cfg *config.AppConfig } @@ -82,20 +83,14 @@ func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *con return &AppK8sClient{ k8sClient: k8sClient, cache: cache, + namespace: cfg.Namespace, cfg: cfg, } } -// appNamespace returns the K8s namespace for a given project/domain pair. -// Follows the same convention as the Actions and Secret services: "{project}-{domain}". -func appNamespace(project, domain string) string { - return fmt.Sprintf("%s-%s", project, domain) -} - // Deploy creates or updates the KService for the given app. func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { appID := app.GetMetadata().GetId() - ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc, err := c.buildKService(app) @@ -104,12 +99,12 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { } existing := &servingv1.Service{} - err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: ns}, existing) + err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, existing) if k8serrors.IsNotFound(err) { if err := c.k8sClient.Create(ctx, ksvc); err != nil { return fmt.Errorf("failed to create KService %s: %w", name, err) } - logger.Infof(ctx, "Created KService %s/%s", ns, name) + logger.Infof(ctx, "Created KService %s/%s", c.namespace, name) return nil } if err != nil { @@ -118,7 +113,7 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { // Skip update if spec has not changed. if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { - logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", ns, name) + logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", c.namespace, name) return nil } @@ -128,18 +123,17 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { if err := c.k8sClient.Update(ctx, existing); err != nil { return fmt.Errorf("failed to update KService %s: %w", name, err) } - logger.Infof(ctx, "Updated KService %s/%s", ns, name) + logger.Infof(ctx, "Updated KService %s/%s", c.namespace, name) return nil } // Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { - ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"autoscaling.knative.dev/max-scale":"0"}}}}}`) ksvc := &servingv1.Service{} ksvc.Name = name - ksvc.Namespace = ns + ksvc.Namespace = c.namespace if err := c.k8sClient.Patch(ctx, ksvc, client.RawPatch(types.MergePatchType, patch)); err != nil { if k8serrors.IsNotFound(err) { // Already stopped/deleted — treat as success. @@ -147,38 +141,44 @@ func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) err } return fmt.Errorf("failed to patch KService %s to stop: %w", name, err) } - logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", ns, name) + logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", c.namespace, name) return nil } // Delete removes the KService CRD for the given app entirely. func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) error { - ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc := &servingv1.Service{} ksvc.Name = name - ksvc.Namespace = ns + ksvc.Namespace = c.namespace if err := c.k8sClient.Delete(ctx, ksvc); err != nil { if k8serrors.IsNotFound(err) { return nil } return fmt.Errorf("failed to delete KService %s: %w", name, err) } - logger.Infof(ctx, "Deleted KService %s/%s", ns, name) + logger.Infof(ctx, "Deleted KService %s/%s", c.namespace, name) return nil } // Watch returns a channel of WatchResponse events for KServices in the given -// project/domain scope. The channel is closed when ctx is cancelled or the -// underlying watch terminates. +// project/domain scope. Pass empty strings to watch all managed KServices. +// The channel is closed when ctx is cancelled or the underlying watch terminates. func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) { - ns := appNamespace(project, domain) + labels := client.MatchingLabels{labelAppManaged: "true"} + if project != "" { + labels[labelProject] = project + } + if domain != "" { + labels[labelDomain] = domain + } + watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, - client.InNamespace(ns), - client.MatchingLabels{labelAppManaged: "true"}, + client.InNamespace(c.namespace), + labels, ) if err != nil { - return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err) + return nil, fmt.Errorf("failed to start KService watch for %s/%s: %w", project, domain, err) } ch := make(chan *flyteapp.WatchResponse, 64) @@ -193,7 +193,7 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-cha if !ok { return } - resp := c.kserviceEventToWatchResponse(ctx, event) + resp := kserviceEventToWatchResponse(event) if resp == nil { continue } @@ -210,12 +210,12 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-cha // kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse. // Returns nil for event types that should not be forwarded (Error, Bookmark). -func (c *AppK8sClient) kserviceEventToWatchResponse(ctx context.Context, event k8swatch.Event) *flyteapp.WatchResponse { +func kserviceEventToWatchResponse(event k8swatch.Event) *flyteapp.WatchResponse { ksvc, ok := event.Object.(*servingv1.Service) if !ok { return nil } - app, err := c.kserviceToApp(ctx, ksvc) + app, err := kserviceToApp(ksvc) if err != nil { // KService is not managed by us — skip it. return nil @@ -246,33 +246,33 @@ func (c *AppK8sClient) kserviceEventToWatchResponse(ctx context.Context, event k // GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { - ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc := &servingv1.Service{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: ns}, ksvc); err != nil { + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, ksvc); err != nil { if k8serrors.IsNotFound(err) { - return nil, fmt.Errorf("KService %s not found: %w", name, err) + return statusWithPhase(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, "KService not found"), nil } return nil, fmt.Errorf("failed to get KService %s: %w", name, err) } - return c.kserviceToStatus(ctx, ksvc), nil + return kserviceToStatus(ksvc), nil } -// List returns all apps for the given project/domain by listing KServices in the -// project/domain namespace. +// List returns all apps for the given project/domain by listing KServices with label selectors. func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { - ns := appNamespace(project, domain) list := &servingv1.ServiceList{} if err := c.k8sClient.List(ctx, list, - client.InNamespace(ns), - client.MatchingLabels{labelAppManaged: "true"}, + client.InNamespace(c.namespace), + client.MatchingLabels{ + labelProject: project, + labelDomain: domain, + }, ); err != nil { return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) } apps := make([]*flyteapp.App, 0, len(list.Items)) for i := range list.Items { - a, err := c.kserviceToApp(ctx, &list.Items[i]) + a, err := kserviceToApp(&list.Items[i]) if err != nil { logger.Warnf(ctx, "Skipping KService %s: failed to convert to app: %v", list.Items[i].Name, err) continue @@ -284,25 +284,19 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*fly // --- Helpers --- -// kserviceName returns the KService name for an app. Since each app is deployed -// to its own project/domain namespace, the name only needs to be unique within -// that namespace — the app name alone suffices. -// Names are lower-cased and capped at 63 chars (K8s DNS label limit). For names -// that exceed 63 chars, the first 54 chars are kept and an 8-char SHA256 suffix -// is appended to avoid collisions between names with a long common prefix. +// kserviceName builds the KService name from an app identifier. +// Format: "{project}-{domain}-{name}", truncated to 63 chars. func kserviceName(id *flyteapp.Identifier) string { - name := strings.ToLower(id.GetName()) - if len(name) <= maxKServiceNameLen { - return name + name := fmt.Sprintf("%s-%s-%s", id.GetProject(), id.GetDomain(), id.GetName()) + if len(name) > maxKServiceNameLen { + name = name[:maxKServiceNameLen] } - sum := sha256.Sum256([]byte(name)) - suffix := hex.EncodeToString(sum[:4]) // 4 bytes = 8 hex chars - return name[:maxKServiceNameLen-9] + "-" + suffix + return strings.ToLower(name) } // specSHA computes a SHA256 digest of the serialized App Spec proto. func specSHA(spec *flyteapp.Spec) (string, error) { - b, err := proto.MarshalOptions{Deterministic: true}.Marshal(spec) + b, err := proto.Marshal(spec) if err != nil { return "", fmt.Errorf("failed to marshal spec: %w", err) } @@ -315,7 +309,6 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err appID := app.GetMetadata().GetId() spec := app.GetSpec() name := kserviceName(appID) - ns := appNamespace(appID.GetProject(), appID.GetDomain()) sha, err := specSHA(spec) if err != nil { @@ -341,7 +334,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err ksvc := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: ns, + Namespace: c.namespace, Labels: map[string]string{ labelAppManaged: "true", labelProject: appID.GetProject(), @@ -358,10 +351,6 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err Template: servingv1.RevisionTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: templateAnnotations, - Labels: map[string]string{ - labelAppManaged: "true", - labelAppName: appID.GetName(), - }, }, Spec: servingv1.RevisionSpec{ PodSpec: podSpec, @@ -448,8 +437,7 @@ func statusWithPhase(phase flyteapp.Status_DeploymentStatus, message string) *fl } // kserviceToStatus maps a KService's conditions to a flyteapp.Status proto. -// It fetches the latest ready Revision to read the accurate ActualReplicas count. -func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Service) *flyteapp.Status { +func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { var phase flyteapp.Status_DeploymentStatus var message string @@ -467,8 +455,8 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE case ksvc.IsFailed(): phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED - if condition := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); condition != nil { - message = condition.Message + if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { + message = c.Message } case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING @@ -486,15 +474,8 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser } } - // Populate current replica count from the latest ready Revision. - if revName := ksvc.Status.LatestReadyRevisionName; revName != "" { - rev := &servingv1.Revision{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: revName, Namespace: ksvc.Namespace}, rev); err == nil { - if rev.Status.ActualReplicas != nil { - status.CurrentReplicas = uint32(*rev.Status.ActualReplicas) - } - } - } + // Populate current replica count and K8s namespace metadata. + status.CurrentReplicas = uint32(len(ksvc.Status.Traffic)) status.K8SMetadata = &flyteapp.K8SMetadata{ Namespace: ksvc.Namespace, } @@ -502,13 +483,17 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser return status } -// GetReplicas lists the pods currently backing the given app. +// GetReplicas lists the pods currently backing the given app by matching +// the flyte.org/project, flyte.org/domain, and flyte.org/app-name labels. func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) { - ns := appNamespace(appID.GetProject(), appID.GetDomain()) podList := &corev1.PodList{} if err := c.k8sClient.List(ctx, podList, - client.InNamespace(ns), - client.MatchingLabels{labelAppName: appID.GetName()}, + client.InNamespace(c.namespace), + client.MatchingLabels{ + labelProject: appID.GetProject(), + labelDomain: appID.GetDomain(), + labelAppName: appID.GetName(), + }, ); err != nil { return nil, fmt.Errorf("failed to list pods for app %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) @@ -523,18 +508,16 @@ func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifi // DeleteReplica force-deletes a specific pod. Knative will schedule a replacement automatically. func (c *AppK8sClient) DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error { - appID := replicaID.GetAppId() - ns := appNamespace(appID.GetProject(), appID.GetDomain()) pod := &corev1.Pod{} pod.Name = replicaID.GetName() - pod.Namespace = ns + pod.Namespace = c.namespace if err := c.k8sClient.Delete(ctx, pod); err != nil { if k8serrors.IsNotFound(err) { return nil } - return fmt.Errorf("failed to delete pod %s/%s: %w", ns, replicaID.GetName(), err) + return fmt.Errorf("failed to delete pod %s/%s: %w", c.namespace, replicaID.GetName(), err) } - logger.Infof(ctx, "Deleted replica pod %s/%s", ns, replicaID.GetName()) + logger.Infof(ctx, "Deleted replica pod %s/%s", c.namespace, replicaID.GetName()) return nil } @@ -592,7 +575,7 @@ func podDeploymentStatus(pod *corev1.Pod) (string, string) { // kserviceToApp reconstructs a flyteapp.App from a KService by reading the // app identifier from annotations and the live status from KService conditions. -func (c *AppK8sClient) kserviceToApp(ctx context.Context, ksvc *servingv1.Service) (*flyteapp.App, error) { +func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { appIDStr, ok := ksvc.Annotations[annotationAppID] if !ok { return nil, fmt.Errorf("KService %s missing %s annotation", ksvc.Name, annotationAppID) @@ -614,6 +597,6 @@ func (c *AppK8sClient) kserviceToApp(ctx context.Context, ksvc *servingv1.Servic Metadata: &flyteapp.Meta{ Id: appID, }, - Status: c.kserviceToStatus(ctx, ksvc), + Status: kserviceToStatus(ksvc), }, nil } diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 706e21c0d1..2a7bc2c16d 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -2,8 +2,6 @@ package k8s import ( "context" - "crypto/sha256" - "encoding/hex" "testing" "time" @@ -18,12 +16,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/flyteorg/flyte/v2/app/config" + "github.com/flyteorg/flyte/v2/actions/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) -// testScheme builds a runtime.Scheme with Knative and core types registered. +// testScheme builds a runtime.Scheme with Knative types registered. func testScheme(t *testing.T) *runtime.Scheme { t.Helper() s := runtime.NewScheme() @@ -32,19 +30,6 @@ func testScheme(t *testing.T) *runtime.Scheme { return s } -// testRevision builds a Knative Revision object with a given ActualReplicas count. -func testRevision(name, namespace string, actualReplicas int32) *servingv1.Revision { - return &servingv1.Revision{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Status: servingv1.RevisionStatus{ - ActualReplicas: &actualReplicas, - }, - } -} - // testClient builds an AppK8sClient backed by a fake K8s client. func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { t.Helper() @@ -53,12 +38,15 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { WithScheme(s). WithObjects(objs...). Build() + cfg := &config.AppConfig{ + Namespace: "flyte-apps", + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + } return &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{ - DefaultRequestTimeout: 5 * time.Minute, - MaxRequestTimeout: time.Hour, - }, + namespace: cfg.Namespace, + cfg: cfg, } } @@ -91,7 +79,7 @@ func TestDeploy_Create(t *testing.T) { ksvc := &servingv1.Service{} err = c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc) + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) require.NoError(t, err) assert.Equal(t, "proj", ksvc.Labels[labelProject]) assert.Equal(t, "dev", ksvc.Labels[labelDomain]) @@ -111,7 +99,7 @@ func TestDeploy_UpdateOnSpecChange(t *testing.T) { ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) assert.Equal(t, "nginx:2.0", ksvc.Spec.Template.Spec.Containers[0].Image) } @@ -123,14 +111,14 @@ func TestDeploy_SkipUpdateWhenUnchanged(t *testing.T) { // Get initial resource version. ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) initialRV := ksvc.ResourceVersion // Deploy same spec — should be a no-op. require.NoError(t, c.Deploy(context.Background(), app)) require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) assert.Equal(t, initialRV, ksvc.ResourceVersion, "resource version should not change on no-op deploy") } @@ -144,7 +132,7 @@ func TestStop(t *testing.T) { ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) assert.Equal(t, "0", ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"]) } @@ -165,7 +153,7 @@ func TestDelete(t *testing.T) { ksvc := &servingv1.Service{} err := c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc) + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) assert.True(t, k8serrors.IsNotFound(err)) } @@ -179,9 +167,9 @@ func TestGetStatus_NotFound(t *testing.T) { c := testClient(t) id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "missing"} status, err := c.GetStatus(context.Background(), id) - require.Error(t, err) - assert.True(t, k8serrors.IsNotFound(err)) - assert.Nil(t, status) + require.NoError(t, err) + require.Len(t, status.Conditions, 1) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) } func TestGetStatus_Stopped(t *testing.T) { @@ -198,57 +186,17 @@ func TestGetStatus_Stopped(t *testing.T) { assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) } -func TestGetStatus_CurrentReplicas(t *testing.T) { - s := testScheme(t) - // Pre-populate a KService with LatestReadyRevisionName already set in status, - // and the corresponding Revision with ActualReplicas=4. - ksvc := &servingv1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "myapp", - Namespace: "proj-dev", - Labels: map[string]string{ - labelAppManaged: "true", - labelProject: "proj", - labelDomain: "dev", - labelAppName: "myapp", - }, - Annotations: map[string]string{ - annotationAppID: "proj/dev/myapp", - }, - }, - } - ksvc.Status.LatestReadyRevisionName = "myapp-00001" - - rev := testRevision("myapp-00001", "proj-dev", 4) - - fc := fake.NewClientBuilder(). - WithScheme(s). - WithObjects(ksvc, rev). - WithStatusSubresource(ksvc). - Build() - c := &AppK8sClient{ - k8sClient: fc, - cfg: &config.AppConfig{}, - } - - id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} - status, err := c.GetStatus(context.Background(), id) - require.NoError(t, err) - assert.Equal(t, uint32(4), status.CurrentReplicas) -} - func TestList(t *testing.T) { s := testScheme(t) // Pre-populate two KServices with different project labels. ksvc1 := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "app1", - Namespace: "proj-dev", + Name: "proj-dev-app1", + Namespace: "flyte-apps", Labels: map[string]string{ - labelAppManaged: "true", - labelProject: "proj", - labelDomain: "dev", - labelAppName: "app1", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app1", }, Annotations: map[string]string{ annotationAppID: "proj/dev/app1", @@ -257,13 +205,12 @@ func TestList(t *testing.T) { } ksvc2 := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "app2", - Namespace: "other-dev", + Name: "other-dev-app2", + Namespace: "flyte-apps", Labels: map[string]string{ - labelAppManaged: "true", - labelProject: "other", - labelDomain: "dev", - labelAppName: "app2", + labelProject: "other", + labelDomain: "dev", + labelAppName: "app2", }, Annotations: map[string]string{ annotationAppID: "other/dev/app2", @@ -277,7 +224,9 @@ func TestList(t *testing.T) { Build() c := &AppK8sClient{ k8sClient: fc, + namespace: "flyte-apps", cfg: &config.AppConfig{ + Namespace: "flyte-apps", DefaultRequestTimeout: 5 * time.Minute, MaxRequestTimeout: time.Hour, }, @@ -294,9 +243,11 @@ func TestGetReplicas(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "myapp-abc", - Namespace: "proj-dev", + Name: "proj-dev-myapp-abc", + Namespace: "flyte-apps", Labels: map[string]string{ + labelProject: "proj", + labelDomain: "dev", labelAppName: "myapp", }, }, @@ -310,14 +261,15 @@ func TestGetReplicas(t *testing.T) { fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{}, + namespace: "flyte-apps", + cfg: &config.AppConfig{Namespace: "flyte-apps"}, } id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} replicas, err := c.GetReplicas(context.Background(), id) require.NoError(t, err) require.Len(t, replicas, 1) - assert.Equal(t, "myapp-abc", replicas[0].Metadata.Id.Name) + assert.Equal(t, "proj-dev-myapp-abc", replicas[0].Metadata.Id.Name) assert.Equal(t, "ACTIVE", replicas[0].Status.DeploymentStatus) } @@ -325,32 +277,33 @@ func TestDeleteReplica(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "myapp-abc", - Namespace: "proj-dev", + Name: "proj-dev-myapp-abc", + Namespace: "flyte-apps", }, } fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{}, + namespace: "flyte-apps", + cfg: &config.AppConfig{Namespace: "flyte-apps"}, } replicaID := &flyteapp.ReplicaIdentifier{ AppId: &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}, - Name: "myapp-abc", + Name: "proj-dev-myapp-abc", } require.NoError(t, c.DeleteReplica(context.Background(), replicaID)) err := fc.Get(context.Background(), - client.ObjectKey{Name: "myapp-abc", Namespace: "proj-dev"}, &corev1.Pod{}) + client.ObjectKey{Name: "proj-dev-myapp-abc", Namespace: "flyte-apps"}, &corev1.Pod{}) assert.True(t, k8serrors.IsNotFound(err)) } func TestKserviceEventToWatchResponse(t *testing.T) { ksvc := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "myapp", - Namespace: "proj-dev", + Name: "proj-dev-myapp", + Namespace: "flyte-apps", Annotations: map[string]string{ annotationAppID: "proj/dev/myapp", }, @@ -369,10 +322,9 @@ func TestKserviceEventToWatchResponse(t *testing.T) { {k8swatch.Bookmark, true, ""}, } - c := testClient(t) for _, tt := range tests { t.Run(string(tt.eventType), func(t *testing.T) { - resp := c.kserviceEventToWatchResponse(context.Background(), k8swatch.Event{ + resp := kserviceEventToWatchResponse(k8swatch.Event{ Type: tt.eventType, Object: ksvc, }) @@ -397,29 +349,22 @@ func TestKserviceEventToWatchResponse(t *testing.T) { func TestKserviceName(t *testing.T) { tests := []struct { - name string - want string + project, domain, name string + want string }{ - {"myapp", "myapp"}, - {"MyApp", "myapp"}, - // v1 and v2 variants stay distinct — no truncation collision. - {"my-long-service-name-v1", "my-long-service-name-v1"}, - {"my-long-service-name-v2", "my-long-service-name-v2"}, - // Names over 63 chars get a hash suffix instead of blind truncation. + {"proj", "dev", "myapp", "proj-dev-myapp"}, + {"P", "D", "N", "p-d-n"}, + // Long name should be truncated to 63 chars. { - "this-is-a-very-long-app-name-that-exceeds-the-kubernetes-dns-label-limit", - func() string { - name := "this-is-a-very-long-app-name-that-exceeds-the-kubernetes-dns-label-limit" - sum := sha256.Sum256([]byte(name)) - return name[:54] + "-" + hex.EncodeToString(sum[:4]) - }(), + "verylongprojectname", + "verylongdomainname", + "verylongappnamethatexceedslimit", + "verylongprojectname-verylongdomainname-verylongappnamethatexcee"[:63], }, } for _, tt := range tests { - id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: tt.name} - got := kserviceName(id) - assert.Equal(t, tt.want, got) - assert.LessOrEqual(t, len(got), maxKServiceNameLen) + id := &flyteapp.Identifier{Project: tt.project, Domain: tt.domain, Name: tt.name} + assert.Equal(t, tt.want, kserviceName(id)) } } From cd34324f55c0267e9a3709a145373172a00a8f0c Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:10:02 -0700 Subject: [PATCH 06/11] move config Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/config/config.go | 3 +++ app/internal/k8s/app_client.go | 2 +- app/internal/k8s/app_client_test.go | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 99feec4549..f8f9caa116 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -7,6 +7,9 @@ type AppConfig struct { // Enabled controls whether the app deployment controller is started. Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + // Namespace is the K8s namespace where KService CRDs are created. + Namespace string `json:"namespace" pflag:",Namespace for app KServices"` + // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index bd6b189b7d..0850d92cf0 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -18,7 +18,7 @@ import ( ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/flyteorg/flyte/v2/actions/config" + "github.com/flyteorg/flyte/v2/app/config" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" ) diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 2a7bc2c16d..99be7ba9ec 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -16,7 +16,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/flyteorg/flyte/v2/actions/config" + "github.com/flyteorg/flyte/v2/app/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) From 7365fb6233d64dbe74f824a03a41a04f3a4fcc14 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 21:06:24 -0700 Subject: [PATCH 07/11] remove db.go Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- flytestdlib/app/db.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 flytestdlib/app/db.go diff --git a/flytestdlib/app/db.go b/flytestdlib/app/db.go deleted file mode 100644 index e69de29bb2..0000000000 From ee7a6d1365cda888fef3a2d7061c523b4176fb80 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:59:34 -0700 Subject: [PATCH 08/11] address comments Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/config/config.go | 3 - app/internal/k8s/app_client.go | 133 ++++++++++++---------- app/internal/k8s/app_client_test.go | 167 ++++++++++++++++++---------- 3 files changed, 184 insertions(+), 119 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index f8f9caa116..99feec4549 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -7,9 +7,6 @@ type AppConfig struct { // Enabled controls whether the app deployment controller is started. Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` - // Namespace is the K8s namespace where KService CRDs are created. - Namespace string `json:"namespace" pflag:",Namespace for app KServices"` - // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 0850d92cf0..c9b1baf268 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -49,7 +49,7 @@ type AppK8sClientInterface interface { Stop(ctx context.Context, appID *flyteapp.Identifier) error // GetStatus reads the KService and maps its conditions to a DeploymentStatus. - // Returns a Status with STOPPED if the KService does not exist. + // Returns a not-found error (checkable with k8serrors.IsNotFound) if the KService does not exist. GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) // List returns all apps (spec + live status) for the given project/domain scope. @@ -74,7 +74,6 @@ type AppK8sClientInterface interface { type AppK8sClient struct { k8sClient client.WithWatch cache ctrlcache.Cache - namespace string cfg *config.AppConfig } @@ -83,14 +82,20 @@ func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *con return &AppK8sClient{ k8sClient: k8sClient, cache: cache, - namespace: cfg.Namespace, cfg: cfg, } } +// appNamespace returns the K8s namespace for a given project/domain pair. +// Follows the same convention as the Actions and Secret services: "{project}-{domain}". +func appNamespace(project, domain string) string { + return fmt.Sprintf("%s-%s", project, domain) +} + // Deploy creates or updates the KService for the given app. func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { appID := app.GetMetadata().GetId() + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc, err := c.buildKService(app) @@ -99,12 +104,12 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { } existing := &servingv1.Service{} - err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, existing) + err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: ns}, existing) if k8serrors.IsNotFound(err) { if err := c.k8sClient.Create(ctx, ksvc); err != nil { return fmt.Errorf("failed to create KService %s: %w", name, err) } - logger.Infof(ctx, "Created KService %s/%s", c.namespace, name) + logger.Infof(ctx, "Created KService %s/%s", ns, name) return nil } if err != nil { @@ -113,7 +118,7 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { // Skip update if spec has not changed. if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { - logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", c.namespace, name) + logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", ns, name) return nil } @@ -123,17 +128,18 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { if err := c.k8sClient.Update(ctx, existing); err != nil { return fmt.Errorf("failed to update KService %s: %w", name, err) } - logger.Infof(ctx, "Updated KService %s/%s", c.namespace, name) + logger.Infof(ctx, "Updated KService %s/%s", ns, name) return nil } // Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"autoscaling.knative.dev/max-scale":"0"}}}}}`) ksvc := &servingv1.Service{} ksvc.Name = name - ksvc.Namespace = c.namespace + ksvc.Namespace = ns if err := c.k8sClient.Patch(ctx, ksvc, client.RawPatch(types.MergePatchType, patch)); err != nil { if k8serrors.IsNotFound(err) { // Already stopped/deleted — treat as success. @@ -141,44 +147,38 @@ func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) err } return fmt.Errorf("failed to patch KService %s to stop: %w", name, err) } - logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", c.namespace, name) + logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", ns, name) return nil } // Delete removes the KService CRD for the given app entirely. func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) error { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc := &servingv1.Service{} ksvc.Name = name - ksvc.Namespace = c.namespace + ksvc.Namespace = ns if err := c.k8sClient.Delete(ctx, ksvc); err != nil { if k8serrors.IsNotFound(err) { return nil } return fmt.Errorf("failed to delete KService %s: %w", name, err) } - logger.Infof(ctx, "Deleted KService %s/%s", c.namespace, name) + logger.Infof(ctx, "Deleted KService %s/%s", ns, name) return nil } // Watch returns a channel of WatchResponse events for KServices in the given -// project/domain scope. Pass empty strings to watch all managed KServices. -// The channel is closed when ctx is cancelled or the underlying watch terminates. +// project/domain scope. The channel is closed when ctx is cancelled or the +// underlying watch terminates. func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) { - labels := client.MatchingLabels{labelAppManaged: "true"} - if project != "" { - labels[labelProject] = project - } - if domain != "" { - labels[labelDomain] = domain - } - + ns := appNamespace(project, domain) watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, - client.InNamespace(c.namespace), - labels, + client.InNamespace(ns), + client.MatchingLabels{labelAppManaged: "true"}, ) if err != nil { - return nil, fmt.Errorf("failed to start KService watch for %s/%s: %w", project, domain, err) + return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err) } ch := make(chan *flyteapp.WatchResponse, 64) @@ -193,7 +193,7 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-cha if !ok { return } - resp := kserviceEventToWatchResponse(event) + resp := c.kserviceEventToWatchResponse(ctx, event) if resp == nil { continue } @@ -210,12 +210,12 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-cha // kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse. // Returns nil for event types that should not be forwarded (Error, Bookmark). -func kserviceEventToWatchResponse(event k8swatch.Event) *flyteapp.WatchResponse { +func (c *AppK8sClient) kserviceEventToWatchResponse(ctx context.Context, event k8swatch.Event) *flyteapp.WatchResponse { ksvc, ok := event.Object.(*servingv1.Service) if !ok { return nil } - app, err := kserviceToApp(ksvc) + app, err := c.kserviceToApp(ctx, ksvc) if err != nil { // KService is not managed by us — skip it. return nil @@ -246,33 +246,33 @@ func kserviceEventToWatchResponse(event k8swatch.Event) *flyteapp.WatchResponse // GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc := &servingv1.Service{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, ksvc); err != nil { + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: ns}, ksvc); err != nil { if k8serrors.IsNotFound(err) { - return statusWithPhase(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, "KService not found"), nil + return nil, fmt.Errorf("KService %s not found: %w", name, err) } return nil, fmt.Errorf("failed to get KService %s: %w", name, err) } - return kserviceToStatus(ksvc), nil + return c.kserviceToStatus(ctx, ksvc), nil } -// List returns all apps for the given project/domain by listing KServices with label selectors. +// List returns all apps for the given project/domain by listing KServices in the +// project/domain namespace. func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { + ns := appNamespace(project, domain) list := &servingv1.ServiceList{} if err := c.k8sClient.List(ctx, list, - client.InNamespace(c.namespace), - client.MatchingLabels{ - labelProject: project, - labelDomain: domain, - }, + client.InNamespace(ns), + client.MatchingLabels{labelAppManaged: "true"}, ); err != nil { return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) } apps := make([]*flyteapp.App, 0, len(list.Items)) for i := range list.Items { - a, err := kserviceToApp(&list.Items[i]) + a, err := c.kserviceToApp(ctx, &list.Items[i]) if err != nil { logger.Warnf(ctx, "Skipping KService %s: failed to convert to app: %v", list.Items[i].Name, err) continue @@ -284,14 +284,20 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*fly // --- Helpers --- -// kserviceName builds the KService name from an app identifier. -// Format: "{project}-{domain}-{name}", truncated to 63 chars. +// kserviceName returns the KService name for an app. Since each app is deployed +// to its own project/domain namespace, the name only needs to be unique within +// that namespace — the app name alone suffices. +// Names are lower-cased and capped at 63 chars (K8s DNS label limit). For names +// that exceed 63 chars, the first 54 chars are kept and an 8-char SHA256 suffix +// is appended to avoid collisions between names with a long common prefix. func kserviceName(id *flyteapp.Identifier) string { - name := fmt.Sprintf("%s-%s-%s", id.GetProject(), id.GetDomain(), id.GetName()) - if len(name) > maxKServiceNameLen { - name = name[:maxKServiceNameLen] + name := strings.ToLower(id.GetName()) + if len(name) <= maxKServiceNameLen { + return name } - return strings.ToLower(name) + sum := sha256.Sum256([]byte(name)) + suffix := hex.EncodeToString(sum[:4]) // 4 bytes = 8 hex chars + return name[:maxKServiceNameLen-9] + "-" + suffix } // specSHA computes a SHA256 digest of the serialized App Spec proto. @@ -309,6 +315,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err appID := app.GetMetadata().GetId() spec := app.GetSpec() name := kserviceName(appID) + ns := appNamespace(appID.GetProject(), appID.GetDomain()) sha, err := specSHA(spec) if err != nil { @@ -334,7 +341,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err ksvc := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: c.namespace, + Namespace: ns, Labels: map[string]string{ labelAppManaged: "true", labelProject: appID.GetProject(), @@ -437,7 +444,8 @@ func statusWithPhase(phase flyteapp.Status_DeploymentStatus, message string) *fl } // kserviceToStatus maps a KService's conditions to a flyteapp.Status proto. -func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { +// It fetches the latest ready Revision to read the accurate ActualReplicas count. +func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Service) *flyteapp.Status { var phase flyteapp.Status_DeploymentStatus var message string @@ -474,8 +482,15 @@ func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { } } - // Populate current replica count and K8s namespace metadata. - status.CurrentReplicas = uint32(len(ksvc.Status.Traffic)) + // Populate current replica count from the latest ready Revision. + if revName := ksvc.Status.LatestReadyRevisionName; revName != "" { + rev := &servingv1.Revision{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: revName, Namespace: ksvc.Namespace}, rev); err == nil { + if rev.Status.ActualReplicas != nil { + status.CurrentReplicas = uint32(*rev.Status.ActualReplicas) + } + } + } status.K8SMetadata = &flyteapp.K8SMetadata{ Namespace: ksvc.Namespace, } @@ -483,17 +498,13 @@ func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { return status } -// GetReplicas lists the pods currently backing the given app by matching -// the flyte.org/project, flyte.org/domain, and flyte.org/app-name labels. +// GetReplicas lists the pods currently backing the given app. func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) podList := &corev1.PodList{} if err := c.k8sClient.List(ctx, podList, - client.InNamespace(c.namespace), - client.MatchingLabels{ - labelProject: appID.GetProject(), - labelDomain: appID.GetDomain(), - labelAppName: appID.GetName(), - }, + client.InNamespace(ns), + client.MatchingLabels{labelAppName: appID.GetName()}, ); err != nil { return nil, fmt.Errorf("failed to list pods for app %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) @@ -508,16 +519,18 @@ func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifi // DeleteReplica force-deletes a specific pod. Knative will schedule a replacement automatically. func (c *AppK8sClient) DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error { + appID := replicaID.GetAppId() + ns := appNamespace(appID.GetProject(), appID.GetDomain()) pod := &corev1.Pod{} pod.Name = replicaID.GetName() - pod.Namespace = c.namespace + pod.Namespace = ns if err := c.k8sClient.Delete(ctx, pod); err != nil { if k8serrors.IsNotFound(err) { return nil } - return fmt.Errorf("failed to delete pod %s/%s: %w", c.namespace, replicaID.GetName(), err) + return fmt.Errorf("failed to delete pod %s/%s: %w", ns, replicaID.GetName(), err) } - logger.Infof(ctx, "Deleted replica pod %s/%s", c.namespace, replicaID.GetName()) + logger.Infof(ctx, "Deleted replica pod %s/%s", ns, replicaID.GetName()) return nil } @@ -575,7 +588,7 @@ func podDeploymentStatus(pod *corev1.Pod) (string, string) { // kserviceToApp reconstructs a flyteapp.App from a KService by reading the // app identifier from annotations and the live status from KService conditions. -func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { +func (c *AppK8sClient) kserviceToApp(ctx context.Context, ksvc *servingv1.Service) (*flyteapp.App, error) { appIDStr, ok := ksvc.Annotations[annotationAppID] if !ok { return nil, fmt.Errorf("KService %s missing %s annotation", ksvc.Name, annotationAppID) @@ -597,6 +610,6 @@ func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { Metadata: &flyteapp.Meta{ Id: appID, }, - Status: kserviceToStatus(ksvc), + Status: c.kserviceToStatus(ctx, ksvc), }, nil } diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 99be7ba9ec..706e21c0d1 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -2,6 +2,8 @@ package k8s import ( "context" + "crypto/sha256" + "encoding/hex" "testing" "time" @@ -21,7 +23,7 @@ import ( flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) -// testScheme builds a runtime.Scheme with Knative types registered. +// testScheme builds a runtime.Scheme with Knative and core types registered. func testScheme(t *testing.T) *runtime.Scheme { t.Helper() s := runtime.NewScheme() @@ -30,6 +32,19 @@ func testScheme(t *testing.T) *runtime.Scheme { return s } +// testRevision builds a Knative Revision object with a given ActualReplicas count. +func testRevision(name, namespace string, actualReplicas int32) *servingv1.Revision { + return &servingv1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Status: servingv1.RevisionStatus{ + ActualReplicas: &actualReplicas, + }, + } +} + // testClient builds an AppK8sClient backed by a fake K8s client. func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { t.Helper() @@ -38,15 +53,12 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { WithScheme(s). WithObjects(objs...). Build() - cfg := &config.AppConfig{ - Namespace: "flyte-apps", - DefaultRequestTimeout: 5 * time.Minute, - MaxRequestTimeout: time.Hour, - } return &AppK8sClient{ k8sClient: fc, - namespace: cfg.Namespace, - cfg: cfg, + cfg: &config.AppConfig{ + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + }, } } @@ -79,7 +91,7 @@ func TestDeploy_Create(t *testing.T) { ksvc := &servingv1.Service{} err = c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc) require.NoError(t, err) assert.Equal(t, "proj", ksvc.Labels[labelProject]) assert.Equal(t, "dev", ksvc.Labels[labelDomain]) @@ -99,7 +111,7 @@ func TestDeploy_UpdateOnSpecChange(t *testing.T) { ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) assert.Equal(t, "nginx:2.0", ksvc.Spec.Template.Spec.Containers[0].Image) } @@ -111,14 +123,14 @@ func TestDeploy_SkipUpdateWhenUnchanged(t *testing.T) { // Get initial resource version. ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) initialRV := ksvc.ResourceVersion // Deploy same spec — should be a no-op. require.NoError(t, c.Deploy(context.Background(), app)) require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) assert.Equal(t, initialRV, ksvc.ResourceVersion, "resource version should not change on no-op deploy") } @@ -132,7 +144,7 @@ func TestStop(t *testing.T) { ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) assert.Equal(t, "0", ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"]) } @@ -153,7 +165,7 @@ func TestDelete(t *testing.T) { ksvc := &servingv1.Service{} err := c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc) assert.True(t, k8serrors.IsNotFound(err)) } @@ -167,9 +179,9 @@ func TestGetStatus_NotFound(t *testing.T) { c := testClient(t) id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "missing"} status, err := c.GetStatus(context.Background(), id) - require.NoError(t, err) - require.Len(t, status.Conditions, 1) - assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) + assert.Nil(t, status) } func TestGetStatus_Stopped(t *testing.T) { @@ -186,17 +198,57 @@ func TestGetStatus_Stopped(t *testing.T) { assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) } +func TestGetStatus_CurrentReplicas(t *testing.T) { + s := testScheme(t) + // Pre-populate a KService with LatestReadyRevisionName already set in status, + // and the corresponding Revision with ActualReplicas=4. + ksvc := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myapp", + Namespace: "proj-dev", + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "myapp", + }, + Annotations: map[string]string{ + annotationAppID: "proj/dev/myapp", + }, + }, + } + ksvc.Status.LatestReadyRevisionName = "myapp-00001" + + rev := testRevision("myapp-00001", "proj-dev", 4) + + fc := fake.NewClientBuilder(). + WithScheme(s). + WithObjects(ksvc, rev). + WithStatusSubresource(ksvc). + Build() + c := &AppK8sClient{ + k8sClient: fc, + cfg: &config.AppConfig{}, + } + + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + status, err := c.GetStatus(context.Background(), id) + require.NoError(t, err) + assert.Equal(t, uint32(4), status.CurrentReplicas) +} + func TestList(t *testing.T) { s := testScheme(t) // Pre-populate two KServices with different project labels. ksvc1 := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-app1", - Namespace: "flyte-apps", + Name: "app1", + Namespace: "proj-dev", Labels: map[string]string{ - labelProject: "proj", - labelDomain: "dev", - labelAppName: "app1", + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app1", }, Annotations: map[string]string{ annotationAppID: "proj/dev/app1", @@ -205,12 +257,13 @@ func TestList(t *testing.T) { } ksvc2 := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "other-dev-app2", - Namespace: "flyte-apps", + Name: "app2", + Namespace: "other-dev", Labels: map[string]string{ - labelProject: "other", - labelDomain: "dev", - labelAppName: "app2", + labelAppManaged: "true", + labelProject: "other", + labelDomain: "dev", + labelAppName: "app2", }, Annotations: map[string]string{ annotationAppID: "other/dev/app2", @@ -224,9 +277,7 @@ func TestList(t *testing.T) { Build() c := &AppK8sClient{ k8sClient: fc, - namespace: "flyte-apps", cfg: &config.AppConfig{ - Namespace: "flyte-apps", DefaultRequestTimeout: 5 * time.Minute, MaxRequestTimeout: time.Hour, }, @@ -243,11 +294,9 @@ func TestGetReplicas(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-myapp-abc", - Namespace: "flyte-apps", + Name: "myapp-abc", + Namespace: "proj-dev", Labels: map[string]string{ - labelProject: "proj", - labelDomain: "dev", labelAppName: "myapp", }, }, @@ -261,15 +310,14 @@ func TestGetReplicas(t *testing.T) { fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - namespace: "flyte-apps", - cfg: &config.AppConfig{Namespace: "flyte-apps"}, + cfg: &config.AppConfig{}, } id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} replicas, err := c.GetReplicas(context.Background(), id) require.NoError(t, err) require.Len(t, replicas, 1) - assert.Equal(t, "proj-dev-myapp-abc", replicas[0].Metadata.Id.Name) + assert.Equal(t, "myapp-abc", replicas[0].Metadata.Id.Name) assert.Equal(t, "ACTIVE", replicas[0].Status.DeploymentStatus) } @@ -277,33 +325,32 @@ func TestDeleteReplica(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-myapp-abc", - Namespace: "flyte-apps", + Name: "myapp-abc", + Namespace: "proj-dev", }, } fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - namespace: "flyte-apps", - cfg: &config.AppConfig{Namespace: "flyte-apps"}, + cfg: &config.AppConfig{}, } replicaID := &flyteapp.ReplicaIdentifier{ AppId: &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}, - Name: "proj-dev-myapp-abc", + Name: "myapp-abc", } require.NoError(t, c.DeleteReplica(context.Background(), replicaID)) err := fc.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp-abc", Namespace: "flyte-apps"}, &corev1.Pod{}) + client.ObjectKey{Name: "myapp-abc", Namespace: "proj-dev"}, &corev1.Pod{}) assert.True(t, k8serrors.IsNotFound(err)) } func TestKserviceEventToWatchResponse(t *testing.T) { ksvc := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-myapp", - Namespace: "flyte-apps", + Name: "myapp", + Namespace: "proj-dev", Annotations: map[string]string{ annotationAppID: "proj/dev/myapp", }, @@ -322,9 +369,10 @@ func TestKserviceEventToWatchResponse(t *testing.T) { {k8swatch.Bookmark, true, ""}, } + c := testClient(t) for _, tt := range tests { t.Run(string(tt.eventType), func(t *testing.T) { - resp := kserviceEventToWatchResponse(k8swatch.Event{ + resp := c.kserviceEventToWatchResponse(context.Background(), k8swatch.Event{ Type: tt.eventType, Object: ksvc, }) @@ -349,22 +397,29 @@ func TestKserviceEventToWatchResponse(t *testing.T) { func TestKserviceName(t *testing.T) { tests := []struct { - project, domain, name string - want string + name string + want string }{ - {"proj", "dev", "myapp", "proj-dev-myapp"}, - {"P", "D", "N", "p-d-n"}, - // Long name should be truncated to 63 chars. + {"myapp", "myapp"}, + {"MyApp", "myapp"}, + // v1 and v2 variants stay distinct — no truncation collision. + {"my-long-service-name-v1", "my-long-service-name-v1"}, + {"my-long-service-name-v2", "my-long-service-name-v2"}, + // Names over 63 chars get a hash suffix instead of blind truncation. { - "verylongprojectname", - "verylongdomainname", - "verylongappnamethatexceedslimit", - "verylongprojectname-verylongdomainname-verylongappnamethatexcee"[:63], + "this-is-a-very-long-app-name-that-exceeds-the-kubernetes-dns-label-limit", + func() string { + name := "this-is-a-very-long-app-name-that-exceeds-the-kubernetes-dns-label-limit" + sum := sha256.Sum256([]byte(name)) + return name[:54] + "-" + hex.EncodeToString(sum[:4]) + }(), }, } for _, tt := range tests { - id := &flyteapp.Identifier{Project: tt.project, Domain: tt.domain, Name: tt.name} - assert.Equal(t, tt.want, kserviceName(id)) + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: tt.name} + got := kserviceName(id) + assert.Equal(t, tt.want, got) + assert.LessOrEqual(t, len(got), maxKServiceNameLen) } } From 2bc6562afa7959c7e95bf3e68d7c6557befb5759 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:39:11 -0700 Subject: [PATCH 09/11] impl: internal service Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/config/config.go | 4 + app/internal/k8s/app_client.go | 48 ++-- app/internal/k8s/app_client_test.go | 3 +- app/internal/service/internal_app_service.go | 250 +++++++++++++++++++ app/internal/setup.go | 39 +++ 5 files changed, 328 insertions(+), 16 deletions(-) create mode 100644 app/internal/service/internal_app_service.go create mode 100644 app/internal/setup.go diff --git a/app/config/config.go b/app/config/config.go index 99feec4549..cc2085c4ee 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -7,6 +7,10 @@ type AppConfig struct { // Enabled controls whether the app deployment controller is started. Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + // BaseDomain is the base domain used to generate public URLs for apps. + // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". + BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` + // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index c9b1baf268..a716c539d8 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -52,8 +52,10 @@ type AppK8sClientInterface interface { // Returns a not-found error (checkable with k8serrors.IsNotFound) if the KService does not exist. GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) - // List returns all apps (spec + live status) for the given project/domain scope. - List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) + // List returns apps for the given project/domain scope with optional pagination. + // limit=0 means no limit. token is the K8s continue token from a previous call. + // Returns the apps, the continue token for the next page (empty if last page), and any error. + List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) // Delete removes the KService CRD entirely. The app must be re-created from scratch. // Use Stop to scale to zero while preserving the KService. @@ -66,8 +68,9 @@ type AppK8sClientInterface interface { DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error // Watch returns a channel of WatchResponse events for KServices matching the - // given project/domain scope. The channel is closed when ctx is cancelled. - Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) + // given project/domain scope. If appName is non-empty, only events for that + // specific app are returned. The channel is closed when ctx is cancelled. + Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) } // AppK8sClient implements AppK8sClientInterface using controller-runtime. @@ -169,13 +172,20 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e } // Watch returns a channel of WatchResponse events for KServices in the given -// project/domain scope. The channel is closed when ctx is cancelled or the +// project/domain scope. If appName is non-empty, only events for that specific +// app are returned. The channel is closed when ctx is cancelled or the // underlying watch terminates. -func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) { +func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) { ns := appNamespace(project, domain) + + labels := map[string]string{labelAppManaged: "true"} + if appName != "" { + labels[labelAppName] = strings.ToLower(appName) + } + watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, client.InNamespace(ns), - client.MatchingLabels{labelAppManaged: "true"}, + client.MatchingLabels(labels), ) if err != nil { return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err) @@ -258,16 +268,24 @@ func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier return c.kserviceToStatus(ctx, ksvc), nil } -// List returns all apps for the given project/domain by listing KServices in the -// project/domain namespace. -func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { +// List returns apps for the given project/domain scope with optional pagination. +func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) { ns := appNamespace(project, domain) - list := &servingv1.ServiceList{} - if err := c.k8sClient.List(ctx, list, + + listOpts := []client.ListOption{ client.InNamespace(ns), client.MatchingLabels{labelAppManaged: "true"}, - ); err != nil { - return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) + } + if limit > 0 { + listOpts = append(listOpts, client.Limit(int64(limit))) + } + if token != "" { + listOpts = append(listOpts, client.Continue(token)) + } + + list := &servingv1.ServiceList{} + if err := c.k8sClient.List(ctx, list, listOpts...); err != nil { + return nil, "", fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) } apps := make([]*flyteapp.App, 0, len(list.Items)) @@ -279,7 +297,7 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*fly } apps = append(apps, a) } - return apps, nil + return apps, list.Continue, nil } // --- Helpers --- diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 706e21c0d1..e0d22d4f77 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -283,8 +283,9 @@ func TestList(t *testing.T) { }, } - apps, err := c.List(context.Background(), "proj", "dev") + apps, nextToken, err := c.List(context.Background(), "proj", "dev", 0, "") require.NoError(t, err) + assert.Empty(t, nextToken) require.Len(t, apps, 1) assert.Equal(t, "proj", apps[0].Metadata.Id.Project) assert.Equal(t, "app1", apps[0].Metadata.Id.Name) diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go new file mode 100644 index 0000000000..8840f2e452 --- /dev/null +++ b/app/internal/service/internal_app_service.go @@ -0,0 +1,250 @@ +package service + +import ( + "context" + "fmt" + "strings" + + "connectrpc.com/connect" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" +) + +// InternalAppService is the data plane implementation of the AppService. +// It has direct K8s access via AppK8sClientInterface and no database dependency — +// all app state lives in KService CRDs. +type InternalAppService struct { + appconnect.UnimplementedAppServiceHandler + k8s appk8s.AppK8sClientInterface + cfg *appconfig.AppConfig +} + +// NewInternalAppService creates a new InternalAppService. +func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.AppConfig) *InternalAppService { + return &InternalAppService{k8s: k8s, cfg: cfg} +} + +// Ensure InternalAppService satisfies the generated handler interface. +var _ appconnect.AppServiceHandler = (*InternalAppService)(nil) + +// Create deploys a new app as a KService CRD. +func (s *InternalAppService) Create( + ctx context.Context, + req *connect.Request[flyteapp.CreateRequest], +) (*connect.Response[flyteapp.CreateResponse], error) { + app := req.Msg.GetApp() + if app.GetMetadata().GetId() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app identifier is required")) + } + if app.GetSpec() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app spec is required")) + } + if app.GetSpec().GetAppPayload() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app spec must include a container or pod payload")) + } + + if err := s.k8s.Deploy(ctx, app); err != nil { + logger.Errorf(ctx, "Failed to deploy app %s: %v", app.GetMetadata().GetId().GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + app.Status = &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + { + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, + LastTransitionTime: timestamppb.Now(), + }, + }, + Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg.BaseDomain), + } + + return connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil +} + +// publicIngress builds the deterministic public URL for an app. +// Pattern: "https://{name}-{project}-{domain}.{base_domain}" +// Returns nil if BaseDomain is not configured. +func publicIngress(id *flyteapp.Identifier, baseDomain string) *flyteapp.Ingress { + if baseDomain == "" { + return nil + } + host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", + id.GetName(), id.GetProject(), id.GetDomain(), baseDomain)) + return &flyteapp.Ingress{ + PublicUrl: "https://" + host, + } +} + +// Get retrieves an app and its live status from the KService CRD. +// Note: App.Spec is not populated — status and ingress URL are the authoritative fields. +func (s *InternalAppService) Get( + ctx context.Context, + req *connect.Request[flyteapp.GetRequest], +) (*connect.Response[flyteapp.GetResponse], error) { + appID, ok := req.Msg.GetIdentifier().(*flyteapp.GetRequest_AppId) + if !ok || appID.AppId == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app_id is required")) + } + + status, err := s.k8s.GetStatus(ctx, appID.AppId) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + + return connect.NewResponse(&flyteapp.GetResponse{ + App: &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: appID.AppId}, + Status: status, + }, + }), nil +} + +// Update modifies an app's spec or desired state. +// When Spec.DesiredState is STOPPED, the app is scaled to zero (KService kept). +// When Spec.DesiredState is STARTED or ACTIVE, the app is redeployed/resumed. +// Otherwise the spec update is applied and the app is redeployed. +func (s *InternalAppService) Update( + ctx context.Context, + req *connect.Request[flyteapp.UpdateRequest], +) (*connect.Response[flyteapp.UpdateResponse], error) { + app := req.Msg.GetApp() + if app.GetMetadata().GetId() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app identifier is required")) + } + + appID := app.GetMetadata().GetId() + + switch app.GetSpec().GetDesiredState() { + case flyteapp.Spec_DESIRED_STATE_STOPPED: + if err := s.k8s.Stop(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to stop app %s: %v", appID.GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + default: + // UNSPECIFIED, STARTED, ACTIVE — deploy/redeploy the spec. + if err := s.k8s.Deploy(ctx, app); err != nil { + logger.Errorf(ctx, "Failed to update app %s: %v", appID.GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + } + + status, err := s.k8s.GetStatus(ctx, appID) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + app.Status = status + + return connect.NewResponse(&flyteapp.UpdateResponse{App: app}), nil +} + +// Delete removes the KService CRD for the given app entirely. +func (s *InternalAppService) Delete( + ctx context.Context, + req *connect.Request[flyteapp.DeleteRequest], +) (*connect.Response[flyteapp.DeleteResponse], error) { + appID := req.Msg.GetAppId() + if appID == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app_id is required")) + } + + if err := s.k8s.Delete(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to delete app %s: %v", appID.GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + return connect.NewResponse(&flyteapp.DeleteResponse{}), nil +} + +// List returns apps for the requested scope with pagination. +func (s *InternalAppService) List( + ctx context.Context, + req *connect.Request[flyteapp.ListRequest], +) (*connect.Response[flyteapp.ListResponse], error) { + var project, domain string + + switch f := req.Msg.GetFilterBy().(type) { + case *flyteapp.ListRequest_Project: + project = f.Project.GetName() + domain = f.Project.GetDomain() + case *flyteapp.ListRequest_Org, *flyteapp.ListRequest_ClusterId: + return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("org and cluster_id filters are not supported by the data plane")) + } + + var limit uint32 + var token string + if r := req.Msg.GetRequest(); r != nil { + limit = r.GetLimit() + token = r.GetToken() + } + + apps, nextToken, err := s.k8s.List(ctx, project, domain, limit, token) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + + return connect.NewResponse(&flyteapp.ListResponse{Apps: apps, Token: nextToken}), nil +} + +// Watch streams live KService events to the client. +// It first sends the current state as CreateEvents (initial snapshot), then streams changes. +func (s *InternalAppService) Watch( + ctx context.Context, + req *connect.Request[flyteapp.WatchRequest], + stream *connect.ServerStream[flyteapp.WatchResponse], +) error { + var project, domain, appName string + + switch t := req.Msg.GetTarget().(type) { + case *flyteapp.WatchRequest_Project: + project = t.Project.GetName() + domain = t.Project.GetDomain() + case *flyteapp.WatchRequest_AppId: + project = t.AppId.GetProject() + domain = t.AppId.GetDomain() + appName = t.AppId.GetName() + case *flyteapp.WatchRequest_Org, *flyteapp.WatchRequest_ClusterId: + return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("org and cluster_id watch targets are not supported by the data plane")) + } + + // Send initial snapshot so the client has current state before watching for changes. + snapshot, _, err := s.k8s.List(ctx, project, domain, 0, "") + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + for _, app := range snapshot { + if appName != "" && app.GetMetadata().GetId().GetName() != appName { + continue + } + if err := stream.Send(&flyteapp.WatchResponse{ + Event: &flyteapp.WatchResponse_CreateEvent{ + CreateEvent: &flyteapp.CreateEvent{App: app}, + }, + }); err != nil { + return err + } + } + + ch, err := s.k8s.Watch(ctx, project, domain, appName) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + + for { + select { + case <-ctx.Done(): + return nil + case event, ok := <-ch: + if !ok { + return nil + } + if err := stream.Send(event); err != nil { + return err + } + } + } +} diff --git a/app/internal/setup.go b/app/internal/setup.go new file mode 100644 index 0000000000..7b5ad60a2c --- /dev/null +++ b/app/internal/setup.go @@ -0,0 +1,39 @@ +package internal + +import ( + "context" + "fmt" + "net/http" + + stdlibapp "github.com/flyteorg/flyte/v2/flytestdlib/app" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/app/internal/service" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" + knativeapp "github.com/flyteorg/flyte/v2/flytestdlib/app" +) + +// Setup registers the InternalAppService handler on the SetupContext mux. +// It is mounted at /internal to avoid collision with the control plane +// AppService, which shares the same proto service definition. +func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.AppConfig) error { + if !cfg.Enabled { + logger.Infof(ctx, "InternalAppService disabled (apps.enabled=false), skipping setup") + return nil + } + + if err := knativeapp.InitAppScheme(); err != nil { + return fmt.Errorf("internalapp: failed to register Knative scheme: %w", err) + } + + appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg) + internalAppSvc := service.NewInternalAppService(appK8sClient, cfg) + + path, handler := appconnect.NewAppServiceHandler(internalAppSvc) + sc.Mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) + logger.Infof(ctx, "Mounted InternalAppService at /internal%s", path) + + return nil +} From 5e34858a31685e80a27377d8d7e3eb1a6dffe477 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:54:41 -0700 Subject: [PATCH 10/11] add tests Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- .../service/internal_app_service_test.go | 354 ++++++++++++++++++ 1 file changed, 354 insertions(+) create mode 100644 app/internal/service/internal_app_service_test.go diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go new file mode 100644 index 0000000000..49eb0af9f3 --- /dev/null +++ b/app/internal/service/internal_app_service_test.go @@ -0,0 +1,354 @@ +package service + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" + flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common" +) + +// mockAppK8sClient is a testify mock for AppK8sClientInterface. +type mockAppK8sClient struct { + mock.Mock +} + +func (m *mockAppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { + return m.Called(ctx, app).Error(0) +} + +func (m *mockAppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { + return m.Called(ctx, appID).Error(0) +} + +func (m *mockAppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) error { + return m.Called(ctx, appID).Error(0) +} + +func (m *mockAppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { + args := m.Called(ctx, appID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*flyteapp.Status), args.Error(1) +} + +func (m *mockAppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) { + args := m.Called(ctx, project, domain, limit, token) + if args.Get(0) == nil { + return nil, "", args.Error(2) + } + return args.Get(0).([]*flyteapp.App), args.String(1), args.Error(2) +} + +func (m *mockAppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) { + args := m.Called(ctx, appID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]*flyteapp.Replica), args.Error(1) +} + +func (m *mockAppK8sClient) DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error { + return m.Called(ctx, replicaID).Error(0) +} + +func (m *mockAppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) { + args := m.Called(ctx, project, domain, appName) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(<-chan *flyteapp.WatchResponse), args.Error(1) +} + +// --- helpers --- + +func testCfg() *appconfig.AppConfig { + return &appconfig.AppConfig{ + Enabled: true, + BaseDomain: "apps.example.com", + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + } +} + +func testAppID() *flyteapp.Identifier { + return &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} +} + +func testApp() *flyteapp.App { + return &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: testAppID()}, + Spec: &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Container{ + Container: &flytecoreapp.Container{Image: "nginx:latest"}, + }, + }, + } +} + +func testStatus(phase flyteapp.Status_DeploymentStatus) *flyteapp.Status { + return &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + {DeploymentStatus: phase}, + }, + } +} + +func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceClient { + svc := NewInternalAppService(k8s, testCfg()) + path, handler := appconnect.NewAppServiceHandler(svc) + mux := http.NewServeMux() + mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) + server := httptest.NewServer(mux) + t.Cleanup(server.Close) + return appconnect.NewAppServiceClient(http.DefaultClient, server.URL+"/internal") +} + +// --- Create --- + +func TestCreate_Success(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + + resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_PENDING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + assert.Equal(t, "https://myapp-proj-dev.apps.example.com", resp.Msg.App.Status.Ingress.PublicUrl) + k8s.AssertExpectations(t) +} + +func TestCreate_MissingID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ + App: &flyteapp.App{Spec: testApp().Spec}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +func TestCreate_MissingSpec(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ + App: &flyteapp.App{Metadata: &flyteapp.Meta{Id: testAppID()}}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +func TestCreate_MissingPayload(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ + App: &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: testAppID()}, + Spec: &flyteapp.Spec{}, + }, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { + k8s := &mockAppK8sClient{} + cfg := testCfg() + cfg.BaseDomain = "" + svc := NewInternalAppService(k8s, cfg) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + + resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) + require.NoError(t, err) + assert.Nil(t, resp.Msg.App.Status.Ingress) + k8s.AssertExpectations(t) +} + +// --- Get --- + +func TestGet_Success(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + appID := testAppID() + k8s.On("GetStatus", mock.Anything, appID).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE), nil) + + resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + k8s.AssertExpectations(t) +} + +func TestGet_MissingAppID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{})) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +// --- Update --- + +func TestUpdate_Deploy(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("GetStatus", mock.Anything, app.Metadata.Id).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING), nil) + + resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + k8s.AssertExpectations(t) +} + +func TestUpdate_Stop(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + app := testApp() + app.Spec.DesiredState = flyteapp.Spec_DESIRED_STATE_STOPPED + k8s.On("Stop", mock.Anything, app.Metadata.Id).Return(nil) + k8s.On("GetStatus", mock.Anything, app.Metadata.Id).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED), nil) + + resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + k8s.AssertExpectations(t) +} + +func TestUpdate_MissingID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{ + App: &flyteapp.App{}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +// --- Delete --- + +func TestDelete_Success(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + appID := testAppID() + k8s.On("Delete", mock.Anything, appID).Return(nil) + + _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{AppId: appID})) + require.NoError(t, err) + k8s.AssertExpectations(t) +} + +func TestDelete_MissingID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{})) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +// --- List --- + +func TestList_ByProject(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + apps := []*flyteapp.App{testApp()} + k8s.On("List", mock.Anything, "proj", "dev", uint32(10), "tok").Return(apps, "nexttok", nil) + + resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{ + FilterBy: &flyteapp.ListRequest_Project{ + Project: &common.ProjectIdentifier{Name: "proj", Domain: "dev"}, + }, + Request: &common.ListRequest{Limit: 10, Token: "tok"}, + })) + require.NoError(t, err) + assert.Len(t, resp.Msg.Apps, 1) + assert.Equal(t, "nexttok", resp.Msg.Token) + k8s.AssertExpectations(t) +} + +func TestList_NoFilter(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + k8s.On("List", mock.Anything, "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) + + resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{})) + require.NoError(t, err) + assert.Empty(t, resp.Msg.Apps) + k8s.AssertExpectations(t) +} + +// --- Watch --- + +func TestWatch_InitialSnapshot(t *testing.T) { + k8s := &mockAppK8sClient{} + + apps := []*flyteapp.App{testApp()} + ch := make(chan *flyteapp.WatchResponse) + close(ch) + + k8s.On("List", mock.Anything, "proj", "dev", uint32(0), "").Return(apps, "", nil) + k8s.On("Watch", mock.Anything, "proj", "dev", "").Return((<-chan *flyteapp.WatchResponse)(ch), nil) + + client := newTestClient(t, k8s) + stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ + Target: &flyteapp.WatchRequest_Project{ + Project: &common.ProjectIdentifier{Name: "proj", Domain: "dev"}, + }, + })) + require.NoError(t, err) + + // Expect one CreateEvent from the initial snapshot. + require.True(t, stream.Receive()) + resp := stream.Msg() + ce, ok := resp.Event.(*flyteapp.WatchResponse_CreateEvent) + require.True(t, ok) + assert.Equal(t, "myapp", ce.CreateEvent.App.Metadata.Id.Name) + + // Channel is closed — stream should end. + assert.False(t, stream.Receive()) + k8s.AssertExpectations(t) +} + +func TestWatch_AppIDTarget(t *testing.T) { + k8s := &mockAppK8sClient{} + + ch := make(chan *flyteapp.WatchResponse) + close(ch) + + k8s.On("List", mock.Anything, "proj", "dev", uint32(0), "").Return([]*flyteapp.App{}, "", nil) + k8s.On("Watch", mock.Anything, "proj", "dev", "myapp").Return((<-chan *flyteapp.WatchResponse)(ch), nil) + + client := newTestClient(t, k8s) + stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ + Target: &flyteapp.WatchRequest_AppId{AppId: testAppID()}, + })) + require.NoError(t, err) + + // No snapshot apps, channel closed — stream ends immediately. + assert.False(t, stream.Receive()) + k8s.AssertExpectations(t) +} From bfeafe37b56fa8cc3b59a958c029cd6a560aadbe Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Fri, 17 Apr 2026 12:19:26 -0700 Subject: [PATCH 11/11] address comments Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/internal/k8s/app_client.go | 11 ++++-- app/internal/k8s/app_client_test.go | 39 ++++++++++++++++++- app/internal/service/internal_app_service.go | 24 ++++++------ .../service/internal_app_service_test.go | 31 ++++++++++++--- app/internal/setup.go | 3 +- 5 files changed, 85 insertions(+), 23 deletions(-) diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index a716c539d8..ce24654367 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -53,9 +53,10 @@ type AppK8sClientInterface interface { GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) // List returns apps for the given project/domain scope with optional pagination. + // If appName is non-empty, only the app with that name is returned. // limit=0 means no limit. token is the K8s continue token from a previous call. // Returns the apps, the continue token for the next page (empty if last page), and any error. - List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) + List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) // Delete removes the KService CRD entirely. The app must be re-created from scratch. // Use Stop to scale to zero while preserving the KService. @@ -269,12 +270,16 @@ func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier } // List returns apps for the given project/domain scope with optional pagination. -func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) { +func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) { ns := appNamespace(project, domain) + matchLabels := client.MatchingLabels{labelAppManaged: "true"} + if appName != "" { + matchLabels[labelAppName] = strings.ToLower(appName) + } listOpts := []client.ListOption{ client.InNamespace(ns), - client.MatchingLabels{labelAppManaged: "true"}, + matchLabels, } if limit > 0 { listOpts = append(listOpts, client.Limit(int64(limit))) diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index e0d22d4f77..7fa30910c2 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -283,7 +283,7 @@ func TestList(t *testing.T) { }, } - apps, nextToken, err := c.List(context.Background(), "proj", "dev", 0, "") + apps, nextToken, err := c.List(context.Background(), "proj", "dev", "", 0, "") require.NoError(t, err) assert.Empty(t, nextToken) require.Len(t, apps, 1) @@ -291,6 +291,43 @@ func TestList(t *testing.T) { assert.Equal(t, "app1", apps[0].Metadata.Id.Name) } +func TestList_ByAppName(t *testing.T) { + s := testScheme(t) + ksvc1 := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "proj-dev", + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app1", + }, + Annotations: map[string]string{annotationAppID: "proj/dev/app1"}, + }, + } + ksvc2 := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app2", + Namespace: "proj-dev", + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app2", + }, + Annotations: map[string]string{annotationAppID: "proj/dev/app2"}, + }, + } + fc := fake.NewClientBuilder().WithScheme(s).WithObjects(ksvc1, ksvc2).Build() + c := &AppK8sClient{k8sClient: fc, cfg: &config.AppConfig{}} + + apps, _, err := c.List(context.Background(), "proj", "dev", "app1", 0, "") + require.NoError(t, err) + require.Len(t, apps, 1) + assert.Equal(t, "app1", apps[0].Metadata.Id.Name) +} + func TestGetReplicas(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index 8840f2e452..3d75198a40 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -7,6 +7,7 @@ import ( "connectrpc.com/connect" timestamppb "google.golang.org/protobuf/types/known/timestamppb" + k8serrors "k8s.io/apimachinery/pkg/api/errors" appconfig "github.com/flyteorg/flyte/v2/app/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" @@ -93,6 +94,9 @@ func (s *InternalAppService) Get( status, err := s.k8s.GetStatus(ctx, appID.AppId) if err != nil { + if k8serrors.IsNotFound(err) { + return nil, connect.NewError(connect.CodeNotFound, err) + } return nil, connect.NewError(connect.CodeInternal, err) } @@ -182,7 +186,7 @@ func (s *InternalAppService) List( token = r.GetToken() } - apps, nextToken, err := s.k8s.List(ctx, project, domain, limit, token) + apps, nextToken, err := s.k8s.List(ctx, project, domain, "", limit, token) if err != nil { return nil, connect.NewError(connect.CodeInternal, err) } @@ -211,15 +215,18 @@ func (s *InternalAppService) Watch( return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("org and cluster_id watch targets are not supported by the data plane")) } - // Send initial snapshot so the client has current state before watching for changes. - snapshot, _, err := s.k8s.List(ctx, project, domain, 0, "") + // Start watch before listing so no events are lost between the two calls. + ch, err := s.k8s.Watch(ctx, project, domain, appName) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + + // Send initial snapshot so the client has current state before streaming changes. + snapshot, _, err := s.k8s.List(ctx, project, domain, appName, 0, "") if err != nil { return connect.NewError(connect.CodeInternal, err) } for _, app := range snapshot { - if appName != "" && app.GetMetadata().GetId().GetName() != appName { - continue - } if err := stream.Send(&flyteapp.WatchResponse{ Event: &flyteapp.WatchResponse_CreateEvent{ CreateEvent: &flyteapp.CreateEvent{App: app}, @@ -229,11 +236,6 @@ func (s *InternalAppService) Watch( } } - ch, err := s.k8s.Watch(ctx, project, domain, appName) - if err != nil { - return connect.NewError(connect.CodeInternal, err) - } - for { select { case <-ctx.Done(): diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index 49eb0af9f3..08f7159e8e 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -2,6 +2,7 @@ package service import ( "context" + "fmt" "net/http" "net/http/httptest" "testing" @@ -11,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" appconfig "github.com/flyteorg/flyte/v2/app/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" @@ -44,8 +47,8 @@ func (m *mockAppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identi return args.Get(0).(*flyteapp.Status), args.Error(1) } -func (m *mockAppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) { - args := m.Called(ctx, project, domain, limit, token) +func (m *mockAppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) { + args := m.Called(ctx, project, domain, appName, limit, token) if args.Get(0) == nil { return nil, "", args.Error(2) } @@ -197,6 +200,22 @@ func TestGet_Success(t *testing.T) { k8s.AssertExpectations(t) } +func TestGet_NotFound(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + appID := testAppID() + notFoundErr := fmt.Errorf("KService myapp not found: %w", kerrors.NewNotFound(schema.GroupResource{}, "myapp")) + k8s.On("GetStatus", mock.Anything, appID).Return(nil, notFoundErr) + + _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) + k8s.AssertExpectations(t) +} + func TestGet_MissingAppID(t *testing.T) { svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) @@ -275,7 +294,7 @@ func TestList_ByProject(t *testing.T) { svc := NewInternalAppService(k8s, testCfg()) apps := []*flyteapp.App{testApp()} - k8s.On("List", mock.Anything, "proj", "dev", uint32(10), "tok").Return(apps, "nexttok", nil) + k8s.On("List", mock.Anything, "proj", "dev", "", uint32(10), "tok").Return(apps, "nexttok", nil) resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{ FilterBy: &flyteapp.ListRequest_Project{ @@ -293,7 +312,7 @@ func TestList_NoFilter(t *testing.T) { k8s := &mockAppK8sClient{} svc := NewInternalAppService(k8s, testCfg()) - k8s.On("List", mock.Anything, "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) + k8s.On("List", mock.Anything, "", "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{})) require.NoError(t, err) @@ -310,8 +329,8 @@ func TestWatch_InitialSnapshot(t *testing.T) { ch := make(chan *flyteapp.WatchResponse) close(ch) - k8s.On("List", mock.Anything, "proj", "dev", uint32(0), "").Return(apps, "", nil) k8s.On("Watch", mock.Anything, "proj", "dev", "").Return((<-chan *flyteapp.WatchResponse)(ch), nil) + k8s.On("List", mock.Anything, "proj", "dev", "", uint32(0), "").Return(apps, "", nil) client := newTestClient(t, k8s) stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ @@ -339,8 +358,8 @@ func TestWatch_AppIDTarget(t *testing.T) { ch := make(chan *flyteapp.WatchResponse) close(ch) - k8s.On("List", mock.Anything, "proj", "dev", uint32(0), "").Return([]*flyteapp.App{}, "", nil) k8s.On("Watch", mock.Anything, "proj", "dev", "myapp").Return((<-chan *flyteapp.WatchResponse)(ch), nil) + k8s.On("List", mock.Anything, "proj", "dev", "myapp", uint32(0), "").Return([]*flyteapp.App{}, "", nil) client := newTestClient(t, k8s) stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ diff --git a/app/internal/setup.go b/app/internal/setup.go index 7b5ad60a2c..619bcb91d2 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -12,7 +12,6 @@ import ( appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" "github.com/flyteorg/flyte/v2/app/internal/service" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" - knativeapp "github.com/flyteorg/flyte/v2/flytestdlib/app" ) // Setup registers the InternalAppService handler on the SetupContext mux. @@ -24,7 +23,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.AppCo return nil } - if err := knativeapp.InitAppScheme(); err != nil { + if err := stdlibapp.InitAppScheme(); err != nil { return fmt.Errorf("internalapp: failed to register Knative scheme: %w", err) }