diff --git a/app/config/config.go b/app/config/config.go index cc2085c4ee..6532f30b5c 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -2,18 +2,21 @@ package config import "time" -// AppConfig holds configuration for the App deployment controller. +// AppConfig holds configuration for the control plane AppService. type AppConfig struct { - // Enabled controls whether the app deployment controller is started. - Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + // InternalAppServiceURL is the base URL of the InternalAppService (data plane). + // In unified mode this is overridden by the shared mux BaseURL. + InternalAppServiceURL string `json:"internalAppServiceUrl" pflag:",URL of the internal app service"` - // BaseDomain is the base domain used to generate public URLs for apps. - // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". - BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` - - // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. - DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` + // CacheTTL is the TTL for the in-memory app status cache. + // Defaults to 30s. Set to 0 to disable caching. + CacheTTL time.Duration `json:"cacheTtl" pflag:",TTL for app status cache"` +} - // MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s). - MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"` +// DefaultAppConfig returns the default control plane AppConfig. +func DefaultAppConfig() *AppConfig { + return &AppConfig{ + InternalAppServiceURL: "http://localhost:8091", + CacheTTL: 30 * time.Second, + } } diff --git a/app/internal/config/config.go b/app/internal/config/config.go new file mode 100644 index 0000000000..17e9e01dc2 --- /dev/null +++ b/app/internal/config/config.go @@ -0,0 +1,19 @@ +package config + +import "time" + +// InternalAppConfig holds configuration for the data plane app deployment controller. +type InternalAppConfig struct { + // Enabled controls whether the app deployment controller is started. + Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` + + // BaseDomain is the base domain used to generate public URLs for apps. + // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". + BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` + + // DefaultRequestTimeout is the request timeout applied to apps that don't specify one. + DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"` + + // MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s). + MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"` +} diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index ce24654367..ad3eeca95e 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -18,7 +18,7 @@ import ( ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/flyteorg/flyte/v2/app/config" + "github.com/flyteorg/flyte/v2/app/internal/config" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" ) @@ -78,11 +78,11 @@ type AppK8sClientInterface interface { type AppK8sClient struct { k8sClient client.WithWatch cache ctrlcache.Cache - cfg *config.AppConfig + cfg *config.InternalAppConfig } // NewAppK8sClient creates a new AppK8sClient. -func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.AppConfig) *AppK8sClient { +func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig) *AppK8sClient { return &AppK8sClient{ k8sClient: k8sClient, cache: cache, @@ -423,7 +423,7 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { } // buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template. -func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.AppConfig) map[string]string { +func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.InternalAppConfig) map[string]string { annotations := map[string]string{} autoscaling := spec.GetAutoscaling() if autoscaling == nil { diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 7fa30910c2..05e9f9d7d4 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -18,7 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/flyteorg/flyte/v2/app/config" + "github.com/flyteorg/flyte/v2/app/internal/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) @@ -55,7 +55,7 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { Build() return &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{ + cfg: &config.InternalAppConfig{ DefaultRequestTimeout: 5 * time.Minute, MaxRequestTimeout: time.Hour, }, @@ -228,7 +228,7 @@ func TestGetStatus_CurrentReplicas(t *testing.T) { Build() c := &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{}, + cfg: &config.InternalAppConfig{}, } id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} @@ -277,7 +277,7 @@ func TestList(t *testing.T) { Build() c := &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{ + cfg: &config.InternalAppConfig{ DefaultRequestTimeout: 5 * time.Minute, MaxRequestTimeout: time.Hour, }, @@ -291,43 +291,6 @@ func TestList(t *testing.T) { assert.Equal(t, "app1", apps[0].Metadata.Id.Name) } -func TestList_ByAppName(t *testing.T) { - s := testScheme(t) - ksvc1 := &servingv1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "app1", - Namespace: "proj-dev", - Labels: map[string]string{ - labelAppManaged: "true", - labelProject: "proj", - labelDomain: "dev", - labelAppName: "app1", - }, - Annotations: map[string]string{annotationAppID: "proj/dev/app1"}, - }, - } - ksvc2 := &servingv1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "app2", - Namespace: "proj-dev", - Labels: map[string]string{ - labelAppManaged: "true", - labelProject: "proj", - labelDomain: "dev", - labelAppName: "app2", - }, - Annotations: map[string]string{annotationAppID: "proj/dev/app2"}, - }, - } - fc := fake.NewClientBuilder().WithScheme(s).WithObjects(ksvc1, ksvc2).Build() - c := &AppK8sClient{k8sClient: fc, cfg: &config.AppConfig{}} - - apps, _, err := c.List(context.Background(), "proj", "dev", "app1", 0, "") - require.NoError(t, err) - require.Len(t, apps, 1) - assert.Equal(t, "app1", apps[0].Metadata.Id.Name) -} - func TestGetReplicas(t *testing.T) { s := testScheme(t) pod := &corev1.Pod{ @@ -348,7 +311,7 @@ func TestGetReplicas(t *testing.T) { fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{}, + cfg: &config.InternalAppConfig{}, } id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} @@ -370,7 +333,7 @@ func TestDeleteReplica(t *testing.T) { fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build() c := &AppK8sClient{ k8sClient: fc, - cfg: &config.AppConfig{}, + cfg: &config.InternalAppConfig{}, } replicaID := &flyteapp.ReplicaIdentifier{ diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index 3d75198a40..dc847ec482 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -9,7 +9,7 @@ import ( timestamppb "google.golang.org/protobuf/types/known/timestamppb" k8serrors "k8s.io/apimachinery/pkg/api/errors" - appconfig "github.com/flyteorg/flyte/v2/app/config" + appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" @@ -22,11 +22,11 @@ import ( type InternalAppService struct { appconnect.UnimplementedAppServiceHandler k8s appk8s.AppK8sClientInterface - cfg *appconfig.AppConfig + cfg *appconfig.InternalAppConfig } // NewInternalAppService creates a new InternalAppService. -func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.AppConfig) *InternalAppService { +func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.InternalAppConfig) *InternalAppService { return &InternalAppService{k8s: k8s, cfg: cfg} } diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index 08f7159e8e..c47fa16c9b 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -15,7 +15,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" - appconfig "github.com/flyteorg/flyte/v2/app/config" + appconfig "github.com/flyteorg/flyte/v2/app/internal/config" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" @@ -77,8 +77,8 @@ func (m *mockAppK8sClient) Watch(ctx context.Context, project, domain, appName s // --- helpers --- -func testCfg() *appconfig.AppConfig { - return &appconfig.AppConfig{ +func testCfg() *appconfig.InternalAppConfig { + return &appconfig.InternalAppConfig{ Enabled: true, BaseDomain: "apps.example.com", DefaultRequestTimeout: 5 * time.Minute, diff --git a/app/internal/setup.go b/app/internal/setup.go index 619bcb91d2..7c7ec7a0c5 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -8,7 +8,7 @@ import ( stdlibapp "github.com/flyteorg/flyte/v2/flytestdlib/app" "github.com/flyteorg/flyte/v2/flytestdlib/logger" - appconfig "github.com/flyteorg/flyte/v2/app/config" + appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" "github.com/flyteorg/flyte/v2/app/internal/service" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" @@ -17,7 +17,7 @@ import ( // Setup registers the InternalAppService handler on the SetupContext mux. // It is mounted at /internal to avoid collision with the control plane // AppService, which shares the same proto service definition. -func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.AppConfig) error { +func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.InternalAppConfig) error { if !cfg.Enabled { logger.Infof(ctx, "InternalAppService disabled (apps.enabled=false), skipping setup") return nil diff --git a/app/service/app_service.go b/app/service/app_service.go new file mode 100644 index 0000000000..55b7073811 --- /dev/null +++ b/app/service/app_service.go @@ -0,0 +1,141 @@ +package service + +import ( + "context" + "fmt" + "time" + + "connectrpc.com/connect" + "github.com/hashicorp/golang-lru/v2/expirable" + + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" +) + +// AppService is the control plane implementation of AppServiceHandler. +// It proxies all RPCs to InternalAppService (data plane) and maintains a +// per-instance TTL cache to reduce cross-plane RPC calls on Get. +type AppService struct { + appconnect.UnimplementedAppServiceHandler + internalClient appconnect.AppServiceClient + // cache is nil when cacheTTL=0 (caching disabled). + cache *expirable.LRU[string, *flyteapp.App] +} + +// NewAppService creates a new AppService. +// cacheTTL=0 disables caching (every Get calls InternalAppService). +func NewAppService(internalClient appconnect.AppServiceClient, cacheTTL time.Duration) *AppService { + var cache *expirable.LRU[string, *flyteapp.App] + if cacheTTL > 0 { + cache = expirable.NewLRU[string, *flyteapp.App](0, nil, cacheTTL) + } + return &AppService{ + internalClient: internalClient, + cache: cache, + } +} + +// Ensure AppService satisfies the generated handler interface. +var _ appconnect.AppServiceHandler = (*AppService)(nil) + +// Create forwards to InternalAppService and invalidates the cache entry. +func (s *AppService) Create( + ctx context.Context, + req *connect.Request[flyteapp.CreateRequest], +) (*connect.Response[flyteapp.CreateResponse], error) { + resp, err := s.internalClient.Create(ctx, req) + if err != nil { + return nil, err + } + if s.cache != nil { + s.cache.Remove(cacheKey(req.Msg.GetApp().GetMetadata().GetId())) + } + return resp, nil +} + +// Get returns the app, using the cache on hit and calling InternalAppService on miss. +func (s *AppService) Get( + ctx context.Context, + req *connect.Request[flyteapp.GetRequest], +) (*connect.Response[flyteapp.GetResponse], error) { + appID, ok := req.Msg.GetIdentifier().(*flyteapp.GetRequest_AppId) + if ok && appID.AppId != nil && s.cache != nil { + if app, hit := s.cache.Get(cacheKey(appID.AppId)); hit { + return connect.NewResponse(&flyteapp.GetResponse{App: app}), nil + } + } + + resp, err := s.internalClient.Get(ctx, req) + if err != nil { + return nil, err + } + if ok && appID.AppId != nil && s.cache != nil { + s.cache.Add(cacheKey(appID.AppId), resp.Msg.GetApp()) + } + return resp, nil +} + +// Update forwards to InternalAppService and invalidates the cache entry. +func (s *AppService) Update( + ctx context.Context, + req *connect.Request[flyteapp.UpdateRequest], +) (*connect.Response[flyteapp.UpdateResponse], error) { + resp, err := s.internalClient.Update(ctx, req) + if err != nil { + return nil, err + } + if s.cache != nil { + s.cache.Remove(cacheKey(req.Msg.GetApp().GetMetadata().GetId())) + } + return resp, nil +} + +// Delete forwards to InternalAppService and invalidates the cache entry. +func (s *AppService) Delete( + ctx context.Context, + req *connect.Request[flyteapp.DeleteRequest], +) (*connect.Response[flyteapp.DeleteResponse], error) { + resp, err := s.internalClient.Delete(ctx, req) + if err != nil { + return nil, err + } + if s.cache != nil { + s.cache.Remove(cacheKey(req.Msg.GetAppId())) + } + return resp, nil +} + +// List always forwards to InternalAppService — results vary by filter/pagination. +func (s *AppService) List( + ctx context.Context, + req *connect.Request[flyteapp.ListRequest], +) (*connect.Response[flyteapp.ListResponse], error) { + return s.internalClient.List(ctx, req) +} + +// Watch proxies the server-streaming Watch RPC to InternalAppService. +func (s *AppService) Watch( + ctx context.Context, + req *connect.Request[flyteapp.WatchRequest], + stream *connect.ServerStream[flyteapp.WatchResponse], +) error { + clientStream, err := s.internalClient.Watch(ctx, req) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + defer clientStream.Close() + for clientStream.Receive() { + if err := stream.Send(clientStream.Msg()); err != nil { + return err + } + } + return clientStream.Err() +} + +// cacheKey returns a stable string key for an app identifier. +func cacheKey(id *flyteapp.Identifier) string { + if id == nil { + return "" + } + return fmt.Sprintf("%s/%s/%s", id.GetProject(), id.GetDomain(), id.GetName()) +} diff --git a/app/service/app_service_test.go b/app/service/app_service_test.go new file mode 100644 index 0000000000..7b623b8ff8 --- /dev/null +++ b/app/service/app_service_test.go @@ -0,0 +1,288 @@ +package service + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" + flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" +) + +// mockInternalClient is a testify mock for appconnect.AppServiceClient. +type mockInternalClient struct { + mock.Mock +} + +func (m *mockInternalClient) Create(ctx context.Context, req *connect.Request[flyteapp.CreateRequest]) (*connect.Response[flyteapp.CreateResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.Response[flyteapp.CreateResponse]), args.Error(1) +} + +func (m *mockInternalClient) Get(ctx context.Context, req *connect.Request[flyteapp.GetRequest]) (*connect.Response[flyteapp.GetResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.Response[flyteapp.GetResponse]), args.Error(1) +} + +func (m *mockInternalClient) Update(ctx context.Context, req *connect.Request[flyteapp.UpdateRequest]) (*connect.Response[flyteapp.UpdateResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.Response[flyteapp.UpdateResponse]), args.Error(1) +} + +func (m *mockInternalClient) UpdateStatus(ctx context.Context, req *connect.Request[flyteapp.UpdateStatusRequest]) (*connect.Response[flyteapp.UpdateStatusResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, nil) +} + +func (m *mockInternalClient) Delete(ctx context.Context, req *connect.Request[flyteapp.DeleteRequest]) (*connect.Response[flyteapp.DeleteResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.Response[flyteapp.DeleteResponse]), args.Error(1) +} + +func (m *mockInternalClient) List(ctx context.Context, req *connect.Request[flyteapp.ListRequest]) (*connect.Response[flyteapp.ListResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.Response[flyteapp.ListResponse]), args.Error(1) +} + +func (m *mockInternalClient) Watch(ctx context.Context, req *connect.Request[flyteapp.WatchRequest]) (*connect.ServerStreamForClient[flyteapp.WatchResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.ServerStreamForClient[flyteapp.WatchResponse]), args.Error(1) +} + +func (m *mockInternalClient) Lease(ctx context.Context, req *connect.Request[flyteapp.LeaseRequest]) (*connect.ServerStreamForClient[flyteapp.LeaseResponse], error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*connect.ServerStreamForClient[flyteapp.LeaseResponse]), args.Error(1) +} + +// --- helpers --- + +func testAppID() *flyteapp.Identifier { + return &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} +} + +func testApp() *flyteapp.App { + return &flyteapp.App{ + Metadata: &flyteapp.Meta{Id: testAppID()}, + Spec: &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Container{ + Container: &flytecoreapp.Container{Image: "nginx:latest"}, + }, + }, + Status: &flyteapp.Status{ + Conditions: []*flyteapp.Condition{ + {DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE}, + }, + }, + } +} + +// --- Get with cache --- + +func TestGet_CacheMiss_CallsInternal(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + appID := testAppID() + app := testApp() + internal.On("Get", mock.Anything, mock.Anything).Return( + connect.NewResponse(&flyteapp.GetResponse{App: app}), nil, + ) + + resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + internal.AssertExpectations(t) +} + +func TestGet_CacheHit_SkipsInternal(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + // Pre-populate cache. + appID := testAppID() + svc.cache.Add(cacheKey(appID), testApp()) + + // Internal should NOT be called. + resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.NoError(t, err) + assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + internal.AssertNotCalled(t, "Get") +} + +func TestGet_CacheExpired_CallsInternal(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 1*time.Millisecond) + + appID := testAppID() + svc.cache.Add(cacheKey(appID), testApp()) + time.Sleep(5 * time.Millisecond) // let TTL expire + + app := testApp() + internal.On("Get", mock.Anything, mock.Anything).Return( + connect.NewResponse(&flyteapp.GetResponse{App: app}), nil, + ) + + _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ + Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, + })) + require.NoError(t, err) + internal.AssertExpectations(t) +} + +// --- Create / Update / Delete invalidate cache --- + +func TestCreate_InvalidatesCache(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + app := testApp() + // Pre-populate cache so we can confirm it's cleared. + svc.cache.Add(cacheKey(app.Metadata.Id), app) + + internal.On("Create", mock.Anything, mock.Anything).Return( + connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil, + ) + + _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) + require.NoError(t, err) + + _, hit := svc.cache.Get(cacheKey(app.Metadata.Id)) + assert.False(t, hit, "cache should be invalidated after Create") + internal.AssertExpectations(t) +} + +func TestUpdate_InvalidatesCache(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + app := testApp() + svc.cache.Add(cacheKey(app.Metadata.Id), app) + + internal.On("Update", mock.Anything, mock.Anything).Return( + connect.NewResponse(&flyteapp.UpdateResponse{App: app}), nil, + ) + + _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) + require.NoError(t, err) + + _, hit := svc.cache.Get(cacheKey(app.Metadata.Id)) + assert.False(t, hit, "cache should be invalidated after Update") + internal.AssertExpectations(t) +} + +func TestDelete_InvalidatesCache(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + appID := testAppID() + svc.cache.Add(cacheKey(appID), testApp()) + + internal.On("Delete", mock.Anything, mock.Anything).Return( + connect.NewResponse(&flyteapp.DeleteResponse{}), nil, + ) + + _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{AppId: appID})) + require.NoError(t, err) + + _, hit := svc.cache.Get(cacheKey(appID)) + assert.False(t, hit, "cache should be invalidated after Delete") + internal.AssertExpectations(t) +} + +// --- List always forwards --- + +func TestList_AlwaysCallsInternal(t *testing.T) { + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + internal.On("List", mock.Anything, mock.Anything).Return( + connect.NewResponse(&flyteapp.ListResponse{Apps: []*flyteapp.App{testApp()}}), nil, + ) + + resp, err := svc.List(context.Background(), connect.NewRequest(&flyteapp.ListRequest{})) + require.NoError(t, err) + assert.Len(t, resp.Msg.Apps, 1) + internal.AssertExpectations(t) +} + +// --- Watch streams through --- + +func TestWatch_ProxiesStream(t *testing.T) { + // Use a real httptest server to exercise the streaming path. + internal := &mockInternalClient{} + svc := NewAppService(internal, 30*time.Second) + + path, handler := appconnect.NewAppServiceHandler(svc) + mux := http.NewServeMux() + mux.Handle(path, handler) + server := httptest.NewServer(mux) + t.Cleanup(server.Close) + + // Mount InternalAppService on the same test server at /internal so the + // proxy can route to it. + internalSvcPath, internalSvcHandler := appconnect.NewAppServiceHandler( + &echoWatchService{app: testApp()}, + ) + mux.Handle("/internal"+internalSvcPath, http.StripPrefix("/internal", internalSvcHandler)) + + // Point AppService proxy at the internal path on the same server. + svc.internalClient = appconnect.NewAppServiceClient(http.DefaultClient, server.URL+"/internal") + + client := appconnect.NewAppServiceClient(http.DefaultClient, server.URL) + stream, err := client.Watch(context.Background(), connect.NewRequest(&flyteapp.WatchRequest{})) + require.NoError(t, err) + + require.True(t, stream.Receive()) + assert.Equal(t, "myapp", stream.Msg().GetCreateEvent().GetApp().GetMetadata().GetId().GetName()) + stream.Close() +} + +// echoWatchService sends one CreateEvent then closes the stream. +type echoWatchService struct { + appconnect.UnimplementedAppServiceHandler + app *flyteapp.App +} + +func (e *echoWatchService) Watch( + _ context.Context, + _ *connect.Request[flyteapp.WatchRequest], + stream *connect.ServerStream[flyteapp.WatchResponse], +) error { + return stream.Send(&flyteapp.WatchResponse{ + Event: &flyteapp.WatchResponse_CreateEvent{ + CreateEvent: &flyteapp.CreateEvent{App: e.app}, + }, + }) +} diff --git a/app/setup.go b/app/setup.go new file mode 100644 index 0000000000..31fcdd8d6a --- /dev/null +++ b/app/setup.go @@ -0,0 +1,36 @@ +package app + +import ( + "context" + "net/http" + + stdlibapp "github.com/flyteorg/flyte/v2/flytestdlib/app" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + + appconfig "github.com/flyteorg/flyte/v2/app/config" + "github.com/flyteorg/flyte/v2/app/service" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" +) + +// Setup registers the control plane AppService handler on the SetupContext mux. +// In unified mode (sc.BaseURL set), the proxy routes to InternalAppService on +// the same mux via the /internal prefix — no network hop. In split mode, +// cfg.InternalAppServiceURL points at the data plane host. +func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.AppConfig) error { + internalAppURL := cfg.InternalAppServiceURL + if sc.BaseURL != "" { + internalAppURL = sc.BaseURL + } + + internalClient := appconnect.NewAppServiceClient( + http.DefaultClient, + internalAppURL+"/internal", + ) + + appSvc := service.NewAppService(internalClient, cfg.CacheTTL) + path, handler := appconnect.NewAppServiceHandler(appSvc) + sc.Mux.Handle(path, handler) + logger.Infof(ctx, "Mounted AppService at %s", path) + + return nil +} diff --git a/go.mod b/go.mod index d08b80b45f..acc2fdce87 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 github.com/hashicorp/golang-lru v1.0.2 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.7.6 diff --git a/go.sum b/go.sum index 70757065de..d2ef7ade54 100644 --- a/go.sum +++ b/go.sum @@ -366,6 +366,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/runs/config/config.go b/runs/config/config.go index 590dcb0a6d..9c2ab1a73b 100644 --- a/runs/config/config.go +++ b/runs/config/config.go @@ -32,9 +32,6 @@ var defaultConfig = &Config{ ExecutionQPS: 10.0, ExecutionBurst: 20, }, - Apps: AppsConfig{ - InternalAppServiceURL: "http://localhost:8091", - }, } var configSection = config.MustRegisterSection(configSectionKey, defaultConfig) @@ -66,8 +63,6 @@ type Config struct { // TriggerScheduler configures the cron-based trigger scheduler worker. TriggerScheduler TriggerSchedulerConfig `json:"triggerScheduler"` - // Apps holds configuration for the App service. - Apps AppsConfig `json:"apps"` } // ServerConfig holds HTTP server configuration @@ -100,18 +95,6 @@ type TriggerSchedulerConfig struct { ExecutionBurst int `json:"executionBurst" pflag:",Burst size for CreateRun rate limiter"` } -// AppsConfig holds configuration for the App service in the runs (control plane). -type AppsConfig struct { - // PublicURLPattern is a Go template for generating public ingress URLs. - // Available variables: {{.Name}}, {{.Project}}, {{.Domain}} - // Example: "https://{{.Name}}-{{.Project}}.apps.flyte.example.com" - PublicURLPattern string `json:"publicUrlPattern" pflag:",URL pattern for app ingress"` - - // InternalAppServiceURL is the base URL of the InternalAppService (actions data plane). - // In unified mode this is overridden by sc.BaseURL. - InternalAppServiceURL string `json:"internalAppServiceUrl" pflag:",URL of the internal app service"` -} - // GetConfig returns the parsed runs configuration func GetConfig() *Config { return configSection.GetConfig().(*Config) diff --git a/runs/service/app_service.go b/runs/service/app_service.go deleted file mode 100644 index dbccfe3f9a..0000000000 --- a/runs/service/app_service.go +++ /dev/null @@ -1,80 +0,0 @@ -package service - -import ( - "context" - "errors" - - "connectrpc.com/connect" - - flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" - "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" -) - -// AppService is a dummy implementation that returns empty responses for all endpoints. -type AppService struct { - appconnect.UnimplementedAppServiceHandler -} - -func NewAppService() *AppService { - return &AppService{} -} - -var _ appconnect.AppServiceHandler = (*AppService)(nil) - -func (s *AppService) Create( - ctx context.Context, - req *connect.Request[flyteapp.CreateRequest], -) (*connect.Response[flyteapp.CreateResponse], error) { - return nil, connect.NewError(connect.CodeUnimplemented, errors.New("App service is not implemented")) -} - -func (s *AppService) Get( - ctx context.Context, - req *connect.Request[flyteapp.GetRequest], -) (*connect.Response[flyteapp.GetResponse], error) { - return connect.NewResponse(&flyteapp.GetResponse{}), nil -} - -func (s *AppService) Update( - ctx context.Context, - req *connect.Request[flyteapp.UpdateRequest], -) (*connect.Response[flyteapp.UpdateResponse], error) { - return connect.NewResponse(&flyteapp.UpdateResponse{}), nil -} - -func (s *AppService) UpdateStatus( - ctx context.Context, - req *connect.Request[flyteapp.UpdateStatusRequest], -) (*connect.Response[flyteapp.UpdateStatusResponse], error) { - return connect.NewResponse(&flyteapp.UpdateStatusResponse{}), nil -} - -func (s *AppService) Delete( - ctx context.Context, - req *connect.Request[flyteapp.DeleteRequest], -) (*connect.Response[flyteapp.DeleteResponse], error) { - return connect.NewResponse(&flyteapp.DeleteResponse{}), nil -} - -func (s *AppService) List( - ctx context.Context, - req *connect.Request[flyteapp.ListRequest], -) (*connect.Response[flyteapp.ListResponse], error) { - return connect.NewResponse(&flyteapp.ListResponse{}), nil -} - -func (s *AppService) Watch( - ctx context.Context, - req *connect.Request[flyteapp.WatchRequest], - stream *connect.ServerStream[flyteapp.WatchResponse], -) error { - return nil -} - -func (s *AppService) Lease( - ctx context.Context, - req *connect.Request[flyteapp.LeaseRequest], - stream *connect.ServerStream[flyteapp.LeaseResponse], -) error { - return nil -} diff --git a/runs/setup.go b/runs/setup.go index 85e90f1723..88f0663954 100644 --- a/runs/setup.go +++ b/runs/setup.go @@ -9,7 +9,6 @@ import ( "github.com/flyteorg/flyte/v2/flytestdlib/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/actions/actionsconnect" - flyteappconnect "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/auth/authconnect" projectpb "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project/projectconnect" @@ -93,11 +92,6 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { sc.Mux.Handle(authMetadataPath, authMetadataHandler) logger.Infof(ctx, "Mounted AuthMetadataService at %s", authMetadataPath) - appSvc := service.NewAppService() - appPath, appHandler := flyteappconnect.NewAppServiceHandler(appSvc) - sc.Mux.Handle(appPath, appHandler) - logger.Infof(ctx, "Mounted AppService at %s", appPath) - triggerSvc := service.NewTriggerService(repo) triggerPath, triggerHandler := triggerconnect.NewTriggerServiceHandler(triggerSvc) sc.Mux.Handle(triggerPath, triggerHandler)