diff --git a/Dockerfile b/Dockerfile index d43a445e531..d3bf838fdd0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,7 @@ COPY flyteplugins flyteplugins COPY flytestdlib flytestdlib COPY gen/go gen/go COPY actions actions +COPY app app COPY events events COPY runs runs COPY cache_service cache_service diff --git a/Makefile b/Makefile index b983a494d86..a3eab6747d0 100644 --- a/Makefile +++ b/Makefile @@ -31,37 +31,22 @@ build: verify ## Build all Go service binaries $(MAKE) -C runs build $(MAKE) -C executor build -# ============================================================================= -# Sandbox Commands -# ============================================================================= - -.PHONY: sandbox-build -sandbox-build: ## Build and start the flyte sandbox (docker/devbox-bundled) - $(MAKE) -C docker/devbox-bundled build - -# Run in dev mode with extra arg FLYTE_DEV=True -.PHONY: sandbox-run -sandbox-run: ## Start the flyte sandbox without rebuilding the image - $(MAKE) -C docker/devbox-bundled start - -.PHONY: sandbox-stop -sandbox-stop: ## Stop the flyte sandbox - $(MAKE) -C docker/devbox-bundled stop - # ============================================================================= # Devbox Commands # ============================================================================= .PHONY: devbox-build -devbox-build: ## Build and start the flyte devbox cluster (docker/devbox-bundled) +devbox-build: ## Build the flyte devbox image (docker/devbox-bundled) $(MAKE) -C docker/devbox-bundled build +# Run in dev mode with extra arg FLYTE_DEV=True .PHONY: devbox-run -devbox-run: ## Start the flyte devbox cluster without rebuilding the image - $(MAKE) -C docker/devbox-bundled start +devbox-run: ## Start the flyte devbox and install Knative with app routing config + $(MAKE) -C docker/devbox-bundled start FLYTE_DEV=$(FLYTE_DEV) + $(MAKE) -C docker/devbox-bundled setup-knative .PHONY: devbox-stop -devbox-stop: ## Stop the flyte devbox cluster +devbox-stop: ## 
Stop the flyte devbox $(MAKE) -C docker/devbox-bundled stop .PHONY: help @@ -83,10 +68,9 @@ sep: # Helper to time a step: $(call timed,step_name,command) define timed @start=$$(date +%s); \ - $(2); rc=$$?; \ + $(2); \ elapsed=$$(( $$(date +%s) - $$start )); \ - echo "⏱ $(1) completed in $${elapsed}s"; \ - exit $$rc + echo "⏱ $(1) completed in $${elapsed}s" endef .PHONY: buf-dep diff --git a/app/config/config.go b/app/config/config.go index 6532f30b5c0..fe547080a01 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -20,3 +20,55 @@ func DefaultAppConfig() *AppConfig { CacheTTL: 30 * time.Second, } } + +// InternalAppConfig holds configuration for the data plane InternalAppService. +type InternalAppConfig struct { + // Enabled controls whether the InternalAppService is started. + Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + + // BaseDomain is the base domain used to generate public URLs for apps. + // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". + BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` + + // Scheme is the URL scheme used for public app URLs ("http" or "https"). + // Defaults to "https" if unset. + Scheme string `json:"scheme" pflag:",URL scheme for app public URLs (http or https)"` + + // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. + DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` + + // MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s). + MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"` + + // IngressEnabled controls whether a Traefik IngressRoute and Middleware are + // created for each deployed app so it is reachable at + // {name}-{project}-{domain}.{IngressAppsDomain}:{IngressAppsPort} through Traefik. 
+ // Enable this for sandbox/local setups where Traefik is the ingress controller. + IngressEnabled bool `json:"ingressEnabled" pflag:",Create Traefik IngressRoute for each app"` + + // IngressEntryPoint is the Traefik entry point name that app routes are + // attached to (default: "apps"). + IngressEntryPoint string `json:"ingressEntryPoint" pflag:",Traefik entry point name for app ingress routes"` + + // IngressAppsDomain is the domain suffix for subdomain-based app URLs. + // Apps are exposed at {name}-{project}-{domain}.{IngressAppsDomain}. + // Use "localhost" for local devbox (resolves without /etc/hosts on macOS/Linux). + // Windows users can override to e.g. "localtest.me". + IngressAppsDomain string `json:"ingressAppsDomain" pflag:",Domain suffix for app subdomain URLs"` + + // IngressAppsPort is the port appended to the public app URL (e.g. 30081). + // Set to 0 to omit the port when behind a standard 80/443 proxy. + IngressAppsPort int `json:"ingressAppsPort" pflag:",Port for app subdomain URLs (0 = omit)"` + + // DefaultEnvVars is a list of environment variables injected into every KService + // pod at deploy time, in addition to any env vars specified in the app spec. + // Use this to inject cluster-internal endpoints (e.g. _U_EP_OVERRIDE) that app + // processes need to connect back to the Flyte manager. + DefaultEnvVars []EnvVar `json:"defaultEnvVars" pflag:"-,Default env vars injected into every app pod"` +} + +// EnvVar is a name/value environment variable pair for app pod injection. +type EnvVar struct { + Name string `json:"name"` + Value string `json:"value"` +} diff --git a/app/internal/config/config.go b/app/internal/config/config.go index 17e9e01dc23..4aa836ca222 100644 --- a/app/internal/config/config.go +++ b/app/internal/config/config.go @@ -1,19 +1,7 @@ package config -import "time" +import appconfig "github.com/flyteorg/flyte/v2/app/config" -// InternalAppConfig holds configuration for the data plane app deployment controller. 
-type InternalAppConfig struct { - // Enabled controls whether the app deployment controller is started. - Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` - - // BaseDomain is the base domain used to generate public URLs for apps. - // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". - BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` - - // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. - DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` - - // MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s). - MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"` -} +// InternalAppConfig is an alias of the public config type so internal packages +// can import it without depending on the public app/config path directly. +type InternalAppConfig = appconfig.InternalAppConfig diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 474823529e6..e79f4860721 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -13,7 +13,10 @@ import ( timestamppb "google.golang.org/protobuf/types/known/timestamppb" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + k8sresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" k8swatch "k8s.io/apimachinery/pkg/watch" servingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -21,8 +24,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/flytestdlib/k8s" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + flytecore 
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) const ( @@ -40,6 +45,11 @@ const ( maxKServiceNameLen = 63 ) +var ( + traefikIngressRouteGVK = schema.GroupVersionKind{Group: "traefik.io", Version: "v1alpha1", Kind: "IngressRoute"} + traefikMiddlewareGVK = schema.GroupVersionKind{Group: "traefik.io", Version: "v1alpha1", Kind: "Middleware"} +) + // AppK8sClientInterface defines the KService lifecycle operations for the App service. type AppK8sClientInterface interface { // Deploy creates or updates the KService for the given app. Idempotent — skips @@ -55,10 +65,9 @@ type AppK8sClientInterface interface { GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) // List returns apps for the given project/domain scope with optional pagination. - // If appName is non-empty, only the app with that name is returned. // limit=0 means no limit. token is the K8s continue token from a previous call. // Returns the apps, the continue token for the next page (empty if last page), and any error. - List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) + List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) // Delete removes the KService CRD entirely. The app must be re-created from scratch. // Use Stop to scale to zero while preserving the KService. 
@@ -104,6 +113,10 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { ns := appNamespace(appID.GetProject(), appID.GetDomain()) name := kserviceName(appID) + if err := k8s.EnsureNamespaceExists(ctx, c.k8sClient, ns); err != nil { + return fmt.Errorf("failed to ensure namespace %s: %w", ns, err) + } + ksvc, err := c.buildKService(app) if err != nil { return fmt.Errorf("failed to build KService for app %s: %w", name, err) @@ -116,26 +129,40 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { return fmt.Errorf("failed to create KService %s: %w", name, err) } logger.Infof(ctx, "Created KService %s/%s", ns, name) - return nil + return c.deployIngress(ctx, app) } if err != nil { return fmt.Errorf("failed to get KService %s: %w", name, err) } - // Skip update if spec has not changed. + // Skip KService update if spec has not changed, but still ensure ingress exists + // (it may be missing if the app was deployed before ingress support was added). if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] { logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", ns, name) - return nil + return c.deployIngress(ctx, app) } existing.Spec = ksvc.Spec - existing.Labels = ksvc.Labels - existing.Annotations = ksvc.Annotations + // Merge labels and annotations rather than replacing them wholesale. + // Knative sets immutable annotations (e.g. serving.knative.dev/creator) + // on creation; overwriting them causes the admission webhook to reject the update. 
+ if existing.Labels == nil { + existing.Labels = make(map[string]string) + } + for k, v := range ksvc.Labels { + existing.Labels[k] = v + } + if existing.Annotations == nil { + existing.Annotations = make(map[string]string) + } + for k, v := range ksvc.Annotations { + existing.Annotations[k] = v + } if err := c.k8sClient.Update(ctx, existing); err != nil { return fmt.Errorf("failed to update KService %s: %w", name, err) } logger.Infof(ctx, "Updated KService %s/%s", ns, name) - return nil + return c.deployIngress(ctx, app) } // Stop sets max-scale=0 on the KService, scaling it to zero without deleting it. @@ -166,11 +193,13 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e ksvc.Namespace = ns if err := c.k8sClient.Delete(ctx, ksvc); err != nil { if k8serrors.IsNotFound(err) { + c.deleteIngress(ctx, appID) return nil } return fmt.Errorf("failed to delete KService %s: %w", name, err) } logger.Infof(ctx, "Deleted KService %s/%s", ns, name) + c.deleteIngress(ctx, appID) return nil } @@ -232,17 +261,18 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName strin } // openWatch starts a K8s watch from resourceVersion (empty = watch from now). -// AllowWatchBookmarks is always set so K8s sends Bookmark events on every session, -// keeping lastResourceVersion current even when no objects change. 
func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[string]string, resourceVersion string) (k8swatch.Interface, error) { - rawOpts := &metav1.ListOptions{AllowWatchBookmarks: true} - if resourceVersion != "" { - rawOpts.ResourceVersion = resourceVersion - } opts := []client.ListOption{ client.InNamespace(ns), client.MatchingLabels(labels), - &client.ListOptions{Raw: rawOpts}, + } + if resourceVersion != "" { + opts = append(opts, &client.ListOptions{ + Raw: &metav1.ListOptions{ + ResourceVersion: resourceVersion, + AllowWatchBookmarks: true, + }, + }) } watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, opts...) if err != nil { @@ -252,9 +282,7 @@ func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[stri } // watchLoop is the reconnect loop. It drains watcher until it closes, then -// reopens. Exponential backoff is applied only on K8s Error events; normal -// watch timeouts (clean channel close) reconnect immediately. Closes ch only -// when ctx is cancelled. +// reopens with exponential backoff. Closes ch only when ctx is cancelled. 
func (c *AppK8sClient) watchLoop( ctx context.Context, ns string, @@ -268,27 +296,21 @@ func (c *AppK8sClient) watchLoop( state := &watchState{backoff: watchBackoffInitial} for { - reconnect, isError := c.drainWatcher(ctx, watcher, ch, state) + reconnect := c.drainWatcher(ctx, watcher, ch, state) if !reconnect { return // ctx cancelled } watcher.Stop() + state.consecutiveErrors++ + delay := state.nextBackoff() + logger.Warnf(ctx, "KService watch in namespace %s closed unexpectedly (attempt %d); reconnecting in %v", + ns, state.consecutiveErrors, delay) - if isError { - state.consecutiveErrors++ - delay := state.nextBackoff() - logger.Warnf(ctx, "KService watch in namespace %s closed with error (attempt %d); reconnecting in %v", - ns, state.consecutiveErrors, delay) - select { - case <-ctx.Done(): - return - case <-time.After(delay): - } - } else { - // Normal K8s watch timeout — reconnect immediately, no backoff. - state.resetBackoff() - logger.Debugf(ctx, "KService watch in namespace %s timed out naturally; reconnecting", ns) + select { + case <-ctx.Done(): + return + case <-time.After(delay): } newWatcher, err := c.openWatch(ctx, ns, labels, state.lastResourceVersion) @@ -303,23 +325,24 @@ func (c *AppK8sClient) watchLoop( } // drainWatcher processes events from watcher until the channel closes or ctx is done. -// Returns (reconnect, isError): reconnect=false means ctx was cancelled (stop the loop); -// isError=true means a K8s Error event triggered the close and backoff should be applied. +// Returns true if reconnect is needed, false if ctx was cancelled. 
func (c *AppK8sClient) drainWatcher( ctx context.Context, watcher k8swatch.Interface, ch chan<- *flyteapp.WatchResponse, state *watchState, -) (bool, bool) { +) bool { for { select { case <-ctx.Done(): - return false, false + return false case event, ok := <-watcher.ResultChan(): if !ok { - return true, false // normal K8s watch timeout + return true } + c.updateResourceVersion(event, state) + switch event.Type { case k8swatch.Error: if status, ok := event.Object.(*metav1.Status); ok { @@ -328,24 +351,20 @@ func (c *AppK8sClient) drainWatcher( } else { logger.Warnf(ctx, "KService watch received error event (type %T); will reconnect", event.Object) } - return true, true // error — apply backoff on reconnect + return true case k8swatch.Bookmark: - // Update RV immediately — there is no delivery to confirm. - c.updateResourceVersion(event, state) + // resourceVersion already updated — nothing to forward. state.resetBackoff() default: resp := c.kserviceEventToWatchResponse(ctx, event) if resp == nil { continue } + state.resetBackoff() select { case ch <- resp: - // Advance RV only after confirmed delivery so a failed send - // doesn't silently skip the event on the next reconnect. - c.updateResourceVersion(event, state) - state.resetBackoff() case <-ctx.Done(): - return false, false + return false } } } @@ -353,7 +372,7 @@ func (c *AppK8sClient) drainWatcher( } // updateResourceVersion extracts and stores the latest resourceVersion from a watch event. -// For Bookmark events it is called immediately; for data events only after successful delivery. +// Called before event type dispatch so both normal events and Bookmarks checkpoint the position. 
func (c *AppK8sClient) updateResourceVersion(event k8swatch.Event, state *watchState) { switch event.Type { case k8swatch.Added, k8swatch.Modified, k8swatch.Deleted, k8swatch.Bookmark: @@ -416,16 +435,12 @@ func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier } // List returns apps for the given project/domain scope with optional pagination. -func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) { +func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) { ns := appNamespace(project, domain) - matchLabels := client.MatchingLabels{labelAppManaged: "true"} - if appName != "" { - matchLabels[labelAppName] = strings.ToLower(appName) - } listOpts := []client.ListOption{ client.InNamespace(ns), - matchLabels, + client.MatchingLabels{labelAppManaged: "true"}, } if limit > 0 { listOpts = append(listOpts, client.Limit(int64(limit))) @@ -451,6 +466,152 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string return apps, list.Continue, nil } +// publicIngress returns the deterministic public URL for an app using the same +// logic as the service layer so GetStatus/List/Watch are consistent with Create. 
+func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { + if c.cfg.IngressEnabled && c.cfg.IngressAppsDomain != "" { + scheme := c.cfg.Scheme + if scheme == "" { + scheme = "http" + } + host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", + id.GetName(), id.GetProject(), id.GetDomain(), c.cfg.IngressAppsDomain)) + url := scheme + "://" + host + if c.cfg.IngressAppsPort != 0 { + url += fmt.Sprintf(":%d", c.cfg.IngressAppsPort) + } + return &flyteapp.Ingress{PublicUrl: url} + } + if c.cfg.BaseDomain == "" { + return nil + } + scheme := c.cfg.Scheme + if scheme == "" { + scheme = "https" + } + host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", + id.GetName(), id.GetProject(), id.GetDomain(), c.cfg.BaseDomain)) + return &flyteapp.Ingress{PublicUrl: scheme + "://" + host} +} + +// deployIngress creates or updates the Traefik resources for the given app: +// - host-{ksvcName} Middleware: rewrites Host to the Knative hostname so Kourier +// can route the request (Knative's K8s service is ExternalName → Kourier) +// - app-{ksvcName} IngressRoute: matches Host header and applies the middleware +// +// No-ops when IngressEnabled is false. +func (c *AppK8sClient) deployIngress(ctx context.Context, app *flyteapp.App) error { + if !c.cfg.IngressEnabled { + return nil + } + appID := app.GetMetadata().GetId() + ns := appNamespace(appID.GetProject(), appID.GetDomain()) + ksvcName := kserviceName(appID) + hostName := "host-" + ksvcName + routeName := "app-" + ksvcName + appHost := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", + appID.GetName(), appID.GetProject(), appID.GetDomain(), c.cfg.IngressAppsDomain)) + entryPoint := c.cfg.IngressEntryPoint + if entryPoint == "" { + entryPoint = "apps" + } + + // Knative's K8s service is ExternalName → kourier-internal. Kourier routes by + // Host header, so we must rewrite it to the deterministic Knative hostname. 
+ // hostname pattern matches the configured domain-template: {name}-{namespace}.{domain} + knativeHost := fmt.Sprintf("%s-%s.%s", ksvcName, ns, c.cfg.BaseDomain) + + hostMW := &unstructured.Unstructured{} + hostMW.SetGroupVersionKind(traefikMiddlewareGVK) + hostMW.SetName(hostName) + hostMW.SetNamespace(ns) + hostMW.Object["spec"] = map[string]interface{}{ + "headers": map[string]interface{}{ + "customRequestHeaders": map[string]interface{}{ + "Host": knativeHost, + // Rewrite Origin to match the Knative host so that apps with + // XSRF/CORS origin checks (e.g. Streamlit) accept the connection. + // Safe for devbox — XSRF provides no value in a local environment. + "Origin": "http://" + knativeHost, + }, + }, + } + + ir := &unstructured.Unstructured{} + ir.SetGroupVersionKind(traefikIngressRouteGVK) + ir.SetName(routeName) + ir.SetNamespace(ns) + ir.Object["spec"] = map[string]interface{}{ + "entryPoints": []interface{}{entryPoint}, + "routes": []interface{}{ + map[string]interface{}{ + "match": fmt.Sprintf("Host(`%s`)", appHost), + "kind": "Rule", + "middlewares": []interface{}{ + map[string]interface{}{"name": hostName}, + }, + "services": []interface{}{ + map[string]interface{}{ + "name": ksvcName, + "port": int64(80), + }, + }, + }, + }, + } + + for _, obj := range []*unstructured.Unstructured{hostMW, ir} { + existing := &unstructured.Unstructured{} + existing.SetGroupVersionKind(obj.GroupVersionKind()) + err := c.k8sClient.Get(ctx, client.ObjectKey{Name: obj.GetName(), Namespace: obj.GetNamespace()}, existing) + if k8serrors.IsNotFound(err) { + if createErr := c.k8sClient.Create(ctx, obj); createErr != nil { + return fmt.Errorf("failed to create %s %s/%s: %w", obj.GetKind(), ns, obj.GetName(), createErr) + } + logger.Infof(ctx, "Created %s %s/%s", obj.GetKind(), ns, obj.GetName()) + } else if err != nil { + return fmt.Errorf("failed to get %s %s/%s: %w", obj.GetKind(), ns, obj.GetName(), err) + } else { + obj.SetResourceVersion(existing.GetResourceVersion()) 
+ if updateErr := c.k8sClient.Update(ctx, obj); updateErr != nil { + return fmt.Errorf("failed to update %s %s/%s: %w", obj.GetKind(), ns, obj.GetName(), updateErr) + } + logger.Infof(ctx, "Updated %s %s/%s", obj.GetKind(), ns, obj.GetName()) + } + } + return nil +} + +// deleteIngress removes the Traefik IngressRoute and the host-rewrite Middleware for the given app. +// Errors are logged as warnings — ingress cleanup failure does not block app deletion. +func (c *AppK8sClient) deleteIngress(ctx context.Context, appID *flyteapp.Identifier) { + if !c.cfg.IngressEnabled { + return + } + ns := appNamespace(appID.GetProject(), appID.GetDomain()) + ksvcName := kserviceName(appID) + + type resource struct { + gvk schema.GroupVersionKind + name string + } + resources := []resource{ + {traefikIngressRouteGVK, "app-" + ksvcName}, + {traefikMiddlewareGVK, "host-" + ksvcName}, + } + for _, r := range resources { + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(r.gvk) + obj.SetName(r.name) + obj.SetNamespace(ns) + if err := c.k8sClient.Delete(ctx, obj); err != nil && !k8serrors.IsNotFound(err) { + logger.Warnf(ctx, "Failed to delete %s %s/%s: %v", r.gvk.Kind, ns, r.name, err) + } else { + logger.Infof(ctx, "Deleted %s %s/%s", r.gvk.Kind, ns, r.name) + } + } +} + // --- Helpers --- // kserviceName returns the KService name for an app. Since each app is deployed @@ -495,6 +656,15 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err if err != nil { return nil, err } + // Inject cluster-level default env vars (e.g. _U_EP_OVERRIDE) before user vars + // so they can be overridden by app-specific env vars if needed. + if len(c.cfg.DefaultEnvVars) > 0 && len(podSpec.Containers) > 0 { + defaults := make([]corev1.EnvVar, 0, len(c.cfg.DefaultEnvVars)) + for _, e := range c.cfg.DefaultEnvVars { + defaults = append(defaults, corev1.EnvVar{Name: e.Name, Value: e.Value}) + } + podSpec.Containers[0].Env = append(defaults, podSpec.Containers[0].Env...) 
+ } templateAnnotations := buildAutoscalingAnnotations(spec, c.cfg) @@ -546,9 +716,10 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { case *flyteapp.Spec_Container: c := p.Container container := corev1.Container{ - Name: "app", - Image: c.GetImage(), - Args: c.GetArgs(), + Name: "app", + Image: c.GetImage(), + Command: c.GetCommand(), + Args: c.GetArgs(), } for _, e := range c.GetEnv() { container.Env = append(container.Env, corev1.EnvVar{ @@ -556,7 +727,17 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { Value: e.GetValue(), }) } - return corev1.PodSpec{Containers: []corev1.Container{container}}, nil + for _, p := range c.GetPorts() { + container.Ports = append(container.Ports, corev1.ContainerPort{ + ContainerPort: int32(p.GetContainerPort()), + Name: p.GetName(), + }) + } + container.Resources = buildResourceRequirements(c.GetResources()) + return corev1.PodSpec{ + Containers: []corev1.Container{container}, + EnableServiceLinks: boolPtr(false), + }, nil case *flyteapp.Spec_Pod: // K8sPod payloads are not yet supported — the pod spec serialization @@ -568,6 +749,49 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { } } +// buildResourceRequirements maps flyteidl2 Resources to corev1.ResourceRequirements. 
+func buildResourceRequirements(res *flytecore.Resources) corev1.ResourceRequirements { + if res == nil { + return corev1.ResourceRequirements{} + } + reqs := corev1.ResourceRequirements{} + if len(res.GetRequests()) > 0 { + reqs.Requests = make(corev1.ResourceList) + for _, e := range res.GetRequests() { + if name, ok := protoResourceName(e.GetName()); ok { + reqs.Requests[name] = k8sresource.MustParse(e.GetValue()) + } + } + } + if len(res.GetLimits()) > 0 { + reqs.Limits = make(corev1.ResourceList) + for _, e := range res.GetLimits() { + if name, ok := protoResourceName(e.GetName()); ok { + reqs.Limits[name] = k8sresource.MustParse(e.GetValue()) + } + } + } + return reqs +} + +// protoResourceName maps a flyteidl2 ResourceName to the equivalent corev1.ResourceName. +func protoResourceName(name flytecore.Resources_ResourceName) (corev1.ResourceName, bool) { + switch name { + case flytecore.Resources_CPU: + return corev1.ResourceCPU, true + case flytecore.Resources_MEMORY: + return corev1.ResourceMemory, true + case flytecore.Resources_STORAGE: + return corev1.ResourceStorage, true + case flytecore.Resources_EPHEMERAL_STORAGE: + return corev1.ResourceEphemeralStorage, true + default: + return "", false + } +} + +func boolPtr(b bool) *bool { return &b } + // buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template. func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.InternalAppConfig) map[string]string { annotations := map[string]string{} @@ -644,10 +868,13 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser status := statusWithPhase(phase, message) - // Populate ingress URL from KService route status. - if url := ksvc.Status.URL; url != nil { - status.Ingress = &flyteapp.Ingress{ - PublicUrl: url.String(), + // Populate ingress URL from the app annotation so the URL is consistent + // with the Create response regardless of Knative route readiness. 
+ if appIDStr := ksvc.Annotations[annotationAppID]; appIDStr != "" { + parts := strings.SplitN(appIDStr, "/", 3) + if len(parts) == 3 { + appID := &flyteapp.Identifier{Project: parts[0], Domain: parts[1], Name: parts[2]} + status.Ingress = c.publicIngress(appID) } } diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index cb04428a297..38c31127051 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -285,7 +285,7 @@ func TestList(t *testing.T) { }, } - apps, nextToken, err := c.List(context.Background(), "proj", "dev", "", 0, "") + apps, nextToken, err := c.List(context.Background(), "proj", "dev", 0, "") require.NoError(t, err) assert.Empty(t, nextToken) require.Len(t, apps, 1) @@ -682,54 +682,12 @@ func TestWatch_ExponentialBackoff(t *testing.T) { watchBackoffMax = 30 * time.Second }) - // Four watchers that each emit an Error event — only Error events trigger backoff. - // NewFakeWithChanSize(1,...) gives a buffer of 1 so pre-sends don't block before - // the consumer goroutine starts (NewFake() is unbuffered). - calls := make([]watchCall, 4) - for i := range calls { - w := k8swatch.NewFakeWithChanSize(1, false) - calls[i] = watchCall{watcher: w} - w.Error(&metav1.Status{Code: 410, Reason: metav1.StatusReasonExpired, Message: "resource version too old"}) - } - - c, mwc := newMultiClient(t, calls) - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - start := time.Now() - ch, err := c.Watch(ctx, "proj", "dev", "") - require.NoError(t, err) - - // Wait until at least 4 Watch() calls have been made. - require.Eventually(t, func() bool { - mwc.mu.Lock() - defer mwc.mu.Unlock() - return len(mwc.capturedRVs) >= 4 - }, 2*time.Second, 5*time.Millisecond) - - elapsed := time.Since(start) - // With 10ms+20ms+40ms backoffs before 4th call, minimum elapsed ≈ 70ms. 
- assert.GreaterOrEqual(t, elapsed, 60*time.Millisecond, "backoff should accumulate across error reconnects") - - cancel() - for range ch { - } -} - -func TestWatch_CleanCloseNoBackoff(t *testing.T) { - watchBackoffInitial = 50 * time.Millisecond - watchBackoffMax = 200 * time.Millisecond - t.Cleanup(func() { - watchBackoffInitial = 1 * time.Second - watchBackoffMax = 30 * time.Second - }) - - // Three watchers that close immediately (clean channel close, no Error event). + // Four watchers that each close immediately. calls := []watchCall{ {watcher: k8swatch.NewFake()}, {watcher: k8swatch.NewFake()}, {watcher: k8swatch.NewFake()}, + {watcher: k8swatch.NewFake()}, } for _, wc := range calls { wc.watcher.(*k8swatch.FakeWatcher).Stop() @@ -744,17 +702,19 @@ func TestWatch_CleanCloseNoBackoff(t *testing.T) { ch, err := c.Watch(ctx, "proj", "dev", "") require.NoError(t, err) + // Wait until at least 4 Watch() calls have been made. require.Eventually(t, func() bool { mwc.mu.Lock() defer mwc.mu.Unlock() - return len(mwc.capturedRVs) >= 3 - }, 500*time.Millisecond, 5*time.Millisecond) + return len(mwc.capturedRVs) >= 4 + }, 2*time.Second, 5*time.Millisecond) elapsed := time.Since(start) - // Clean closes must not apply backoff — all 3 reconnects should be nearly instant. - assert.Less(t, elapsed, 30*time.Millisecond, "clean closes should not apply backoff delay") + // With 10ms+20ms+40ms backoffs before 4th call, minimum elapsed ≈ 70ms. + assert.GreaterOrEqual(t, elapsed, 60*time.Millisecond, "backoff should accumulate across reconnects") cancel() + // Drain the channel to let the goroutine exit. 
for range ch { } } diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index dc847ec4823..41b4bf2b91f 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -7,7 +7,6 @@ import ( "connectrpc.com/connect" timestamppb "google.golang.org/protobuf/types/known/timestamppb" - k8serrors "k8s.io/apimachinery/pkg/api/errors" appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" @@ -61,23 +60,42 @@ func (s *InternalAppService) Create( LastTransitionTime: timestamppb.Now(), }, }, - Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg.BaseDomain), + Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg), } return connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil } // publicIngress builds the deterministic public URL for an app. -// Pattern: "https://{name}-{project}-{domain}.{base_domain}" -// Returns nil if BaseDomain is not configured. -func publicIngress(id *flyteapp.Identifier, baseDomain string) *flyteapp.Ingress { - if baseDomain == "" { +// When Traefik ingress is enabled (IngressEnabled + IngressAppsDomain), the URL is +// subdomain-based: {scheme}://{name}-{project}-{domain}.{IngressAppsDomain}[:{IngressAppsPort}]. +// Otherwise falls back to the Knative host pattern: {scheme}://{name}-{project}-{domain}.{base_domain}. +// Returns nil if neither is configured. 
+func publicIngress(id *flyteapp.Identifier, cfg *appconfig.InternalAppConfig) *flyteapp.Ingress { + if cfg.IngressEnabled && cfg.IngressAppsDomain != "" { + scheme := cfg.Scheme + if scheme == "" { + scheme = "http" + } + host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", + id.GetName(), id.GetProject(), id.GetDomain(), cfg.IngressAppsDomain)) + url := scheme + "://" + host + if cfg.IngressAppsPort != 0 { + url += fmt.Sprintf(":%d", cfg.IngressAppsPort) + } + return &flyteapp.Ingress{PublicUrl: url} + } + if cfg.BaseDomain == "" { return nil } + scheme := cfg.Scheme + if scheme == "" { + scheme = "https" + } host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", - id.GetName(), id.GetProject(), id.GetDomain(), baseDomain)) + id.GetName(), id.GetProject(), id.GetDomain(), cfg.BaseDomain)) return &flyteapp.Ingress{ - PublicUrl: "https://" + host, + PublicUrl: scheme + "://" + host, } } @@ -94,9 +112,6 @@ func (s *InternalAppService) Get( status, err := s.k8s.GetStatus(ctx, appID.AppId) if err != nil { - if k8serrors.IsNotFound(err) { - return nil, connect.NewError(connect.CodeNotFound, err) - } return nil, connect.NewError(connect.CodeInternal, err) } @@ -186,7 +201,7 @@ func (s *InternalAppService) List( token = r.GetToken() } - apps, nextToken, err := s.k8s.List(ctx, project, domain, "", limit, token) + apps, nextToken, err := s.k8s.List(ctx, project, domain, limit, token) if err != nil { return nil, connect.NewError(connect.CodeInternal, err) } @@ -215,18 +230,15 @@ func (s *InternalAppService) Watch( return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("org and cluster_id watch targets are not supported by the data plane")) } - // Start watch before listing so no events are lost between the two calls. - ch, err := s.k8s.Watch(ctx, project, domain, appName) - if err != nil { - return connect.NewError(connect.CodeInternal, err) - } - - // Send initial snapshot so the client has current state before streaming changes. 
- snapshot, _, err := s.k8s.List(ctx, project, domain, appName, 0, "") + // Send initial snapshot so the client has current state before watching for changes. + snapshot, _, err := s.k8s.List(ctx, project, domain, 0, "") if err != nil { return connect.NewError(connect.CodeInternal, err) } for _, app := range snapshot { + if appName != "" && app.GetMetadata().GetId().GetName() != appName { + continue + } if err := stream.Send(&flyteapp.WatchResponse{ Event: &flyteapp.WatchResponse_CreateEvent{ CreateEvent: &flyteapp.CreateEvent{App: app}, @@ -236,6 +248,11 @@ func (s *InternalAppService) Watch( } } + ch, err := s.k8s.Watch(ctx, project, domain, appName) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + for { select { case <-ctx.Done(): diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index c47fa16c9b8..2aa07125aa7 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "net/http" "net/http/httptest" "testing" @@ -12,8 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - kerrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime/schema" appconfig "github.com/flyteorg/flyte/v2/app/internal/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" @@ -47,8 +44,8 @@ func (m *mockAppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identi return args.Get(0).(*flyteapp.Status), args.Error(1) } -func (m *mockAppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) { - args := m.Called(ctx, project, domain, appName, limit, token) +func (m *mockAppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) { + 
args := m.Called(ctx, project, domain, limit, token) if args.Get(0) == nil { return nil, "", args.Error(2) } @@ -80,7 +77,9 @@ func (m *mockAppK8sClient) Watch(ctx context.Context, project, domain, appName s func testCfg() *appconfig.InternalAppConfig { return &appconfig.InternalAppConfig{ Enabled: true, - BaseDomain: "apps.example.com", + IngressEnabled: true, + IngressAppsDomain: "example.com", + Scheme: "https", DefaultRequestTimeout: 5 * time.Minute, MaxRequestTimeout: time.Hour, } @@ -131,7 +130,7 @@ func TestCreate_Success(t *testing.T) { resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_PENDING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) - assert.Equal(t, "https://myapp-proj-dev.apps.example.com", resp.Msg.App.Status.Ingress.PublicUrl) + assert.Equal(t, "https://myapp-proj-dev.example.com", resp.Msg.App.Status.Ingress.PublicUrl) k8s.AssertExpectations(t) } @@ -168,9 +167,26 @@ func TestCreate_MissingPayload(t *testing.T) { assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) } +func TestCreate_IngressWithPort(t *testing.T) { + k8s := &mockAppK8sClient{} + cfg := testCfg() + cfg.IngressAppsPort = 30081 + svc := NewInternalAppService(k8s, cfg) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + + resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, "https://myapp-proj-dev.example.com:30081", resp.Msg.App.Status.Ingress.PublicUrl) + k8s.AssertExpectations(t) +} + func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { k8s := &mockAppK8sClient{} cfg := testCfg() + cfg.IngressEnabled = false + cfg.IngressAppsDomain = "" cfg.BaseDomain = "" svc := NewInternalAppService(k8s, cfg) @@ -200,22 +216,6 @@ func TestGet_Success(t *testing.T) { k8s.AssertExpectations(t) } -func TestGet_NotFound(t *testing.T) { - k8s 
:= &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) - - appID := testAppID() - notFoundErr := fmt.Errorf("KService myapp not found: %w", kerrors.NewNotFound(schema.GroupResource{}, "myapp")) - k8s.On("GetStatus", mock.Anything, appID).Return(nil, notFoundErr) - - _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ - Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, - })) - require.Error(t, err) - assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) - k8s.AssertExpectations(t) -} - func TestGet_MissingAppID(t *testing.T) { svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) @@ -294,7 +294,7 @@ func TestList_ByProject(t *testing.T) { svc := NewInternalAppService(k8s, testCfg()) apps := []*flyteapp.App{testApp()} - k8s.On("List", mock.Anything, "proj", "dev", "", uint32(10), "tok").Return(apps, "nexttok", nil) + k8s.On("List", mock.Anything, "proj", "dev", uint32(10), "tok").Return(apps, "nexttok", nil) resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{ FilterBy: &flyteapp.ListRequest_Project{ @@ -312,7 +312,7 @@ func TestList_NoFilter(t *testing.T) { k8s := &mockAppK8sClient{} svc := NewInternalAppService(k8s, testCfg()) - k8s.On("List", mock.Anything, "", "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) + k8s.On("List", mock.Anything, "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{})) require.NoError(t, err) @@ -329,8 +329,8 @@ func TestWatch_InitialSnapshot(t *testing.T) { ch := make(chan *flyteapp.WatchResponse) close(ch) + k8s.On("List", mock.Anything, "proj", "dev", uint32(0), "").Return(apps, "", nil) k8s.On("Watch", mock.Anything, "proj", "dev", "").Return((<-chan *flyteapp.WatchResponse)(ch), nil) - k8s.On("List", mock.Anything, "proj", "dev", "", uint32(0), "").Return(apps, "", nil) client := newTestClient(t, k8s) stream, err := 
client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ @@ -358,8 +358,8 @@ func TestWatch_AppIDTarget(t *testing.T) { ch := make(chan *flyteapp.WatchResponse) close(ch) + k8s.On("List", mock.Anything, "proj", "dev", uint32(0), "").Return([]*flyteapp.App{}, "", nil) k8s.On("Watch", mock.Anything, "proj", "dev", "myapp").Return((<-chan *flyteapp.WatchResponse)(ch), nil) - k8s.On("List", mock.Anything, "proj", "dev", "myapp", uint32(0), "").Return([]*flyteapp.App{}, "", nil) client := newTestClient(t, k8s) stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ diff --git a/app/internal/setup.go b/app/internal/setup.go index 7c7ec7a0c53..91023cb4d7e 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -12,6 +12,7 @@ import ( appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" "github.com/flyteorg/flyte/v2/app/internal/service" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" + knativeapp "github.com/flyteorg/flyte/v2/flytestdlib/app" ) // Setup registers the InternalAppService handler on the SetupContext mux. 
@@ -23,7 +24,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter return nil } - if err := stdlibapp.InitAppScheme(); err != nil { + if err := knativeapp.InitAppScheme(); err != nil { return fmt.Errorf("internalapp: failed to register Knative scheme: %w", err) } diff --git a/app/service/app_service.go b/app/service/app_service.go index 55b70738117..e9d9e140747 100644 --- a/app/service/app_service.go +++ b/app/service/app_service.go @@ -3,10 +3,10 @@ package service import ( "context" "fmt" + "sync" "time" "connectrpc.com/connect" - "github.com/hashicorp/golang-lru/v2/expirable" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" @@ -18,20 +18,15 @@ import ( type AppService struct { appconnect.UnimplementedAppServiceHandler internalClient appconnect.AppServiceClient - // cache is nil when cacheTTL=0 (caching disabled). - cache *expirable.LRU[string, *flyteapp.App] + cache *appCache } // NewAppService creates a new AppService. // cacheTTL=0 disables caching (every Get calls InternalAppService). 
func NewAppService(internalClient appconnect.AppServiceClient, cacheTTL time.Duration) *AppService { - var cache *expirable.LRU[string, *flyteapp.App] - if cacheTTL > 0 { - cache = expirable.NewLRU[string, *flyteapp.App](0, nil, cacheTTL) - } return &AppService{ internalClient: internalClient, - cache: cache, + cache: newAppCache(cacheTTL), } } @@ -47,9 +42,7 @@ func (s *AppService) Create( if err != nil { return nil, err } - if s.cache != nil { - s.cache.Remove(cacheKey(req.Msg.GetApp().GetMetadata().GetId())) - } + s.cache.invalidate(cacheKey(req.Msg.GetApp().GetMetadata().GetId())) return resp, nil } @@ -59,8 +52,8 @@ func (s *AppService) Get( req *connect.Request[flyteapp.GetRequest], ) (*connect.Response[flyteapp.GetResponse], error) { appID, ok := req.Msg.GetIdentifier().(*flyteapp.GetRequest_AppId) - if ok && appID.AppId != nil && s.cache != nil { - if app, hit := s.cache.Get(cacheKey(appID.AppId)); hit { + if ok && appID.AppId != nil { + if app, hit := s.cache.get(cacheKey(appID.AppId)); hit { return connect.NewResponse(&flyteapp.GetResponse{App: app}), nil } } @@ -69,8 +62,8 @@ func (s *AppService) Get( if err != nil { return nil, err } - if ok && appID.AppId != nil && s.cache != nil { - s.cache.Add(cacheKey(appID.AppId), resp.Msg.GetApp()) + if ok && appID.AppId != nil { + s.cache.set(cacheKey(appID.AppId), resp.Msg.GetApp()) } return resp, nil } @@ -84,9 +77,7 @@ func (s *AppService) Update( if err != nil { return nil, err } - if s.cache != nil { - s.cache.Remove(cacheKey(req.Msg.GetApp().GetMetadata().GetId())) - } + s.cache.invalidate(cacheKey(req.Msg.GetApp().GetMetadata().GetId())) return resp, nil } @@ -99,9 +90,7 @@ func (s *AppService) Delete( if err != nil { return nil, err } - if s.cache != nil { - s.cache.Remove(cacheKey(req.Msg.GetAppId())) - } + s.cache.invalidate(cacheKey(req.Msg.GetAppId())) return resp, nil } @@ -132,6 +121,57 @@ func (s *AppService) Watch( return clientStream.Err() } +// --- Cache --- + +type cacheEntry struct { + app 
*flyteapp.App + expiresAt time.Time +} + +type appCache struct { + mu sync.RWMutex + items map[string]*cacheEntry + ttl time.Duration +} + +func newAppCache(ttl time.Duration) *appCache { + return &appCache{ + items: make(map[string]*cacheEntry), + ttl: ttl, + } +} + +// get returns the cached App for key, or (nil, false) if missing or expired. +func (c *appCache) get(key string) (*flyteapp.App, bool) { + if c.ttl == 0 { + return nil, false + } + c.mu.RLock() + entry, ok := c.items[key] + c.mu.RUnlock() + if !ok || time.Now().After(entry.expiresAt) { + return nil, false + } + return entry.app, true +} + +// set writes the App to the cache with the configured TTL. +func (c *appCache) set(key string, app *flyteapp.App) { + if c.ttl == 0 { + return + } + c.mu.Lock() + c.items[key] = &cacheEntry{app: app, expiresAt: time.Now().Add(c.ttl)} + c.mu.Unlock() +} + +// invalidate removes the cache entry for key. +func (c *appCache) invalidate(key string) { + c.mu.Lock() + delete(c.items, key) + c.mu.Unlock() +} + // cacheKey returns a stable string key for an app identifier. func cacheKey(id *flyteapp.Identifier) string { if id == nil { diff --git a/app/service/app_service_test.go b/app/service/app_service_test.go index 7b623b8ff8b..e985a89568e 100644 --- a/app/service/app_service_test.go +++ b/app/service/app_service_test.go @@ -130,7 +130,7 @@ func TestGet_CacheHit_SkipsInternal(t *testing.T) { // Pre-populate cache. appID := testAppID() - svc.cache.Add(cacheKey(appID), testApp()) + svc.cache.set(cacheKey(appID), testApp()) // Internal should NOT be called. 
resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ @@ -146,7 +146,7 @@ func TestGet_CacheExpired_CallsInternal(t *testing.T) { svc := NewAppService(internal, 1*time.Millisecond) appID := testAppID() - svc.cache.Add(cacheKey(appID), testApp()) + svc.cache.set(cacheKey(appID), testApp()) time.Sleep(5 * time.Millisecond) // let TTL expire app := testApp() @@ -169,7 +169,7 @@ func TestCreate_InvalidatesCache(t *testing.T) { app := testApp() // Pre-populate cache so we can confirm it's cleared. - svc.cache.Add(cacheKey(app.Metadata.Id), app) + svc.cache.set(cacheKey(app.Metadata.Id), app) internal.On("Create", mock.Anything, mock.Anything).Return( connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil, @@ -178,7 +178,7 @@ func TestCreate_InvalidatesCache(t *testing.T) { _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) - _, hit := svc.cache.Get(cacheKey(app.Metadata.Id)) + _, hit := svc.cache.get(cacheKey(app.Metadata.Id)) assert.False(t, hit, "cache should be invalidated after Create") internal.AssertExpectations(t) } @@ -188,7 +188,7 @@ func TestUpdate_InvalidatesCache(t *testing.T) { svc := NewAppService(internal, 30*time.Second) app := testApp() - svc.cache.Add(cacheKey(app.Metadata.Id), app) + svc.cache.set(cacheKey(app.Metadata.Id), app) internal.On("Update", mock.Anything, mock.Anything).Return( connect.NewResponse(&flyteapp.UpdateResponse{App: app}), nil, @@ -197,7 +197,7 @@ func TestUpdate_InvalidatesCache(t *testing.T) { _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) require.NoError(t, err) - _, hit := svc.cache.Get(cacheKey(app.Metadata.Id)) + _, hit := svc.cache.get(cacheKey(app.Metadata.Id)) assert.False(t, hit, "cache should be invalidated after Update") internal.AssertExpectations(t) } @@ -207,7 +207,7 @@ func TestDelete_InvalidatesCache(t *testing.T) { svc := NewAppService(internal, 
30*time.Second) appID := testAppID() - svc.cache.Add(cacheKey(appID), testApp()) + svc.cache.set(cacheKey(appID), testApp()) internal.On("Delete", mock.Anything, mock.Anything).Return( connect.NewResponse(&flyteapp.DeleteResponse{}), nil, @@ -216,7 +216,7 @@ func TestDelete_InvalidatesCache(t *testing.T) { _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{AppId: appID})) require.NoError(t, err) - _, hit := svc.cache.Get(cacheKey(appID)) + _, hit := svc.cache.get(cacheKey(appID)) assert.False(t, hit, "cache should be invalidated after Delete") internal.AssertExpectations(t) } diff --git a/app/setup.go b/app/setup.go index 31fcdd8d6a8..38853d27606 100644 --- a/app/setup.go +++ b/app/setup.go @@ -8,10 +8,17 @@ import ( "github.com/flyteorg/flyte/v2/flytestdlib/logger" appconfig "github.com/flyteorg/flyte/v2/app/config" + appinternal "github.com/flyteorg/flyte/v2/app/internal" "github.com/flyteorg/flyte/v2/app/service" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" ) +// SetupInternal registers the data plane InternalAppService on the SetupContext mux. +// It must be called before Setup so the proxy can reach /internal/... on the same mux. +func SetupInternal(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.InternalAppConfig) error { + return appinternal.Setup(ctx, sc, cfg) +} + // Setup registers the control plane AppService handler on the SetupContext mux. // In unified mode (sc.BaseURL set), the proxy routes to InternalAppService on // the same mux via the /internal prefix — no network hop. 
In split mode, diff --git a/charts/flyte-devbox/templates/proxy/traefik-config.yaml b/charts/flyte-devbox/templates/proxy/traefik-config.yaml index 5657b622f81..854d8616d52 100644 --- a/charts/flyte-devbox/templates/proxy/traefik-config.yaml +++ b/charts/flyte-devbox/templates/proxy/traefik-config.yaml @@ -14,5 +14,18 @@ spec: transport: respondingTimeouts: readTimeout: 0 + apps: + port: 30081 + nodePort: 30081 + expose: + default: true + transport: + respondingTimeouts: + readTimeout: 0 websecure: expose: false + providers: + kubernetesCRD: + allowExternalNameServices: true + kubernetesIngress: + allowExternalNameServices: true diff --git a/charts/flyte-devbox/values.yaml b/charts/flyte-devbox/values.yaml index 85af9a39562..1cf06ec0129 100644 --- a/charts/flyte-devbox/values.yaml +++ b/charts/flyte-devbox/values.yaml @@ -66,6 +66,28 @@ flyte-binary: dbName: runs user: postgres password: postgres + manager: + internalApps: + enabled: true + baseDomain: "local.flyte.app" + scheme: "http" + ingressEnabled: true + ingressEntryPoint: "apps" + ingressAppsDomain: "localhost" + ingressAppsPort: 30081 + defaultEnvVars: + - name: FLYTE_AWS_ENDPOINT + value: 'http://rustfs.{{ .Release.Namespace }}:9000' + - name: FLYTE_AWS_ACCESS_KEY_ID + value: rustfs + - name: FLYTE_AWS_SECRET_ACCESS_KEY + value: rustfsstorage + - name: _U_EP_OVERRIDE + value: 'flyte-binary-http.{{ .Release.Namespace }}:8090' + - name: _U_INSECURE + value: "true" + - name: _U_USE_ACTIONS + value: "1" inlineConfigMap: '{{ include "flyte-devbox.configuration.inlineConfigMap" . }}' clusterResourceTemplates: inlineConfigMap: '{{ include "flyte-devbox.clusterResourceTemplates.inlineConfigMap" . 
}}' diff --git a/docker/devbox-bundled/Makefile b/docker/devbox-bundled/Makefile index 79762823e69..ddf29f4b270 100644 --- a/docker/devbox-bundled/Makefile +++ b/docker/devbox-bundled/Makefile @@ -74,6 +74,7 @@ build-gpu: build # 30001 - DB # 30002 - RustFS # 30080 - Flyte Proxy +# 30081 - Flyte Apps .PHONY: start start: FLYTE_DEVBOX_IMAGE := flyte-devbox:latest start: FLYTE_DEV := False @@ -93,6 +94,7 @@ start: --publish "30001:5432" \ --publish "30002:30002" \ --publish "30080:30080" \ + --publish "30081:30081" \ $(FLYTE_DEVBOX_IMAGE); \ fi @echo "Waiting for kubeconfig..." @@ -114,6 +116,18 @@ kubeconfig: sed -i -e "/server:/ s/: .*/: https:\/\/127.0.0.1:$(shell docker port flyte-devbox | grep ^6443 | awk '{print $$3}' | awk -F: '{print $$2}')/" .kube/kubeconfig echo "export KUBECONFIG=$(PWD)/.kube/kubeconfig" +.PHONY: setup-knative +setup-knative: + kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.21.2/serving-crds.yaml + kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.21.2/serving-core.yaml + kubectl apply -f https://github.com/knative-extensions/net-kourier/releases/download/knative-v1.21.0/kourier.yaml + kubectl patch configmap/config-network -n knative-serving --type merge \ + --patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}' + kubectl patch configmap/config-network -n knative-serving --type merge \ + --patch '{"data":{"domain-template":"{{.Name}}-{{.Namespace}}.{{.Domain}}"}}' + kubectl patch configmap/config-domain -n knative-serving --type merge \ + --patch '{"data":{"local.flyte.app":""}}' + .PHONY: stop stop: docker stop --time 5 flyte-devbox diff --git a/docker/devbox-bundled/manifests/complete.yaml b/docker/devbox-bundled/manifests/complete.yaml index f8f9c31b31a..262ad487739 100644 --- a/docker/devbox-bundled/manifests/complete.yaml +++ b/docker/devbox-bundled/manifests/complete.yaml @@ -536,6 +536,28 @@ data: auth_type: accesskey container: flyte-data 
100-inline-config.yaml: | + manager: + internalApps: + baseDomain: local.flyte.app + defaultEnvVars: + - name: FLYTE_AWS_ENDPOINT + value: http://rustfs.flyte:9000 + - name: FLYTE_AWS_ACCESS_KEY_ID + value: rustfs + - name: FLYTE_AWS_SECRET_ACCESS_KEY + value: rustfsstorage + - name: _U_EP_OVERRIDE + value: flyte-binary-http.flyte:8090 + - name: _U_INSECURE + value: "true" + - name: _U_USE_ACTIONS + value: "1" + enabled: true + ingressAppsDomain: localhost + ingressAppsPort: 30081 + ingressEnabled: true + ingressEntryPoint: apps + scheme: http plugins: k8s: default-env-vars: @@ -913,7 +935,7 @@ spec: template: metadata: annotations: - checksum/configuration: 646d21c3d7efb87ed1de45515bf092b575b364810dd9264a2d9b2e7b26ddbbcb + checksum/configuration: 709f0513c6d5a65b47f3e9d42696c92cecfdc3073acac33c7ad32340a645bef4 checksum/configuration-secret: e70194084619f4a1d4017093aac6367047167107fd0222513a32a61734629cac labels: app.kubernetes.io/component: flyte-binary @@ -1144,8 +1166,21 @@ spec: transport: respondingTimeouts: readTimeout: 0 + apps: + port: 30081 + nodePort: 30081 + expose: + default: true + transport: + respondingTimeouts: + readTimeout: 0 websecure: expose: false + providers: + kubernetesCRD: + allowExternalNameServices: true + kubernetesIngress: + allowExternalNameServices: true --- apiVersion: networking.k8s.io/v1 kind: Ingress diff --git a/docker/devbox-bundled/manifests/dev.yaml b/docker/devbox-bundled/manifests/dev.yaml index f6d4bda06ea..41b78ad5640 100644 --- a/docker/devbox-bundled/manifests/dev.yaml +++ b/docker/devbox-bundled/manifests/dev.yaml @@ -758,8 +758,21 @@ spec: transport: respondingTimeouts: readTimeout: 0 + apps: + port: 30081 + nodePort: 30081 + expose: + default: true + transport: + respondingTimeouts: + readTimeout: 0 websecure: expose: false + providers: + kubernetesCRD: + allowExternalNameServices: true + kubernetesIngress: + allowExternalNameServices: true --- apiVersion: networking.k8s.io/v1 kind: Ingress diff --git 
a/flytestdlib/app/app.go b/flytestdlib/app/app.go index df6b7b75f05..0dcdf88e25f 100644 --- a/flytestdlib/app/app.go +++ b/flytestdlib/app/app.go @@ -1,12 +1,15 @@ package app import ( + "compress/gzip" "context" "errors" "fmt" + "io" "net/http" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -111,6 +114,7 @@ func (a *App) serve(ctx context.Context) error { if sc.Middleware != nil { handler = sc.Middleware(handler) } + handler = requestGzipDecompressMiddleware(handler) addr := fmt.Sprintf("%s:%d", sc.Host, sc.Port) server = &http.Server{ @@ -193,6 +197,35 @@ func (a *App) serve(ctx context.Context) error { return shutdownErr } +// requestGzipDecompressMiddleware pre-decompresses request bodies that carry +// Content-Encoding: gzip before they reach the connect-rpc handler. +// +// Some HTTP clients (e.g. pyqwest used by the Python connectrpc SDK) compress +// the request body at the application level and set Content-Encoding: gzip, +// but also use chunked transfer encoding for larger payloads. Go's h2c +// framing delivers the body in chunks, so by the time connect-go calls +// gzip.NewReader on the raw body stream the first bytes it receives may be a +// chunk-size line rather than the gzip magic bytes, causing "gzip: invalid +// header". Pre-reading and fully decompressing the body here, before h2c +// framing is involved, sidesteps the problem entirely. 
+func requestGzipDecompressMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") { + gr, err := gzip.NewReader(r.Body) + if err == nil { + r2 := r.Clone(r.Context()) + r2.Body = struct { + io.Reader + io.Closer + }{gr, r.Body} + r2.Header.Del("Content-Encoding") + r = r2 + } + } + next.ServeHTTP(w, r) + }) +} + func initConfig(cmd *cobra.Command) error { configAccessor = viper.NewAccessor(config.Options{ SearchPaths: []string{cfgFile, ".", "/etc/flyte/config"}, diff --git a/go.mod b/go.mod index acc2fdce874..d08b80b45f6 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,6 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 github.com/hashicorp/golang-lru v1.0.2 - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.7.6 diff --git a/go.sum b/go.sum index d2ef7ade542..70757065def 100644 --- a/go.sum +++ b/go.sum @@ -366,8 +366,6 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git 
a/manager/cmd/main.go b/manager/cmd/main.go index d4c7bdbba99..75f887e8e5e 100644 --- a/manager/cmd/main.go +++ b/manager/cmd/main.go @@ -6,8 +6,10 @@ import ( "net/http" "os" + flyteapp "github.com/flyteorg/flyte/v2/app" "github.com/flyteorg/flyte/v2/actions" "github.com/flyteorg/flyte/v2/flytestdlib/app" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" "github.com/flyteorg/flyte/v2/cache_service" "github.com/flyteorg/flyte/v2/dataproxy" "github.com/flyteorg/flyte/v2/events" @@ -50,6 +52,11 @@ func setup(ctx context.Context, sc *app.SetupContext) error { } sc.DB = db + // Register Knative types into the shared scheme so the K8s client can manage KServices. + if err := servingv1.AddToScheme(executor.Scheme()); err != nil { + return fmt.Errorf("failed to register Knative scheme: %w", err) + } + // Initialize Kubernetes client k8sClient, k8sConfig, err := app.InitKubernetesClient(ctx, app.K8sConfig{ KubeConfig: cfg.Kubernetes.KubeConfig, @@ -82,6 +89,13 @@ func setup(ctx context.Context, sc *app.SetupContext) error { if err := runs.Setup(ctx, sc); err != nil { return err } + // InternalAppService must be mounted before AppService so the proxy can reach it. 
+ if err := flyteapp.SetupInternal(ctx, sc, &cfg.InternalApps); err != nil { + return err + } + if err := flyteapp.Setup(ctx, sc, &cfg.Apps); err != nil { + return err + } if err := dataproxy.Setup(ctx, sc); err != nil { return err } diff --git a/manager/config.yaml b/manager/config.yaml index 9cd43c3140a..21488787237 100644 --- a/manager/config.yaml +++ b/manager/config.yaml @@ -13,6 +13,33 @@ manager: namespace: "flyte" kubeconfig: "~/.kube/config" + apps: + cacheTtl: 30s + + internalApps: + enabled: true + baseDomain: "local.flyte.app" + scheme: "http" + defaultRequestTimeout: 300s + maxRequestTimeout: 3600s + ingressEnabled: true + ingressEntryPoint: "apps" + ingressAppsDomain: "localhost" + ingressAppsPort: 30081 + defaultEnvVars: + - name: FLYTE_AWS_ENDPOINT + value: "http://rustfs.flyte.svc.cluster.local:9000" + - name: FLYTE_AWS_ACCESS_KEY_ID + value: "rustfs" + - name: FLYTE_AWS_SECRET_ACCESS_KEY + value: "rustfsstorage" + - name: _U_EP_OVERRIDE + value: "flyte-binary-http.flyte.svc.cluster.local:8090" + - name: _U_INSECURE + value: "true" + - name: _U_USE_ACTIONS + value: "1" + # Webhook configuration # localCert: true writes the generated TLS cert to disk so the webhook server can read it. 
@@ -64,7 +91,7 @@ plugins: - FLYTE_AWS_ENDPOINT: "http://rustfs.flyte.svc.cluster.local:9000" - FLYTE_AWS_ACCESS_KEY_ID: "rustfs" - FLYTE_AWS_SECRET_ACCESS_KEY: "rustfsstorage" - - _U_EP_OVERRIDE: "flyte-devbox-local.flyte.svc.cluster.local:8090" + - _U_EP_OVERRIDE: "flyte-binary-http.flyte.svc.cluster.local:8090" - _U_INSECURE: "true" - _U_USE_ACTIONS: "1" diff --git a/manager/config/config.go b/manager/config/config.go index 9beded5ab8b..1a0c07addb7 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -1,6 +1,11 @@ package config -import "github.com/flyteorg/flyte/v2/flytestdlib/config" +import ( + "time" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + "github.com/flyteorg/flyte/v2/flytestdlib/config" +) const configSectionKey = "manager" @@ -16,6 +21,12 @@ type Config struct { // Kubernetes configuration Kubernetes KubernetesConfig `json:"kubernetes"` + + // Apps is the control plane AppService configuration. + Apps appconfig.AppConfig `json:"apps"` + + // InternalApps is the data plane InternalAppService configuration. + InternalApps appconfig.InternalAppConfig `json:"internalApps"` } // ServerConfig holds HTTP server configuration @@ -53,6 +64,14 @@ var defaultConfig = &Config{ Burst: 2000, Timeout: "30s", }, + Apps: appconfig.AppConfig{ + CacheTTL: 30 * time.Second, + }, + InternalApps: appconfig.InternalAppConfig{ + Enabled: false, + DefaultRequestTimeout: 300 * time.Second, + MaxRequestTimeout: 3600 * time.Second, + }, } var configSection = config.MustRegisterSection(configSectionKey, defaultConfig)