From f10d0fcad0f15d66d0fcb8bcfb990d78e217502a Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:23:35 -0700 Subject: [PATCH 1/9] add: dep Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- actions/k8s/app_scheme.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 actions/k8s/app_scheme.go diff --git a/actions/k8s/app_scheme.go b/actions/k8s/app_scheme.go new file mode 100644 index 0000000000..9f3d450d49 --- /dev/null +++ b/actions/k8s/app_scheme.go @@ -0,0 +1,13 @@ +package k8s + +import ( + "k8s.io/client-go/kubernetes/scheme" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" +) + +// InitAppScheme registers Knative Serving types (Service, Route, Configuration, Revision) +// into the client-go scheme so that the K8s client can manage KService CRDs. +// Must be called before creating any K8s clients that interact with apps. +func InitAppScheme() error { + return servingv1.AddToScheme(scheme.Scheme) +} From 115563ff1e828a2f4abad71b648a8891717f2ed5 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Mon, 6 Apr 2026 21:09:02 -0700 Subject: [PATCH 2/9] restructure Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- actions/k8s/app_scheme.go | 13 ------------- flytestdlib/app/db.go | 0 2 files changed, 13 deletions(-) delete mode 100644 actions/k8s/app_scheme.go create mode 100644 flytestdlib/app/db.go diff --git a/actions/k8s/app_scheme.go b/actions/k8s/app_scheme.go deleted file mode 100644 index 9f3d450d49..0000000000 --- a/actions/k8s/app_scheme.go +++ /dev/null @@ -1,13 +0,0 @@ -package k8s - -import ( - "k8s.io/client-go/kubernetes/scheme" - servingv1 "knative.dev/serving/pkg/apis/serving/v1" -) - -// InitAppScheme registers Knative Serving types (Service, Route, Configuration, Revision) -// into the client-go scheme so that the K8s client can manage KService CRDs. -// Must be called before creating any K8s clients that interact with apps. -func InitAppScheme() error { - return servingv1.AddToScheme(scheme.Scheme) -} diff --git a/flytestdlib/app/db.go b/flytestdlib/app/db.go new file mode 100644 index 0000000000..e69de29bb2 From 65b378b3bd474b0d2bd4ba782089483e5dbffebe Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:17:47 -0700 Subject: [PATCH 3/9] wip Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/app.go | 398 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 398 insertions(+) create mode 100644 app/app.go diff --git a/app/app.go b/app/app.go new file mode 100644 index 0000000000..44cd9c49c7 --- /dev/null +++ b/app/app.go @@ -0,0 +1,398 @@ +package app + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + "google.golang.org/protobuf/proto" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/flyteorg/flyte/v2/actions/config" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" +) + +const ( + labelAppManaged = "flyte.org/app-managed" + labelProject = "flyte.org/project" + labelDomain = "flyte.org/domain" + labelAppName = "flyte.org/app-name" + + annotationSpecSHA = "flyte.org/spec-sha" + annotationAppID = "flyte.org/app-id" + + maxScaleZero = "0" + + // maxKServiceNameLen is the Kubernetes DNS label limit. + maxKServiceNameLen = 63 +) + +// AppK8sClientInterface defines the KService lifecycle operations for the App service. +type AppK8sClientInterface interface { + // Deploy creates or updates the KService for the given app. Idempotent — skips + // the update if the spec SHA annotation is unchanged. + Deploy(ctx context.Context, app *flyteapp.App) error + + // Stop scales the KService to zero by setting max-scale=0. The KService CRD + // is kept so the app can be restarted later. + Stop(ctx context.Context, appID *flyteapp.Identifier) error + + // GetStatus reads the KService and maps its conditions to a DeploymentStatus. + // Returns a Status with STOPPED if the KService does not exist. + GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) + + // List returns all apps (spec + live status) for the given project/domain scope. + List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) +} + +// AppK8sClient implements AppK8sClientInterface using controller-runtime. +type AppK8sClient struct { + k8sClient client.WithWatch + cache ctrlcache.Cache + namespace string + cfg *config.AppConfig +} + +// NewAppK8sClient creates a new AppK8sClient. +func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.AppConfig) *AppK8sClient { + return &AppK8sClient{ + k8sClient: k8sClient, + cache: cache, + namespace: cfg.Namespace, + cfg: cfg, + } +} + +// Deploy creates or updates the KService for the given app. +func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { + appID := app.GetMetadata().GetId() + name := kserviceName(appID) + + ksvc, err := c.buildKService(app) + if err != nil { + return fmt.Errorf("failed to build KService for app %s: %w", name, err) + } + + existing := &servingv1.Service{} + err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, existing) + if k8serrors.IsNotFound(err) { + if err := c.k8sClient.Create(ctx, ksvc); err != nil { + return fmt.Errorf("failed to create KService %s: %w", name, err) + } + logger.Infof(ctx, "Created KService %s/%s", c.namespace, name) + return nil + } + if err != nil { + return fmt.Errorf("failed to get KService %s: %w", name, err) + } + + // Skip update if spec has not changed. + if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { + logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", c.namespace, name) + return nil + } + + existing.Spec = ksvc.Spec + existing.Labels = ksvc.Labels + existing.Annotations = ksvc.Annotations + if err := c.k8sClient.Update(ctx, existing); err != nil { + return fmt.Errorf("failed to update KService %s: %w", name, err) + } + logger.Infof(ctx, "Updated KService %s/%s", c.namespace, name) + return nil +} + +// Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. +func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { + name := kserviceName(appID) + patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"autoscaling.knative.dev/max-scale":"0"}}}}}`) + ksvc := &servingv1.Service{} + ksvc.Name = name + ksvc.Namespace = c.namespace + if err := c.k8sClient.Patch(ctx, ksvc, client.RawPatch(types.MergePatchType, patch)); err != nil { + if k8serrors.IsNotFound(err) { + // Already stopped/deleted — treat as success. + return nil + } + return fmt.Errorf("failed to patch KService %s to stop: %w", name, err) + } + logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", c.namespace, name) + return nil +} + +// GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. +func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { + name := kserviceName(appID) + ksvc := &servingv1.Service{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, ksvc); err != nil { + if k8serrors.IsNotFound(err) { + return statusWithPhase(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, "KService not found"), nil + } + return nil, fmt.Errorf("failed to get KService %s: %w", name, err) + } + return kserviceToStatus(ksvc), nil +} + +// List returns all apps for the given project/domain by listing KServices with label selectors. +func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { + list := &servingv1.ServiceList{} + if err := c.k8sClient.List(ctx, list, + client.InNamespace(c.namespace), + client.MatchingLabels{ + labelProject: project, + labelDomain: domain, + }, + ); err != nil { + return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) + } + + apps := make([]*flyteapp.App, 0, len(list.Items)) + for i := range list.Items { + a, err := kserviceToApp(&list.Items[i]) + if err != nil { + logger.Warnf(ctx, "Skipping KService %s: failed to convert to app: %v", list.Items[i].Name, err) + continue + } + apps = append(apps, a) + } + return apps, nil +} + +// --- Helpers --- + +// kserviceName builds the KService name from an app identifier. +// Format: "{project}-{domain}-{name}", truncated to 63 chars. +func kserviceName(id *flyteapp.Identifier) string { + name := fmt.Sprintf("%s-%s-%s", id.GetProject(), id.GetDomain(), id.GetName()) + if len(name) > maxKServiceNameLen { + name = name[:maxKServiceNameLen] + } + return strings.ToLower(name) +} + +// specSHA computes a SHA256 digest of the serialized App Spec proto. +func specSHA(spec *flyteapp.Spec) (string, error) { + b, err := proto.Marshal(spec) + if err != nil { + return "", fmt.Errorf("failed to marshal spec: %w", err) + } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:8]), nil // 8 bytes = 16 hex chars, enough for change detection +} + +// buildKService constructs a Knative Service manifest from an App proto. +func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, error) { + appID := app.GetMetadata().GetId() + spec := app.GetSpec() + name := kserviceName(appID) + + sha, err := specSHA(spec) + if err != nil { + return nil, err + } + + podSpec, err := buildPodSpec(spec) + if err != nil { + return nil, err + } + + templateAnnotations := buildAutoscalingAnnotations(spec, c.cfg) + + timeoutSecs := c.cfg.DefaultRequestTimeout.Seconds() + if t := spec.GetTimeouts().GetRequestTimeout(); t != nil { + timeoutSecs = t.AsDuration().Seconds() + if timeoutSecs > c.cfg.MaxRequestTimeout.Seconds() { + timeoutSecs = c.cfg.MaxRequestTimeout.Seconds() + } + } + timeoutSecsInt := int64(timeoutSecs) + + ksvc := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: c.namespace, + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: appID.GetProject(), + labelDomain: appID.GetDomain(), + labelAppName: appID.GetName(), + }, + Annotations: map[string]string{ + annotationSpecSHA: sha, + annotationAppID: fmt.Sprintf("%s/%s/%s", appID.GetProject(), appID.GetDomain(), appID.GetName()), + }, + }, + Spec: servingv1.ServiceSpec{ + ConfigurationSpec: servingv1.ConfigurationSpec{ + Template: servingv1.RevisionTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: templateAnnotations, + }, + Spec: servingv1.RevisionSpec{ + PodSpec: podSpec, + TimeoutSeconds: &timeoutSecsInt, + }, + }, + }, + }, + } + return ksvc, nil +} + +// buildPodSpec constructs a corev1.PodSpec from an App Spec. +// Supports Container payload only for now; K8sPod support can be added in a follow-up. +func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { + switch p := spec.GetAppPayload().(type) { + case *flyteapp.Spec_Container: + c := p.Container + container := corev1.Container{ + Name: "app", + Image: c.GetImage(), + Args: c.GetArgs(), + } + for _, e := range c.GetEnv() { + container.Env = append(container.Env, corev1.EnvVar{ + Name: e.GetKey(), + Value: e.GetValue(), + }) + } + return corev1.PodSpec{Containers: []corev1.Container{container}}, nil + + case *flyteapp.Spec_Pod: + // K8sPod payloads are not yet supported — the pod spec serialization + // from flyteplugins is needed for a complete implementation. + return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload is not yet supported") + + default: + return corev1.PodSpec{}, fmt.Errorf("app spec has no payload (container or pod required)") + } +} + +// buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template. +func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.AppConfig) map[string]string { + annotations := map[string]string{} + autoscaling := spec.GetAutoscaling() + if autoscaling == nil { + return annotations + } + + if r := autoscaling.GetReplicas(); r != nil { + annotations["autoscaling.knative.dev/min-scale"] = fmt.Sprintf("%d", r.GetMin()) + annotations["autoscaling.knative.dev/max-scale"] = fmt.Sprintf("%d", r.GetMax()) + } + + if m := autoscaling.GetScalingMetric(); m != nil { + switch metric := m.GetMetric().(type) { + case *flyteapp.ScalingMetric_RequestRate: + annotations["autoscaling.knative.dev/metric"] = "rps" + annotations["autoscaling.knative.dev/target"] = fmt.Sprintf("%d", metric.RequestRate.GetTargetValue()) + case *flyteapp.ScalingMetric_Concurrency: + annotations["autoscaling.knative.dev/metric"] = "concurrency" + annotations["autoscaling.knative.dev/target"] = fmt.Sprintf("%d", metric.Concurrency.GetTargetValue()) + } + } + + if p := autoscaling.GetScaledownPeriod(); p != nil { + annotations["autoscaling.knative.dev/window"] = p.AsDuration().String() + } + + return annotations +} + +// statusWithPhase builds a flyteapp.Status with a single Condition set to the given phase. +func statusWithPhase(phase flyteapp.Status_DeploymentStatus, message string) *flyteapp.Status { + return &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + { + DeploymentStatus: phase, + Message: message, + LastTransitionTime: timestamppb.Now(), + }, + }, + } +} + +// kserviceToStatus maps a KService's conditions to a flyteapp.Status proto. +func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { + var phase flyteapp.Status_DeploymentStatus + var message string + + // Check if max-scale=0 is set — explicitly stopped by the control plane. + if ann := ksvc.Spec.Template.Annotations; ann != nil { + if ann["autoscaling.knative.dev/max-scale"] == maxScaleZero { + phase = flyteapp.Status_DEPLOYMENT_STATUS_STOPPED + message = "App scaled to zero" + } + } + + if phase == flyteapp.Status_DEPLOYMENT_STATUS_UNSPECIFIED { + switch { + case ksvc.IsReady(): + phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE + case ksvc.IsFailed(): + phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED + if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { + message = c.Message + } + case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: + phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING + default: + phase = flyteapp.Status_DEPLOYMENT_STATUS_PENDING + } + } + + status := statusWithPhase(phase, message) + + // Populate ingress URL from KService route status. + if url := ksvc.Status.URL; url != nil { + status.Ingress = &flyteapp.Ingress{ + PublicUrl: url.String(), + } + } + + // Populate current replica count and K8s namespace metadata. + status.CurrentReplicas = uint32(len(ksvc.Status.Traffic)) + status.K8SMetadata = &flyteapp.K8SMetadata{ + Namespace: ksvc.Namespace, + } + + return status +} + +// kserviceToApp reconstructs a flyteapp.App from a KService by reading the +// app identifier from annotations and the live status from KService conditions. +func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { + appIDStr, ok := ksvc.Annotations[annotationAppID] + if !ok { + return nil, fmt.Errorf("KService %s missing %s annotation", ksvc.Name, annotationAppID) + } + + // annotation format: "{project}/{domain}/{name}" + parts := strings.SplitN(appIDStr, "/", 3) + if len(parts) != 3 { + return nil, fmt.Errorf("KService %s has malformed %s annotation: %q", ksvc.Name, annotationAppID, appIDStr) + } + + appID := &flyteapp.Identifier{ + Project: parts[0], + Domain: parts[1], + Name: parts[2], + } + + return &flyteapp.App{ + Metadata: &flyteapp.Meta{ + Id: appID, + }, + Status: kserviceToStatus(ksvc), + }, nil +} From 9915dbdd8551155647e5d0d3c662c2120214f220 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:23:07 -0700 Subject: [PATCH 4/9] restruct Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- {app => actions/k8s}/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename {app => actions/k8s}/app.go (99%) diff --git a/app/app.go b/actions/k8s/app.go similarity index 99% rename from app/app.go rename to actions/k8s/app.go index 44cd9c49c7..b47a5bcafa 100644 --- a/app/app.go +++ b/actions/k8s/app.go @@ -1,4 +1,4 @@ -package app +package k8s import ( "context" From e2cbefcc542cd5b4ed995663615cc686f59ba406 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 12:44:05 -0700 Subject: [PATCH 5/9] fix Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- .../app.go => app/internal/k8s/app_client.go | 208 +++++++- app/internal/k8s/app_client_test.go | 446 ++++++++++++++++++ 2 files changed, 652 insertions(+), 2 deletions(-) rename actions/k8s/app.go => app/internal/k8s/app_client.go (67%) create mode 100644 app/internal/k8s/app_client_test.go diff --git a/actions/k8s/app.go b/app/internal/k8s/app_client.go similarity index 67% rename from actions/k8s/app.go rename to app/internal/k8s/app_client.go index b47a5bcafa..bd6b189b7d 100644 --- a/actions/k8s/app.go +++ b/app/internal/k8s/app_client.go @@ -7,13 +7,14 @@ import ( "fmt" "strings" + "google.golang.org/protobuf/proto" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + k8swatch "k8s.io/apimachinery/pkg/watch" servingv1 "knative.dev/serving/pkg/apis/serving/v1" - "google.golang.org/protobuf/proto" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -53,6 +54,20 @@ type AppK8sClientInterface interface { // List returns all apps (spec + live status) for the given project/domain scope. List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) + + // Delete removes the KService CRD entirely. The app must be re-created from scratch. + // Use Stop to scale to zero while preserving the KService. + Delete(ctx context.Context, appID *flyteapp.Identifier) error + + // GetReplicas lists the pods (replicas) currently backing the given app. + GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) + + // DeleteReplica force-deletes a specific pod. Knative will replace it automatically. + DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error + + // Watch returns a channel of WatchResponse events for KServices matching the + // given project/domain scope. The channel is closed when ctx is cancelled. + Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) } // AppK8sClient implements AppK8sClientInterface using controller-runtime. @@ -130,6 +145,105 @@ func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) err return nil } +// Delete removes the KService CRD for the given app entirely. +func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) error { + name := kserviceName(appID) + ksvc := &servingv1.Service{} + ksvc.Name = name + ksvc.Namespace = c.namespace + if err := c.k8sClient.Delete(ctx, ksvc); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete KService %s: %w", name, err) + } + logger.Infof(ctx, "Deleted KService %s/%s", c.namespace, name) + return nil +} + +// Watch returns a channel of WatchResponse events for KServices in the given +// project/domain scope. Pass empty strings to watch all managed KServices. +// The channel is closed when ctx is cancelled or the underlying watch terminates. +func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) { + labels := client.MatchingLabels{labelAppManaged: "true"} + if project != "" { + labels[labelProject] = project + } + if domain != "" { + labels[labelDomain] = domain + } + + watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, + client.InNamespace(c.namespace), + labels, + ) + if err != nil { + return nil, fmt.Errorf("failed to start KService watch for %s/%s: %w", project, domain, err) + } + + ch := make(chan *flyteapp.WatchResponse, 64) + go func() { + defer close(ch) + defer watcher.Stop() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-watcher.ResultChan(): + if !ok { + return + } + resp := kserviceEventToWatchResponse(event) + if resp == nil { + continue + } + select { + case ch <- resp: + case <-ctx.Done(): + return + } + } + } + }() + return ch, nil +} + +// kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse. +// Returns nil for event types that should not be forwarded (Error, Bookmark). +func kserviceEventToWatchResponse(event k8swatch.Event) *flyteapp.WatchResponse { + ksvc, ok := event.Object.(*servingv1.Service) + if !ok { + return nil + } + app, err := kserviceToApp(ksvc) + if err != nil { + // KService is not managed by us — skip it. + return nil + } + switch event.Type { + case k8swatch.Added: + return &flyteapp.WatchResponse{ + Event: &flyteapp.WatchResponse_CreateEvent{ + CreateEvent: &flyteapp.CreateEvent{App: app}, + }, + } + case k8swatch.Modified: + return &flyteapp.WatchResponse{ + Event: &flyteapp.WatchResponse_UpdateEvent{ + UpdateEvent: &flyteapp.UpdateEvent{UpdatedApp: app}, + }, + } + case k8swatch.Deleted: + return &flyteapp.WatchResponse{ + Event: &flyteapp.WatchResponse_DeleteEvent{ + DeleteEvent: &flyteapp.DeleteEvent{App: app}, + }, + } + default: + return nil + } +} + // GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { name := kserviceName(appID) @@ -369,6 +483,96 @@ func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { return status } +// GetReplicas lists the pods currently backing the given app by matching +// the flyte.org/project, flyte.org/domain, and flyte.org/app-name labels. +func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) { + podList := &corev1.PodList{} + if err := c.k8sClient.List(ctx, podList, + client.InNamespace(c.namespace), + client.MatchingLabels{ + labelProject: appID.GetProject(), + labelDomain: appID.GetDomain(), + labelAppName: appID.GetName(), + }, + ); err != nil { + return nil, fmt.Errorf("failed to list pods for app %s/%s/%s: %w", + appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + replicas := make([]*flyteapp.Replica, 0, len(podList.Items)) + for i := range podList.Items { + replicas = append(replicas, podToReplica(appID, &podList.Items[i])) + } + return replicas, nil +} + +// DeleteReplica force-deletes a specific pod. Knative will schedule a replacement automatically. +func (c *AppK8sClient) DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error { + pod := &corev1.Pod{} + pod.Name = replicaID.GetName() + pod.Namespace = c.namespace + if err := c.k8sClient.Delete(ctx, pod); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete pod %s/%s: %w", c.namespace, replicaID.GetName(), err) + } + logger.Infof(ctx, "Deleted replica pod %s/%s", c.namespace, replicaID.GetName()) + return nil +} + +// podToReplica maps a corev1.Pod to a flyteapp.Replica proto. +func podToReplica(appID *flyteapp.Identifier, pod *corev1.Pod) *flyteapp.Replica { + status, reason := podDeploymentStatus(pod) + return &flyteapp.Replica{ + Metadata: &flyteapp.ReplicaMeta{ + Id: &flyteapp.ReplicaIdentifier{ + AppId: appID, + Name: pod.Name, + }, + }, + Status: &flyteapp.ReplicaStatus{ + DeploymentStatus: status, + Reason: reason, + }, + } +} + +// podDeploymentStatus maps a pod's phase and conditions to a status string and reason. +func podDeploymentStatus(pod *corev1.Pod) (string, string) { + switch pod.Status.Phase { + case corev1.PodRunning: + for _, cs := range pod.Status.ContainerStatuses { + if !cs.Ready { + if cs.State.Waiting != nil { + return "DEPLOYING", cs.State.Waiting.Reason + } + return "DEPLOYING", "container not ready" + } + } + return "ACTIVE", "" + case corev1.PodPending: + for _, cs := range pod.Status.ContainerStatuses { + if cs.State.Waiting != nil && cs.State.Waiting.Reason != "" { + return "PENDING", cs.State.Waiting.Reason + } + } + return "PENDING", string(pod.Status.Phase) + case corev1.PodFailed: + reason := pod.Status.Reason + if reason == "" && len(pod.Status.ContainerStatuses) > 0 { + if t := pod.Status.ContainerStatuses[0].State.Terminated; t != nil { + reason = t.Reason + } + } + return "FAILED", reason + case corev1.PodSucceeded: + return "STOPPED", "pod completed" + default: + return "PENDING", string(pod.Status.Phase) + } +} + // kserviceToApp reconstructs a flyteapp.App from a KService by reading the // app identifier from annotations and the live status from KService conditions. func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go new file mode 100644 index 0000000000..2a7bc2c16d --- /dev/null +++ b/app/internal/k8s/app_client_test.go @@ -0,0 +1,446 @@ +package k8s + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8swatch "k8s.io/apimachinery/pkg/watch" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/flyteorg/flyte/v2/actions/config" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" +) + +// testScheme builds a runtime.Scheme with Knative types registered. +func testScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(s)) + require.NoError(t, servingv1.AddToScheme(s)) + return s +} + +// testClient builds an AppK8sClient backed by a fake K8s client. +func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { + t.Helper() + s := testScheme(t) + fc := fake.NewClientBuilder(). + WithScheme(s). + WithObjects(objs...). + Build() + cfg := &config.AppConfig{ + Namespace: "flyte-apps", + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + } + return &AppK8sClient{ + k8sClient: fc, + namespace: cfg.Namespace, + cfg: cfg, + } +} + +// testApp builds a minimal flyteapp.App for use in tests. +func testApp(project, domain, name, image string) *flyteapp.App { + return &flyteapp.App{ + Metadata: &flyteapp.Meta{ + Id: &flyteapp.Identifier{ + Project: project, + Domain: domain, + Name: name, + }, + }, + Spec: &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Container{ + Container: &flytecoreapp.Container{ + Image: image, + }, + }, + }, + } +} + +func TestDeploy_Create(t *testing.T) { + c := testClient(t) + app := testApp("proj", "dev", "myapp", "nginx:latest") + + err := c.Deploy(context.Background(), app) + require.NoError(t, err) + + ksvc := &servingv1.Service{} + err = c.k8sClient.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) + require.NoError(t, err) + assert.Equal(t, "proj", ksvc.Labels[labelProject]) + assert.Equal(t, "dev", ksvc.Labels[labelDomain]) + assert.Equal(t, "myapp", ksvc.Labels[labelAppName]) + assert.NotEmpty(t, ksvc.Annotations[annotationSpecSHA]) + assert.Equal(t, "proj/dev/myapp", ksvc.Annotations[annotationAppID]) +} + +func TestDeploy_UpdateOnSpecChange(t *testing.T) { + c := testClient(t) + app := testApp("proj", "dev", "myapp", "nginx:1.0") + require.NoError(t, c.Deploy(context.Background(), app)) + + // Change image — spec SHA changes → update should happen. + app.Spec.GetContainer().Image = "nginx:2.0" + require.NoError(t, c.Deploy(context.Background(), app)) + + ksvc := &servingv1.Service{} + require.NoError(t, c.k8sClient.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + assert.Equal(t, "nginx:2.0", ksvc.Spec.Template.Spec.Containers[0].Image) +} + +func TestDeploy_SkipUpdateWhenUnchanged(t *testing.T) { + c := testClient(t) + app := testApp("proj", "dev", "myapp", "nginx:latest") + require.NoError(t, c.Deploy(context.Background(), app)) + + // Get initial resource version. + ksvc := &servingv1.Service{} + require.NoError(t, c.k8sClient.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + initialRV := ksvc.ResourceVersion + + // Deploy same spec — should be a no-op. + require.NoError(t, c.Deploy(context.Background(), app)) + + require.NoError(t, c.k8sClient.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + assert.Equal(t, initialRV, ksvc.ResourceVersion, "resource version should not change on no-op deploy") +} + +func TestStop(t *testing.T) { + c := testClient(t) + app := testApp("proj", "dev", "myapp", "nginx:latest") + require.NoError(t, c.Deploy(context.Background(), app)) + + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + require.NoError(t, c.Stop(context.Background(), id)) + + ksvc := &servingv1.Service{} + require.NoError(t, c.k8sClient.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + assert.Equal(t, "0", ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"]) +} + +func TestStop_NotFound(t *testing.T) { + c := testClient(t) + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "missing"} + // Should succeed silently — already gone. + require.NoError(t, c.Stop(context.Background(), id)) +} + +func TestDelete(t *testing.T) { + c := testClient(t) + app := testApp("proj", "dev", "myapp", "nginx:latest") + require.NoError(t, c.Deploy(context.Background(), app)) + + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + require.NoError(t, c.Delete(context.Background(), id)) + + ksvc := &servingv1.Service{} + err := c.k8sClient.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) + assert.True(t, k8serrors.IsNotFound(err)) +} + +func TestDelete_NotFound(t *testing.T) { + c := testClient(t) + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "missing"} + require.NoError(t, c.Delete(context.Background(), id)) +} + +func TestGetStatus_NotFound(t *testing.T) { + c := testClient(t) + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "missing"} + status, err := c.GetStatus(context.Background(), id) + require.NoError(t, err) + require.Len(t, status.Conditions, 1) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) +} + +func TestGetStatus_Stopped(t *testing.T) { + c := testClient(t) + app := testApp("proj", "dev", "myapp", "nginx:latest") + require.NoError(t, c.Deploy(context.Background(), app)) + + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + require.NoError(t, c.Stop(context.Background(), id)) + + status, err := c.GetStatus(context.Background(), id) + require.NoError(t, err) + require.Len(t, status.Conditions, 1) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) +} + +func TestList(t *testing.T) { + s := testScheme(t) + // Pre-populate two KServices with different project labels. + ksvc1 := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "proj-dev-app1", + Namespace: "flyte-apps", + Labels: map[string]string{ + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app1", + }, + Annotations: map[string]string{ + annotationAppID: "proj/dev/app1", + }, + }, + } + ksvc2 := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "other-dev-app2", + Namespace: "flyte-apps", + Labels: map[string]string{ + labelProject: "other", + labelDomain: "dev", + labelAppName: "app2", + }, + Annotations: map[string]string{ + annotationAppID: "other/dev/app2", + }, + }, + } + + fc := fake.NewClientBuilder(). + WithScheme(s). + WithObjects(ksvc1, ksvc2). + Build() + c := &AppK8sClient{ + k8sClient: fc, + namespace: "flyte-apps", + cfg: &config.AppConfig{ + Namespace: "flyte-apps", + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + }, + } + + apps, err := c.List(context.Background(), "proj", "dev") + require.NoError(t, err) + require.Len(t, apps, 1) + assert.Equal(t, "proj", apps[0].Metadata.Id.Project) + assert.Equal(t, "app1", apps[0].Metadata.Id.Name) +} + +func TestGetReplicas(t *testing.T) { + s := testScheme(t) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "proj-dev-myapp-abc", + Namespace: "flyte-apps", + Labels: map[string]string{ + labelProject: "proj", + labelDomain: "dev", + labelAppName: "myapp", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + {Ready: true}, + }, + }, + } + fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() + c := &AppK8sClient{ + k8sClient: fc, + namespace: "flyte-apps", + cfg: &config.AppConfig{Namespace: "flyte-apps"}, + } + + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + replicas, err := c.GetReplicas(context.Background(), id) + require.NoError(t, err) + require.Len(t, replicas, 1) + assert.Equal(t, "proj-dev-myapp-abc", replicas[0].Metadata.Id.Name) + assert.Equal(t, "ACTIVE", replicas[0].Status.DeploymentStatus) +} + +func TestDeleteReplica(t *testing.T) { + s := testScheme(t) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "proj-dev-myapp-abc", + Namespace: "flyte-apps", + }, + } + fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() + c := &AppK8sClient{ + k8sClient: fc, + namespace: "flyte-apps", + cfg: &config.AppConfig{Namespace: "flyte-apps"}, + } + + replicaID := &flyteapp.ReplicaIdentifier{ + AppId: &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}, + Name: "proj-dev-myapp-abc", + } + require.NoError(t, c.DeleteReplica(context.Background(), replicaID)) + + err := fc.Get(context.Background(), + client.ObjectKey{Name: "proj-dev-myapp-abc", Namespace: "flyte-apps"}, &corev1.Pod{}) + assert.True(t, k8serrors.IsNotFound(err)) +} + +func TestKserviceEventToWatchResponse(t *testing.T) { + ksvc := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "proj-dev-myapp", + Namespace: "flyte-apps", + Annotations: map[string]string{ + annotationAppID: "proj/dev/myapp", + }, + }, + } + + tests := []struct { + eventType k8swatch.EventType + wantNil bool + wantEventKey string + }{ + {k8swatch.Added, false, "create"}, + {k8swatch.Modified, false, "update"}, + {k8swatch.Deleted, false, "delete"}, + {k8swatch.Error, true, ""}, + {k8swatch.Bookmark, true, ""}, + } + + for _, tt := range tests { + t.Run(string(tt.eventType), func(t *testing.T) { + resp := kserviceEventToWatchResponse(k8swatch.Event{ + Type: tt.eventType, + Object: ksvc, + }) + if tt.wantNil { + assert.Nil(t, resp) + return + } + require.NotNil(t, resp) + switch tt.wantEventKey { + case "create": + assert.NotNil(t, resp.GetCreateEvent()) + assert.Equal(t, "proj", resp.GetCreateEvent().App.Metadata.Id.Project) + case "update": + assert.NotNil(t, resp.GetUpdateEvent()) + assert.Equal(t, "myapp", resp.GetUpdateEvent().UpdatedApp.Metadata.Id.Name) + case "delete": + assert.NotNil(t, resp.GetDeleteEvent()) + } + }) + } +} + +func TestKserviceName(t *testing.T) { + tests := []struct { + project, domain, name string + want string + }{ + {"proj", "dev", "myapp", "proj-dev-myapp"}, + {"P", "D", "N", "p-d-n"}, + // Long name should be truncated to 63 chars. + { + "verylongprojectname", + "verylongdomainname", + "verylongappnamethatexceedslimit", + "verylongprojectname-verylongdomainname-verylongappnamethatexcee"[:63], + }, + } + for _, tt := range tests { + id := &flyteapp.Identifier{Project: tt.project, Domain: tt.domain, Name: tt.name} + assert.Equal(t, tt.want, kserviceName(id)) + } +} + +func TestPodDeploymentStatus(t *testing.T) { + tests := []struct { + name string + pod corev1.Pod + wantStatus string + wantReason string + }{ + { + name: "running and ready", + pod: corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{{Ready: true}}, + }, + }, + wantStatus: "ACTIVE", + }, + { + name: "running but container not ready", + pod: corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + {Ready: false, State: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{Reason: "ContainerCreating"}, + }}, + }, + }, + }, + wantStatus: "DEPLOYING", + wantReason: "ContainerCreating", + }, + { + name: "pending with waiting reason", + pod: corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + ContainerStatuses: []corev1.ContainerStatus{ + {State: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{Reason: "ImagePullBackOff"}, + }}, + }, + }, + }, + wantStatus: "PENDING", + wantReason: "ImagePullBackOff", + }, + { + name: "failed", + pod: corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + Reason: "OOMKilled", + }, + }, + wantStatus: "FAILED", + wantReason: "OOMKilled", + }, + { + name: "succeeded", + pod: corev1.Pod{ + Status: corev1.PodStatus{Phase: corev1.PodSucceeded}, + }, + wantStatus: "STOPPED", + wantReason: "pod completed", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + status, reason := podDeploymentStatus(&tt.pod) + assert.Equal(t, tt.wantStatus, status) + assert.Equal(t, tt.wantReason, reason) + }) + } +} From 61e09c6c2a946a45acb40f90eaf414303bf57285 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:10:02 -0700 Subject: [PATCH 6/9] move config Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- actions/config/config.go | 26 -------------------------- app/config/config.go | 18 ++++++++++++++++++ app/internal/k8s/app_client.go | 2 +- app/internal/k8s/app_client_test.go | 2 +- 4 files changed, 20 insertions(+), 28 deletions(-) create mode 100644 app/config/config.go diff --git a/actions/config/config.go b/actions/config/config.go index 53803e2fca..46b964e043 100644 --- a/actions/config/config.go +++ b/actions/config/config.go @@ -1,8 +1,6 @@ package config import ( - "time" - "github.com/flyteorg/flyte/v2/flytestdlib/config" ) @@ -23,31 +21,10 @@ var defaultConfig = &Config{ RunServiceURL: "http://localhost:8090", // 8M slots × 8 bytes/pointer = 64 MB; can track ~8M unique actions. RecordFilterSize: 1 << 23, - Apps: AppConfig{ - Enabled: false, - Namespace: "flyte-apps", - DefaultRequestTimeout: 5 * time.Minute, - MaxRequestTimeout: time.Hour, - }, } var configSection = config.MustRegisterSection(configSectionKey, defaultConfig) -// AppConfig holds configuration for the App deployment controller. -type AppConfig struct { - // Enabled controls whether the app deployment controller is started. - Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` - - // Namespace is the K8s namespace where KService CRDs are created. - Namespace string `json:"namespace" pflag:",Namespace for app KServices"` - - // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. - DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` - - // MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s). - MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"` -} - // Config holds the configuration for the Actions service type Config struct { // HTTP server configuration @@ -68,9 +45,6 @@ type Config struct { // RecordFilterSize is the size of the bloom filter used to deduplicate RecordAction calls. RecordFilterSize int `json:"recordFilterSize" pflag:",Size of the oppo bloom filter for deduplicating RecordAction calls"` - - // Apps holds configuration for the app deployment controller. - Apps AppConfig `json:"apps"` } // ServerConfig holds HTTP server configuration diff --git a/app/config/config.go b/app/config/config.go new file mode 100644 index 0000000000..f8f9caa116 --- /dev/null +++ b/app/config/config.go @@ -0,0 +1,18 @@ +package config + +import "time" + +// AppConfig holds configuration for the App deployment controller. +type AppConfig struct { + // Enabled controls whether the app deployment controller is started. + Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + + // Namespace is the K8s namespace where KService CRDs are created. + Namespace string `json:"namespace" pflag:",Namespace for app KServices"` + + // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. + DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` + + // MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s). + MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"` +} diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index bd6b189b7d..0850d92cf0 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -18,7 +18,7 @@ import ( ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/flyteorg/flyte/v2/actions/config" + "github.com/flyteorg/flyte/v2/app/config" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" ) diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 2a7bc2c16d..99be7ba9ec 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -16,7 +16,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/flyteorg/flyte/v2/actions/config" + "github.com/flyteorg/flyte/v2/app/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) From dc89b356009c5b821f6972c06138c49f60744051 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 7 Apr 2026 21:06:24 -0700 Subject: [PATCH 7/9] remove db.go Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- flytestdlib/app/db.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 flytestdlib/app/db.go diff --git a/flytestdlib/app/db.go b/flytestdlib/app/db.go deleted file mode 100644 index e69de29bb2..0000000000 From 96cc8e1ac0eacd8b9e9e45930141034048cb83f9 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:59:34 -0700 Subject: [PATCH 8/9] address comments Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/config/config.go | 3 - app/internal/k8s/app_client.go | 133 ++++++++++++---------- app/internal/k8s/app_client_test.go | 167 ++++++++++++++++++---------- 3 files changed, 184 insertions(+), 119 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index f8f9caa116..99feec4549 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -7,9 +7,6 @@ type AppConfig struct { // Enabled controls whether the app deployment controller is started. Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` - // Namespace is the K8s namespace where KService CRDs are created. - Namespace string `json:"namespace" pflag:",Namespace for app KServices"` - // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 0850d92cf0..c9b1baf268 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -49,7 +49,7 @@ type AppK8sClientInterface interface { Stop(ctx context.Context, appID *flyteapp.Identifier) error // GetStatus reads the KService and maps its conditions to a DeploymentStatus. - // Returns a Status with STOPPED if the KService does not exist. + // Returns a not-found error (checkable with k8serrors.IsNotFound) if the KService does not exist. GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) // List returns all apps (spec + live status) for the given project/domain scope. @@ -74,7 +74,6 @@ type AppK8sClientInterface interface { type AppK8sClient struct { k8sClient client.WithWatch cache ctrlcache.Cache - namespace string cfg *config.AppConfig } @@ -83,14 +82,20 @@ func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *con return &AppK8sClient{ k8sClient: k8sClient, cache: cache, - namespace: cfg.Namespace, cfg: cfg, } } +// appNamespace returns the K8s namespace for a given project/domain pair. +// Follows the same convention as the Actions and Secret services: "{project}-{domain}". +func appNamespace(project, domain string) string { + return fmt.Sprintf("%s-%s", project, domain) +} + // Deploy creates or updates the KService for the given app. func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { appID := app.GetMetadata().GetId() + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc, err := c.buildKService(app) @@ -99,12 +104,12 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { } existing := &servingv1.Service{} - err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, existing) + err = c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: ns}, existing) if k8serrors.IsNotFound(err) { if err := c.k8sClient.Create(ctx, ksvc); err != nil { return fmt.Errorf("failed to create KService %s: %w", name, err) } - logger.Infof(ctx, "Created KService %s/%s", c.namespace, name) + logger.Infof(ctx, "Created KService %s/%s", ns, name) return nil } if err != nil { @@ -113,7 +118,7 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { // Skip update if spec has not changed. if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { - logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", c.namespace, name) + logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", ns, name) return nil } @@ -123,17 +128,18 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { if err := c.k8sClient.Update(ctx, existing); err != nil { return fmt.Errorf("failed to update KService %s: %w", name, err) } - logger.Infof(ctx, "Updated KService %s/%s", c.namespace, name) + logger.Infof(ctx, "Updated KService %s/%s", ns, name) return nil } // Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"autoscaling.knative.dev/max-scale":"0"}}}}}`) ksvc := &servingv1.Service{} ksvc.Name = name - ksvc.Namespace = c.namespace + ksvc.Namespace = ns if err := c.k8sClient.Patch(ctx, ksvc, client.RawPatch(types.MergePatchType, patch)); err != nil { if k8serrors.IsNotFound(err) { // Already stopped/deleted — treat as success. @@ -141,44 +147,38 @@ func (c *AppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) err } return fmt.Errorf("failed to patch KService %s to stop: %w", name, err) } - logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", c.namespace, name) + logger.Infof(ctx, "Stopped KService %s/%s (max-scale=0)", ns, name) return nil } // Delete removes the KService CRD for the given app entirely. func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) error { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc := &servingv1.Service{} ksvc.Name = name - ksvc.Namespace = c.namespace + ksvc.Namespace = ns if err := c.k8sClient.Delete(ctx, ksvc); err != nil { if k8serrors.IsNotFound(err) { return nil } return fmt.Errorf("failed to delete KService %s: %w", name, err) } - logger.Infof(ctx, "Deleted KService %s/%s", c.namespace, name) + logger.Infof(ctx, "Deleted KService %s/%s", ns, name) return nil } // Watch returns a channel of WatchResponse events for KServices in the given -// project/domain scope. Pass empty strings to watch all managed KServices. -// The channel is closed when ctx is cancelled or the underlying watch terminates. +// project/domain scope. The channel is closed when ctx is cancelled or the +// underlying watch terminates. func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) { - labels := client.MatchingLabels{labelAppManaged: "true"} - if project != "" { - labels[labelProject] = project - } - if domain != "" { - labels[labelDomain] = domain - } - + ns := appNamespace(project, domain) watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, - client.InNamespace(c.namespace), - labels, + client.InNamespace(ns), + client.MatchingLabels{labelAppManaged: "true"}, ) if err != nil { - return nil, fmt.Errorf("failed to start KService watch for %s/%s: %w", project, domain, err) + return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err) } ch := make(chan *flyteapp.WatchResponse, 64) @@ -193,7 +193,7 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-cha if !ok { return } - resp := kserviceEventToWatchResponse(event) + resp := c.kserviceEventToWatchResponse(ctx, event) if resp == nil { continue } @@ -210,12 +210,12 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-cha // kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse. // Returns nil for event types that should not be forwarded (Error, Bookmark). -func kserviceEventToWatchResponse(event k8swatch.Event) *flyteapp.WatchResponse { +func (c *AppK8sClient) kserviceEventToWatchResponse(ctx context.Context, event k8swatch.Event) *flyteapp.WatchResponse { ksvc, ok := event.Object.(*servingv1.Service) if !ok { return nil } - app, err := kserviceToApp(ksvc) + app, err := c.kserviceToApp(ctx, ksvc) if err != nil { // KService is not managed by us — skip it. return nil @@ -246,33 +246,33 @@ func kserviceEventToWatchResponse(event k8swatch.Event) *flyteapp.WatchResponse // GetStatus reads the KService and maps its conditions to a flyteapp.Status proto. func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) ksvc := &servingv1.Service{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: c.namespace}, ksvc); err != nil { + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: ns}, ksvc); err != nil { if k8serrors.IsNotFound(err) { - return statusWithPhase(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, "KService not found"), nil + return nil, fmt.Errorf("KService %s not found: %w", name, err) } return nil, fmt.Errorf("failed to get KService %s: %w", name, err) } - return kserviceToStatus(ksvc), nil + return c.kserviceToStatus(ctx, ksvc), nil } -// List returns all apps for the given project/domain by listing KServices with label selectors. +// List returns all apps for the given project/domain by listing KServices in the +// project/domain namespace. func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { + ns := appNamespace(project, domain) list := &servingv1.ServiceList{} if err := c.k8sClient.List(ctx, list, - client.InNamespace(c.namespace), - client.MatchingLabels{ - labelProject: project, - labelDomain: domain, - }, + client.InNamespace(ns), + client.MatchingLabels{labelAppManaged: "true"}, ); err != nil { return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) } apps := make([]*flyteapp.App, 0, len(list.Items)) for i := range list.Items { - a, err := kserviceToApp(&list.Items[i]) + a, err := c.kserviceToApp(ctx, &list.Items[i]) if err != nil { logger.Warnf(ctx, "Skipping KService %s: failed to convert to app: %v", list.Items[i].Name, err) continue @@ -284,14 +284,20 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*fly // --- Helpers --- -// kserviceName builds the KService name from an app identifier. -// Format: "{project}-{domain}-{name}", truncated to 63 chars. +// kserviceName returns the KService name for an app. Since each app is deployed +// to its own project/domain namespace, the name only needs to be unique within +// that namespace — the app name alone suffices. +// Names are lower-cased and capped at 63 chars (K8s DNS label limit). For names +// that exceed 63 chars, the first 54 chars are kept and an 8-char SHA256 suffix +// is appended to avoid collisions between names with a long common prefix. func kserviceName(id *flyteapp.Identifier) string { - name := fmt.Sprintf("%s-%s-%s", id.GetProject(), id.GetDomain(), id.GetName()) - if len(name) > maxKServiceNameLen { - name = name[:maxKServiceNameLen] + name := strings.ToLower(id.GetName()) + if len(name) <= maxKServiceNameLen { + return name } - return strings.ToLower(name) + sum := sha256.Sum256([]byte(name)) + suffix := hex.EncodeToString(sum[:4]) // 4 bytes = 8 hex chars + return name[:maxKServiceNameLen-9] + "-" + suffix } // specSHA computes a SHA256 digest of the serialized App Spec proto. @@ -309,6 +315,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err appID := app.GetMetadata().GetId() spec := app.GetSpec() name := kserviceName(appID) + ns := appNamespace(appID.GetProject(), appID.GetDomain()) sha, err := specSHA(spec) if err != nil { @@ -334,7 +341,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err ksvc := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: c.namespace, + Namespace: ns, Labels: map[string]string{ labelAppManaged: "true", labelProject: appID.GetProject(), @@ -437,7 +444,8 @@ func statusWithPhase(phase flyteapp.Status_DeploymentStatus, message string) *fl } // kserviceToStatus maps a KService's conditions to a flyteapp.Status proto. -func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { +// It fetches the latest ready Revision to read the accurate ActualReplicas count. +func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Service) *flyteapp.Status { var phase flyteapp.Status_DeploymentStatus var message string @@ -474,8 +482,15 @@ func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { } } - // Populate current replica count and K8s namespace metadata. - status.CurrentReplicas = uint32(len(ksvc.Status.Traffic)) + // Populate current replica count from the latest ready Revision. + if revName := ksvc.Status.LatestReadyRevisionName; revName != "" { + rev := &servingv1.Revision{} + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: revName, Namespace: ksvc.Namespace}, rev); err == nil { + if rev.Status.ActualReplicas != nil { + status.CurrentReplicas = uint32(*rev.Status.ActualReplicas) + } + } + } status.K8SMetadata = &flyteapp.K8SMetadata{ Namespace: ksvc.Namespace, } @@ -483,17 +498,13 @@ func kserviceToStatus(ksvc *servingv1.Service) *flyteapp.Status { return status } -// GetReplicas lists the pods currently backing the given app by matching -// the flyte.org/project, flyte.org/domain, and flyte.org/app-name labels. +// GetReplicas lists the pods currently backing the given app. func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) { + ns := appNamespace(appID.GetProject(), appID.GetDomain()) podList := &corev1.PodList{} if err := c.k8sClient.List(ctx, podList, - client.InNamespace(c.namespace), - client.MatchingLabels{ - labelProject: appID.GetProject(), - labelDomain: appID.GetDomain(), - labelAppName: appID.GetName(), - }, + client.InNamespace(ns), + client.MatchingLabels{labelAppName: appID.GetName()}, ); err != nil { return nil, fmt.Errorf("failed to list pods for app %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) @@ -508,16 +519,18 @@ func (c *AppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifi // DeleteReplica force-deletes a specific pod. Knative will schedule a replacement automatically. func (c *AppK8sClient) DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error { + appID := replicaID.GetAppId() + ns := appNamespace(appID.GetProject(), appID.GetDomain()) pod := &corev1.Pod{} pod.Name = replicaID.GetName() - pod.Namespace = c.namespace + pod.Namespace = ns if err := c.k8sClient.Delete(ctx, pod); err != nil { if k8serrors.IsNotFound(err) { return nil } - return fmt.Errorf("failed to delete pod %s/%s: %w", c.namespace, replicaID.GetName(), err) + return fmt.Errorf("failed to delete pod %s/%s: %w", ns, replicaID.GetName(), err) } - logger.Infof(ctx, "Deleted replica pod %s/%s", c.namespace, replicaID.GetName()) + logger.Infof(ctx, "Deleted replica pod %s/%s", ns, replicaID.GetName()) return nil } @@ -575,7 +588,7 @@ func podDeploymentStatus(pod *corev1.Pod) (string, string) { // kserviceToApp reconstructs a flyteapp.App from a KService by reading the // app identifier from annotations and the live status from KService conditions. -func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { +func (c *AppK8sClient) kserviceToApp(ctx context.Context, ksvc *servingv1.Service) (*flyteapp.App, error) { appIDStr, ok := ksvc.Annotations[annotationAppID] if !ok { return nil, fmt.Errorf("KService %s missing %s annotation", ksvc.Name, annotationAppID) @@ -597,6 +610,6 @@ func kserviceToApp(ksvc *servingv1.Service) (*flyteapp.App, error) { Metadata: &flyteapp.Meta{ Id: appID, }, - Status: kserviceToStatus(ksvc), + Status: c.kserviceToStatus(ctx, ksvc), }, nil } diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 99be7ba9ec..706e21c0d1 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -2,6 +2,8 @@ package k8s import ( "context" + "crypto/sha256" + "encoding/hex" "testing" "time" @@ -21,7 +23,7 @@ import ( flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) -// testScheme builds a runtime.Scheme with Knative types registered. +// testScheme builds a runtime.Scheme with Knative and core types registered. func testScheme(t *testing.T) *runtime.Scheme { t.Helper() s := runtime.NewScheme() @@ -30,6 +32,19 @@ func testScheme(t *testing.T) *runtime.Scheme { return s } +// testRevision builds a Knative Revision object with a given ActualReplicas count. +func testRevision(name, namespace string, actualReplicas int32) *servingv1.Revision { + return &servingv1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Status: servingv1.RevisionStatus{ + ActualReplicas: &actualReplicas, + }, + } +} + // testClient builds an AppK8sClient backed by a fake K8s client. func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { t.Helper() @@ -38,15 +53,12 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { WithScheme(s). WithObjects(objs...). Build() - cfg := &config.AppConfig{ - Namespace: "flyte-apps", - DefaultRequestTimeout: 5 * time.Minute, - MaxRequestTimeout: time.Hour, - } return &AppK8sClient{ k8sClient: fc, - namespace: cfg.Namespace, - cfg: cfg, + cfg: &config.AppConfig{ + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + }, } } @@ -79,7 +91,7 @@ func TestDeploy_Create(t *testing.T) { ksvc := &servingv1.Service{} err = c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc) require.NoError(t, err) assert.Equal(t, "proj", ksvc.Labels[labelProject]) assert.Equal(t, "dev", ksvc.Labels[labelDomain]) @@ -99,7 +111,7 @@ func TestDeploy_UpdateOnSpecChange(t *testing.T) { ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) assert.Equal(t, "nginx:2.0", ksvc.Spec.Template.Spec.Containers[0].Image) } @@ -111,14 +123,14 @@ func TestDeploy_SkipUpdateWhenUnchanged(t *testing.T) { // Get initial resource version. ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) initialRV := ksvc.ResourceVersion // Deploy same spec — should be a no-op. require.NoError(t, c.Deploy(context.Background(), app)) require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) assert.Equal(t, initialRV, ksvc.ResourceVersion, "resource version should not change on no-op deploy") } @@ -132,7 +144,7 @@ func TestStop(t *testing.T) { ksvc := &servingv1.Service{} require.NoError(t, c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc)) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc)) assert.Equal(t, "0", ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"]) } @@ -153,7 +165,7 @@ func TestDelete(t *testing.T) { ksvc := &servingv1.Service{} err := c.k8sClient.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp", Namespace: "flyte-apps"}, ksvc) + client.ObjectKey{Name: "myapp", Namespace: "proj-dev"}, ksvc) assert.True(t, k8serrors.IsNotFound(err)) } @@ -167,9 +179,9 @@ func TestGetStatus_NotFound(t *testing.T) { c := testClient(t) id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "missing"} status, err := c.GetStatus(context.Background(), id) - require.NoError(t, err) - require.Len(t, status.Conditions, 1) - assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) + assert.Nil(t, status) } func TestGetStatus_Stopped(t *testing.T) { @@ -186,17 +198,57 @@ func TestGetStatus_Stopped(t *testing.T) { assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, status.Conditions[0].DeploymentStatus) } +func TestGetStatus_CurrentReplicas(t *testing.T) { + s := testScheme(t) + // Pre-populate a KService with LatestReadyRevisionName already set in status, + // and the corresponding Revision with ActualReplicas=4. + ksvc := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myapp", + Namespace: "proj-dev", + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "myapp", + }, + Annotations: map[string]string{ + annotationAppID: "proj/dev/myapp", + }, + }, + } + ksvc.Status.LatestReadyRevisionName = "myapp-00001" + + rev := testRevision("myapp-00001", "proj-dev", 4) + + fc := fake.NewClientBuilder(). + WithScheme(s). + WithObjects(ksvc, rev). + WithStatusSubresource(ksvc). + Build() + c := &AppK8sClient{ + k8sClient: fc, + cfg: &config.AppConfig{}, + } + + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + status, err := c.GetStatus(context.Background(), id) + require.NoError(t, err) + assert.Equal(t, uint32(4), status.CurrentReplicas) +} + func TestList(t *testing.T) { s := testScheme(t) // Pre-populate two KServices with different project labels. ksvc1 := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-app1", - Namespace: "flyte-apps", + Name: "app1", + Namespace: "proj-dev", Labels: map[string]string{ - labelProject: "proj", - labelDomain: "dev", - labelAppName: "app1", + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app1", }, Annotations: map[string]string{ annotationAppID: "proj/dev/app1", @@ -205,12 +257,13 @@ func TestList(t *testing.T) { } ksvc2 := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "other-dev-app2", - Namespace: "flyte-apps", + Name: "app2", + Namespace: "other-dev", Labels: map[string]string{ - labelProject: "other", - labelDomain: "dev", - labelAppName: "app2", + labelAppManaged: "true", + labelProject: "other", + labelDomain: "dev", + labelAppName: "app2", }, Annotations: map[string]string{ annotationAppID: "other/dev/app2", @@ -224,9 +277,7 @@ func TestList(t *testing.T) { Build() c := &AppK8sClient{ k8sClient: fc, - namespace: "flyte-apps", cfg: &config.AppConfig{ - Namespace: "flyte-apps", DefaultRequestTimeout: 5 * time.Minute, MaxRequestTimeout: time.Hour, }, @@ -243,11 +294,9 @@ func TestGetReplicas(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-myapp-abc", - Namespace: "flyte-apps", + Name: "myapp-abc", + Namespace: "proj-dev", Labels: map[string]string{ - labelProject: "proj", - labelDomain: "dev", labelAppName: "myapp", }, }, @@ -261,15 +310,14 @@ func TestGetReplicas(t *testing.T) { fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - namespace: "flyte-apps", - cfg: &config.AppConfig{Namespace: "flyte-apps"}, + cfg: &config.AppConfig{}, } id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} replicas, err := c.GetReplicas(context.Background(), id) require.NoError(t, err) require.Len(t, replicas, 1) - assert.Equal(t, "proj-dev-myapp-abc", replicas[0].Metadata.Id.Name) + assert.Equal(t, "myapp-abc", replicas[0].Metadata.Id.Name) assert.Equal(t, "ACTIVE", replicas[0].Status.DeploymentStatus) } @@ -277,33 +325,32 @@ func TestDeleteReplica(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-myapp-abc", - Namespace: "flyte-apps", + Name: "myapp-abc", + Namespace: "proj-dev", }, } fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - namespace: "flyte-apps", - cfg: &config.AppConfig{Namespace: "flyte-apps"}, + cfg: &config.AppConfig{}, } replicaID := &flyteapp.ReplicaIdentifier{ AppId: &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}, - Name: "proj-dev-myapp-abc", + Name: "myapp-abc", } require.NoError(t, c.DeleteReplica(context.Background(), replicaID)) err := fc.Get(context.Background(), - client.ObjectKey{Name: "proj-dev-myapp-abc", Namespace: "flyte-apps"}, &corev1.Pod{}) + client.ObjectKey{Name: "myapp-abc", Namespace: "proj-dev"}, &corev1.Pod{}) assert.True(t, k8serrors.IsNotFound(err)) } func TestKserviceEventToWatchResponse(t *testing.T) { ksvc := &servingv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "proj-dev-myapp", - Namespace: "flyte-apps", + Name: "myapp", + Namespace: "proj-dev", Annotations: map[string]string{ annotationAppID: "proj/dev/myapp", }, @@ -322,9 +369,10 @@ func TestKserviceEventToWatchResponse(t *testing.T) { {k8swatch.Bookmark, true, ""}, } + c := testClient(t) for _, tt := range tests { t.Run(string(tt.eventType), func(t *testing.T) { - resp := kserviceEventToWatchResponse(k8swatch.Event{ + resp := c.kserviceEventToWatchResponse(context.Background(), k8swatch.Event{ Type: tt.eventType, Object: ksvc, }) @@ -349,22 +397,29 @@ func TestKserviceEventToWatchResponse(t *testing.T) { func TestKserviceName(t *testing.T) { tests := []struct { - project, domain, name string - want string + name string + want string }{ - {"proj", "dev", "myapp", "proj-dev-myapp"}, - {"P", "D", "N", "p-d-n"}, - // Long name should be truncated to 63 chars. + {"myapp", "myapp"}, + {"MyApp", "myapp"}, + // v1 and v2 variants stay distinct — no truncation collision. + {"my-long-service-name-v1", "my-long-service-name-v1"}, + {"my-long-service-name-v2", "my-long-service-name-v2"}, + // Names over 63 chars get a hash suffix instead of blind truncation. { - "verylongprojectname", - "verylongdomainname", - "verylongappnamethatexceedslimit", - "verylongprojectname-verylongdomainname-verylongappnamethatexcee"[:63], + "this-is-a-very-long-app-name-that-exceeds-the-kubernetes-dns-label-limit", + func() string { + name := "this-is-a-very-long-app-name-that-exceeds-the-kubernetes-dns-label-limit" + sum := sha256.Sum256([]byte(name)) + return name[:54] + "-" + hex.EncodeToString(sum[:4]) + }(), }, } for _, tt := range tests { - id := &flyteapp.Identifier{Project: tt.project, Domain: tt.domain, Name: tt.name} - assert.Equal(t, tt.want, kserviceName(id)) + id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: tt.name} + got := kserviceName(id) + assert.Equal(t, tt.want, got) + assert.LessOrEqual(t, len(got), maxKServiceNameLen) } } From 1ff02368c0fbdfc960d3851dfce3572f2f63f350 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Wed, 15 Apr 2026 11:23:46 -0700 Subject: [PATCH 9/9] comments Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/internal/k8s/app_client.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index c9b1baf268..c12b9b1278 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -302,7 +302,7 @@ func kserviceName(id *flyteapp.Identifier) string { // specSHA computes a SHA256 digest of the serialized App Spec proto. func specSHA(spec *flyteapp.Spec) (string, error) { - b, err := proto.Marshal(spec) + b, err := proto.MarshalOptions{Deterministic: true}.Marshal(spec) if err != nil { return "", fmt.Errorf("failed to marshal spec: %w", err) } @@ -358,6 +358,10 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err Template: servingv1.RevisionTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: templateAnnotations, + Labels: map[string]string{ + labelAppManaged: "true", + labelAppName: appID.GetName(), + }, }, Spec: servingv1.RevisionSpec{ PodSpec: podSpec, @@ -463,8 +467,8 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE case ksvc.IsFailed(): phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED - if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { - message = c.Message + if condition := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); condition != nil { + message = condition.Message } case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING