diff --git a/app/config/config.go b/app/config/config.go index 99feec4549..cc2085c4ee 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -7,6 +7,10 @@ type AppConfig struct { // Enabled controls whether the app deployment controller is started. Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + // BaseDomain is the base domain used to generate public URLs for apps. + // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". + BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` + // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index c12b9b1278..ce24654367 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -52,8 +52,11 @@ type AppK8sClientInterface interface { // Returns a not-found error (checkable with k8serrors.IsNotFound) if the KService does not exist. GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) - // List returns all apps (spec + live status) for the given project/domain scope. - List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) + // List returns apps for the given project/domain scope with optional pagination. + // If appName is non-empty, only the app with that name is returned. + // limit=0 means no limit. token is the K8s continue token from a previous call. + // Returns the apps, the continue token for the next page (empty if last page), and any error. + List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) // Delete removes the KService CRD entirely. The app must be re-created from scratch. // Use Stop to scale to zero while preserving the KService. @@ -66,8 +69,9 @@ type AppK8sClientInterface interface { DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error // Watch returns a channel of WatchResponse events for KServices matching the - // given project/domain scope. The channel is closed when ctx is cancelled. - Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) + // given project/domain scope. If appName is non-empty, only events for that + // specific app are returned. The channel is closed when ctx is cancelled. + Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) } // AppK8sClient implements AppK8sClientInterface using controller-runtime. @@ -169,13 +173,20 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e } // Watch returns a channel of WatchResponse events for KServices in the given -// project/domain scope. The channel is closed when ctx is cancelled or the +// project/domain scope. If appName is non-empty, only events for that specific +// app are returned. The channel is closed when ctx is cancelled or the // underlying watch terminates. -func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) { +func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) { ns := appNamespace(project, domain) + + labels := map[string]string{labelAppManaged: "true"} + if appName != "" { + labels[labelAppName] = strings.ToLower(appName) + } + watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, client.InNamespace(ns), - client.MatchingLabels{labelAppManaged: "true"}, + client.MatchingLabels(labels), ) if err != nil { return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err) @@ -258,16 +269,28 @@ func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier return c.kserviceToStatus(ctx, ksvc), nil } -// List returns all apps for the given project/domain by listing KServices in the -// project/domain namespace. -func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) { +// List returns apps for the given project/domain scope with optional pagination. +func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) { ns := appNamespace(project, domain) - list := &servingv1.ServiceList{} - if err := c.k8sClient.List(ctx, list, + + matchLabels := client.MatchingLabels{labelAppManaged: "true"} + if appName != "" { + matchLabels[labelAppName] = strings.ToLower(appName) + } + listOpts := []client.ListOption{ client.InNamespace(ns), - client.MatchingLabels{labelAppManaged: "true"}, - ); err != nil { - return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) + matchLabels, + } + if limit > 0 { + listOpts = append(listOpts, client.Limit(int64(limit))) + } + if token != "" { + listOpts = append(listOpts, client.Continue(token)) + } + + list := &servingv1.ServiceList{} + if err := c.k8sClient.List(ctx, list, listOpts...); err != nil { + return nil, "", fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err) } apps := make([]*flyteapp.App, 0, len(list.Items)) @@ -279,7 +302,7 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*fly } apps = append(apps, a) } - return apps, nil + return apps, list.Continue, nil } // --- Helpers --- @@ -302,7 +325,7 @@ func kserviceName(id *flyteapp.Identifier) string { // specSHA computes a SHA256 digest of the serialized App Spec proto. func specSHA(spec *flyteapp.Spec) (string, error) { - b, err := proto.MarshalOptions{Deterministic: true}.Marshal(spec) + b, err := proto.Marshal(spec) if err != nil { return "", fmt.Errorf("failed to marshal spec: %w", err) } @@ -358,10 +381,6 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err Template: servingv1.RevisionTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: templateAnnotations, - Labels: map[string]string{ - labelAppManaged: "true", - labelAppName: appID.GetName(), - }, }, Spec: servingv1.RevisionSpec{ PodSpec: podSpec, @@ -467,8 +486,8 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE case ksvc.IsFailed(): phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED - if condition := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); condition != nil { - message = condition.Message + if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { + message = c.Message } case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 706e21c0d1..7fa30910c2 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -283,13 +283,51 @@ func TestList(t *testing.T) { }, } - apps, err := c.List(context.Background(), "proj", "dev") + apps, nextToken, err := c.List(context.Background(), "proj", "dev", "", 0, "") require.NoError(t, err) + assert.Empty(t, nextToken) require.Len(t, apps, 1) assert.Equal(t, "proj", apps[0].Metadata.Id.Project) assert.Equal(t, "app1", apps[0].Metadata.Id.Name) } +func TestList_ByAppName(t *testing.T) { + s := testScheme(t) + ksvc1 := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "proj-dev", + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app1", + }, + Annotations: map[string]string{annotationAppID: "proj/dev/app1"}, + }, + } + ksvc2 := &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app2", + Namespace: "proj-dev", + Labels: map[string]string{ + labelAppManaged: "true", + labelProject: "proj", + labelDomain: "dev", + labelAppName: "app2", + }, + Annotations: map[string]string{annotationAppID: "proj/dev/app2"}, + }, + } + fc := fake.NewClientBuilder().WithScheme(s).WithObjects(ksvc1, ksvc2).Build() + c := &AppK8sClient{k8sClient: fc, cfg: &config.AppConfig{}} + + apps, _, err := c.List(context.Background(), "proj", "dev", "app1", 0, "") + require.NoError(t, err) + require.Len(t, apps, 1) + assert.Equal(t, "app1", apps[0].Metadata.Id.Name) +} + func TestGetReplicas(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go new file mode 100644 index 0000000000..3d75198a40 --- /dev/null +++ b/app/internal/service/internal_app_service.go @@ -0,0 +1,252 @@ +package service + +import ( + "context" + "fmt" + "strings" + + "connectrpc.com/connect" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" +) + +// InternalAppService is the data plane implementation of the AppService. +// It has direct K8s access via AppK8sClientInterface and no database dependency — +// all app state lives in KService CRDs. +type InternalAppService struct { + appconnect.UnimplementedAppServiceHandler + k8s appk8s.AppK8sClientInterface + cfg *appconfig.AppConfig +} + +// NewInternalAppService creates a new InternalAppService. +func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.AppConfig) *InternalAppService { + return &InternalAppService{k8s: k8s, cfg: cfg} +} + +// Ensure InternalAppService satisfies the generated handler interface. +var _ appconnect.AppServiceHandler = (*InternalAppService)(nil) + +// Create deploys a new app as a KService CRD. +func (s *InternalAppService) Create( + ctx context.Context, + req *connect.Request[flyteapp.CreateRequest], +) (*connect.Response[flyteapp.CreateResponse], error) { + app := req.Msg.GetApp() + if app.GetMetadata().GetId() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app identifier is required")) + } + if app.GetSpec() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app spec is required")) + } + if app.GetSpec().GetAppPayload() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app spec must include a container or pod payload")) + } + + if err := s.k8s.Deploy(ctx, app); err != nil { + logger.Errorf(ctx, "Failed to deploy app %s: %v", app.GetMetadata().GetId().GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + app.Status = &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + { + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, + LastTransitionTime: timestamppb.Now(), + }, + }, + Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg.BaseDomain), + } + + return connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil +} + +// publicIngress builds the deterministic public URL for an app. +// Pattern: "https://{name}-{project}-{domain}.{base_domain}" +// Returns nil if BaseDomain is not configured. +func publicIngress(id *flyteapp.Identifier, baseDomain string) *flyteapp.Ingress { + if baseDomain == "" { + return nil + } + host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", + id.GetName(), id.GetProject(), id.GetDomain(), baseDomain)) + return &flyteapp.Ingress{ + PublicUrl: "https://" + host, + } +} + +// Get retrieves an app and its live status from the KService CRD. +// Note: App.Spec is not populated — status and ingress URL are the authoritative fields. +func (s *InternalAppService) Get( + ctx context.Context, + req *connect.Request[flyteapp.GetRequest], +) (*connect.Response[flyteapp.GetResponse], error) { + appID, ok := req.Msg.GetIdentifier().(*flyteapp.GetRequest_AppId) + if !ok || appID.AppId == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app_id is required")) + } + + status, err := s.k8s.GetStatus(ctx, appID.AppId) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil, connect.NewError(connect.CodeNotFound, err) + } + return nil, connect.NewError(connect.CodeInternal, err) + } + + return connect.NewResponse(&flyteapp.GetResponse{ + App: &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: appID.AppId}, + Status: status, + }, + }), nil +} + +// Update modifies an app's spec or desired state. +// When Spec.DesiredState is STOPPED, the app is scaled to zero (KService kept). +// When Spec.DesiredState is STARTED or ACTIVE, the app is redeployed/resumed. +// Otherwise the spec update is applied and the app is redeployed. +func (s *InternalAppService) Update( + ctx context.Context, + req *connect.Request[flyteapp.UpdateRequest], +) (*connect.Response[flyteapp.UpdateResponse], error) { + app := req.Msg.GetApp() + if app.GetMetadata().GetId() == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app identifier is required")) + } + + appID := app.GetMetadata().GetId() + + switch app.GetSpec().GetDesiredState() { + case flyteapp.Spec_DESIRED_STATE_STOPPED: + if err := s.k8s.Stop(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to stop app %s: %v", appID.GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + default: + // UNSPECIFIED, STARTED, ACTIVE — deploy/redeploy the spec. + if err := s.k8s.Deploy(ctx, app); err != nil { + logger.Errorf(ctx, "Failed to update app %s: %v", appID.GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + } + + status, err := s.k8s.GetStatus(ctx, appID) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + app.Status = status + + return connect.NewResponse(&flyteapp.UpdateResponse{App: app}), nil +} + +// Delete removes the KService CRD for the given app entirely. +func (s *InternalAppService) Delete( + ctx context.Context, + req *connect.Request[flyteapp.DeleteRequest], +) (*connect.Response[flyteapp.DeleteResponse], error) { + appID := req.Msg.GetAppId() + if appID == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app_id is required")) + } + + if err := s.k8s.Delete(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to delete app %s: %v", appID.GetName(), err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + return connect.NewResponse(&flyteapp.DeleteResponse{}), nil +} + +// List returns apps for the requested scope with pagination. +func (s *InternalAppService) List( + ctx context.Context, + req *connect.Request[flyteapp.ListRequest], +) (*connect.Response[flyteapp.ListResponse], error) { + var project, domain string + + switch f := req.Msg.GetFilterBy().(type) { + case *flyteapp.ListRequest_Project: + project = f.Project.GetName() + domain = f.Project.GetDomain() + case *flyteapp.ListRequest_Org, *flyteapp.ListRequest_ClusterId: + return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("org and cluster_id filters are not supported by the data plane")) + } + + var limit uint32 + var token string + if r := req.Msg.GetRequest(); r != nil { + limit = r.GetLimit() + token = r.GetToken() + } + + apps, nextToken, err := s.k8s.List(ctx, project, domain, "", limit, token) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + + return connect.NewResponse(&flyteapp.ListResponse{Apps: apps, Token: nextToken}), nil +} + +// Watch streams live KService events to the client. +// It first sends the current state as CreateEvents (initial snapshot), then streams changes. +func (s *InternalAppService) Watch( + ctx context.Context, + req *connect.Request[flyteapp.WatchRequest], + stream *connect.ServerStream[flyteapp.WatchResponse], +) error { + var project, domain, appName string + + switch t := req.Msg.GetTarget().(type) { + case *flyteapp.WatchRequest_Project: + project = t.Project.GetName() + domain = t.Project.GetDomain() + case *flyteapp.WatchRequest_AppId: + project = t.AppId.GetProject() + domain = t.AppId.GetDomain() + appName = t.AppId.GetName() + case *flyteapp.WatchRequest_Org, *flyteapp.WatchRequest_ClusterId: + return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("org and cluster_id watch targets are not supported by the data plane")) + } + + // Start watch before listing so no events are lost between the two calls. + ch, err := s.k8s.Watch(ctx, project, domain, appName) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + + // Send initial snapshot so the client has current state before streaming changes. + snapshot, _, err := s.k8s.List(ctx, project, domain, appName, 0, "") + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + for _, app := range snapshot { + if err := stream.Send(&flyteapp.WatchResponse{ + Event: &flyteapp.WatchResponse_CreateEvent{ + CreateEvent: &flyteapp.CreateEvent{App: app}, + }, + }); err != nil { + return err + } + } + + for { + select { + case <-ctx.Done(): + return nil + case event, ok := <-ch: + if !ok { + return nil + } + if err := stream.Send(event); err != nil { + return err + } + } + } +} diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go new file mode 100644 index 0000000000..08f7159e8e --- /dev/null +++ b/app/internal/service/internal_app_service_test.go @@ -0,0 +1,373 @@ +package service + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" + flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common" +) + +// mockAppK8sClient is a testify mock for AppK8sClientInterface. +type mockAppK8sClient struct { + mock.Mock +} + +func (m *mockAppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error { + return m.Called(ctx, app).Error(0) +} + +func (m *mockAppK8sClient) Stop(ctx context.Context, appID *flyteapp.Identifier) error { + return m.Called(ctx, appID).Error(0) +} + +func (m *mockAppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) error { + return m.Called(ctx, appID).Error(0) +} + +func (m *mockAppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error) { + args := m.Called(ctx, appID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*flyteapp.Status), args.Error(1) +} + +func (m *mockAppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) { + args := m.Called(ctx, project, domain, appName, limit, token) + if args.Get(0) == nil { + return nil, "", args.Error(2) + } + return args.Get(0).([]*flyteapp.App), args.String(1), args.Error(2) +} + +func (m *mockAppK8sClient) GetReplicas(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Replica, error) { + args := m.Called(ctx, appID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]*flyteapp.Replica), args.Error(1) +} + +func (m *mockAppK8sClient) DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error { + return m.Called(ctx, replicaID).Error(0) +} + +func (m *mockAppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) { + args := m.Called(ctx, project, domain, appName) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(<-chan *flyteapp.WatchResponse), args.Error(1) +} + +// --- helpers --- + +func testCfg() *appconfig.AppConfig { + return &appconfig.AppConfig{ + Enabled: true, + BaseDomain: "apps.example.com", + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + } +} + +func testAppID() *flyteapp.Identifier { + return &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} +} + +func testApp() *flyteapp.App { + return &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: testAppID()}, + Spec: &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Container{ + Container: &flytecoreapp.Container{Image: "nginx:latest"}, + }, + }, + } +} + +func testStatus(phase flyteapp.Status_DeploymentStatus) *flyteapp.Status { + return &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + {DeploymentStatus: phase}, + }, + } +} + +func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceClient { + svc := NewInternalAppService(k8s, testCfg()) + path, handler := appconnect.NewAppServiceHandler(svc) + mux := http.NewServeMux() + mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) + server := httptest.NewServer(mux) + t.Cleanup(server.Close) + return appconnect.NewAppServiceClient(http.DefaultClient, server.URL+"/internal") +} + +// --- Create --- + +func TestCreate_Success(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + + resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_PENDING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + assert.Equal(t, "https://myapp-proj-dev.apps.example.com", resp.Msg.App.Status.Ingress.PublicUrl) + k8s.AssertExpectations(t) +} + +func TestCreate_MissingID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ + App: &flyteapp.App{Spec: testApp().Spec}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +func TestCreate_MissingSpec(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ + App: &flyteapp.App{Metadata: &flyteapp.Meta{Id: testAppID()}}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +func TestCreate_MissingPayload(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ + App: &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: testAppID()}, + Spec: &flyteapp.Spec{}, + }, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { + k8s := &mockAppK8sClient{} + cfg := testCfg() + cfg.BaseDomain = "" + svc := NewInternalAppService(k8s, cfg) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + + resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) + require.NoError(t, err) + assert.Nil(t, resp.Msg.App.Status.Ingress) + k8s.AssertExpectations(t) +} + +// --- Get --- + +func TestGet_Success(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + appID := testAppID() + k8s.On("GetStatus", mock.Anything, appID).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE), nil) + + resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + k8s.AssertExpectations(t) +} + +func TestGet_NotFound(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + appID := testAppID() + notFoundErr := fmt.Errorf("KService myapp not found: %w", kerrors.NewNotFound(schema.GroupResource{}, "myapp")) + k8s.On("GetStatus", mock.Anything, appID).Return(nil, notFoundErr) + + _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) + k8s.AssertExpectations(t) +} + +func TestGet_MissingAppID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{})) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +// --- Update --- + +func TestUpdate_Deploy(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + app := testApp() + k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("GetStatus", mock.Anything, app.Metadata.Id).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING), nil) + + resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + k8s.AssertExpectations(t) +} + +func TestUpdate_Stop(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + app := testApp() + app.Spec.DesiredState = flyteapp.Spec_DESIRED_STATE_STOPPED + k8s.On("Stop", mock.Anything, app.Metadata.Id).Return(nil) + k8s.On("GetStatus", mock.Anything, app.Metadata.Id).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED), nil) + + resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + k8s.AssertExpectations(t) +} + +func TestUpdate_MissingID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{ + App: &flyteapp.App{}, + })) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +// --- Delete --- + +func TestDelete_Success(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + appID := testAppID() + k8s.On("Delete", mock.Anything, appID).Return(nil) + + _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{AppId: appID})) + require.NoError(t, err) + k8s.AssertExpectations(t) +} + +func TestDelete_MissingID(t *testing.T) { + svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + + _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{})) + require.Error(t, err) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) +} + +// --- List --- + +func TestList_ByProject(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + apps := []*flyteapp.App{testApp()} + k8s.On("List", mock.Anything, "proj", "dev", "", uint32(10), "tok").Return(apps, "nexttok", nil) + + resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{ + FilterBy: &flyteapp.ListRequest_Project{ + Project: &common.ProjectIdentifier{Name: "proj", Domain: "dev"}, + }, + Request: &common.ListRequest{Limit: 10, Token: "tok"}, + })) + require.NoError(t, err) + assert.Len(t, resp.Msg.Apps, 1) + assert.Equal(t, "nexttok", resp.Msg.Token) + k8s.AssertExpectations(t) +} + +func TestList_NoFilter(t *testing.T) { + k8s := &mockAppK8sClient{} + svc := NewInternalAppService(k8s, testCfg()) + + k8s.On("List", mock.Anything, "", "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) + + resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{})) + require.NoError(t, err) + assert.Empty(t, resp.Msg.Apps) + k8s.AssertExpectations(t) +} + +// --- Watch --- + +func TestWatch_InitialSnapshot(t *testing.T) { + k8s := &mockAppK8sClient{} + + apps := []*flyteapp.App{testApp()} + ch := make(chan *flyteapp.WatchResponse) + close(ch) + + k8s.On("Watch", mock.Anything, "proj", "dev", "").Return((<-chan *flyteapp.WatchResponse)(ch), nil) + k8s.On("List", mock.Anything, "proj", "dev", "", uint32(0), "").Return(apps, "", nil) + + client := newTestClient(t, k8s) + stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ + Target: &flyteapp.WatchRequest_Project{ + Project: &common.ProjectIdentifier{Name: "proj", Domain: "dev"}, + }, + })) + require.NoError(t, err) + + // Expect one CreateEvent from the initial snapshot. + require.True(t, stream.Receive()) + resp := stream.Msg() + ce, ok := resp.Event.(*flyteapp.WatchResponse_CreateEvent) + require.True(t, ok) + assert.Equal(t, "myapp", ce.CreateEvent.App.Metadata.Id.Name) + + // Channel is closed — stream should end. + assert.False(t, stream.Receive()) + k8s.AssertExpectations(t) +} + +func TestWatch_AppIDTarget(t *testing.T) { + k8s := &mockAppK8sClient{} + + ch := make(chan *flyteapp.WatchResponse) + close(ch) + + k8s.On("Watch", mock.Anything, "proj", "dev", "myapp").Return((<-chan *flyteapp.WatchResponse)(ch), nil) + k8s.On("List", mock.Anything, "proj", "dev", "myapp", uint32(0), "").Return([]*flyteapp.App{}, "", nil) + + client := newTestClient(t, k8s) + stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{ + Target: &flyteapp.WatchRequest_AppId{AppId: testAppID()}, + })) + require.NoError(t, err) + + // No snapshot apps, channel closed — stream ends immediately. + assert.False(t, stream.Receive()) + k8s.AssertExpectations(t) +} diff --git a/app/internal/setup.go b/app/internal/setup.go new file mode 100644 index 0000000000..619bcb91d2 --- /dev/null +++ b/app/internal/setup.go @@ -0,0 +1,38 @@ +package internal + +import ( + "context" + "fmt" + "net/http" + + stdlibapp "github.com/flyteorg/flyte/v2/flytestdlib/app" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/app/internal/service" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" +) + +// Setup registers the InternalAppService handler on the SetupContext mux. +// It is mounted at /internal to avoid collision with the control plane +// AppService, which shares the same proto service definition. +func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.AppConfig) error { + if !cfg.Enabled { + logger.Infof(ctx, "InternalAppService disabled (apps.enabled=false), skipping setup") + return nil + } + + if err := stdlibapp.InitAppScheme(); err != nil { + return fmt.Errorf("internalapp: failed to register Knative scheme: %w", err) + } + + appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg) + internalAppSvc := service.NewInternalAppService(appK8sClient, cfg) + + path, handler := appconnect.NewAppServiceHandler(internalAppSvc) + sc.Mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) + logger.Infof(ctx, "Mounted InternalAppService at /internal%s", path) + + return nil +}