Skip to content
Merged
25 changes: 14 additions & 11 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package config

import "time"

// AppConfig holds configuration for the App deployment controller.
// AppConfig holds configuration for the control plane AppService.
type AppConfig struct {
// Enabled controls whether the app deployment controller is started.
Enabled bool `json:"enabled" pflag:",Enable app deployment controller"`
// InternalAppServiceURL is the base URL of the InternalAppService (data plane).
// In unified mode this is overridden by the shared mux BaseURL.
InternalAppServiceURL string `json:"internalAppServiceUrl" pflag:",URL of the internal app service"`

// BaseDomain is the base domain used to generate public URLs for apps.
// Apps are exposed at "{name}-{project}-{domain}.{base_domain}".
BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"`

// DefaultRequestTimeout is the request timeout applied to apps that don't specify one.
DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"`
// CacheTTL is the TTL for the in-memory app status cache.
// Defaults to 30s. Set to 0 to disable caching.
CacheTTL time.Duration `json:"cacheTtl" pflag:",TTL for app status cache"`
}

// MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s).
MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"`
// DefaultAppConfig returns the default control plane AppConfig.
func DefaultAppConfig() *AppConfig {
return &AppConfig{
InternalAppServiceURL: "http://localhost:8091",
CacheTTL: 30 * time.Second,
}
}
19 changes: 19 additions & 0 deletions app/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package config

import "time"

// InternalAppConfig holds configuration for the data plane app deployment controller.
type InternalAppConfig struct {
// Enabled controls whether the app deployment controller is started.
Enabled bool `json:"enabled" pflag:",Enable app deployment controller"`

// BaseDomain is the base domain used to generate public URLs for apps.
// Apps are exposed at "{name}-{project}-{domain}.{base_domain}".
BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"`

// DefaultRequestTimeout is the request timeout applied to apps that don't specify one.
DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"`

// MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s).
MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"`
}
8 changes: 4 additions & 4 deletions app/internal/k8s/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/v2/app/config"
"github.com/flyteorg/flyte/v2/app/internal/config"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
)
Expand Down Expand Up @@ -78,11 +78,11 @@ type AppK8sClientInterface interface {
type AppK8sClient struct {
k8sClient client.WithWatch
cache ctrlcache.Cache
cfg *config.AppConfig
cfg *config.InternalAppConfig
}

// NewAppK8sClient creates a new AppK8sClient.
func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.AppConfig) *AppK8sClient {
func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig) *AppK8sClient {
return &AppK8sClient{
k8sClient: k8sClient,
cache: cache,
Expand Down Expand Up @@ -423,7 +423,7 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) {
}

// buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template.
func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.AppConfig) map[string]string {
func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.InternalAppConfig) map[string]string {
annotations := map[string]string{}
autoscaling := spec.GetAutoscaling()
if autoscaling == nil {
Expand Down
49 changes: 6 additions & 43 deletions app/internal/k8s/app_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/flyteorg/flyte/v2/app/config"
"github.com/flyteorg/flyte/v2/app/internal/config"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)
Expand Down Expand Up @@ -55,7 +55,7 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient {
Build()
return &AppK8sClient{
k8sClient: fc,
cfg: &config.AppConfig{
cfg: &config.InternalAppConfig{
DefaultRequestTimeout: 5 * time.Minute,
MaxRequestTimeout: time.Hour,
},
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestGetStatus_CurrentReplicas(t *testing.T) {
Build()
c := &AppK8sClient{
k8sClient: fc,
cfg: &config.AppConfig{},
cfg: &config.InternalAppConfig{},
}

id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestList(t *testing.T) {
Build()
c := &AppK8sClient{
k8sClient: fc,
cfg: &config.AppConfig{
cfg: &config.InternalAppConfig{
DefaultRequestTimeout: 5 * time.Minute,
MaxRequestTimeout: time.Hour,
},
Expand All @@ -291,43 +291,6 @@ func TestList(t *testing.T) {
assert.Equal(t, "app1", apps[0].Metadata.Id.Name)
}

func TestList_ByAppName(t *testing.T) {
s := testScheme(t)
ksvc1 := &servingv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "app1",
Namespace: "proj-dev",
Labels: map[string]string{
labelAppManaged: "true",
labelProject: "proj",
labelDomain: "dev",
labelAppName: "app1",
},
Annotations: map[string]string{annotationAppID: "proj/dev/app1"},
},
}
ksvc2 := &servingv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "app2",
Namespace: "proj-dev",
Labels: map[string]string{
labelAppManaged: "true",
labelProject: "proj",
labelDomain: "dev",
labelAppName: "app2",
},
Annotations: map[string]string{annotationAppID: "proj/dev/app2"},
},
}
fc := fake.NewClientBuilder().WithScheme(s).WithObjects(ksvc1, ksvc2).Build()
c := &AppK8sClient{k8sClient: fc, cfg: &config.AppConfig{}}

apps, _, err := c.List(context.Background(), "proj", "dev", "app1", 0, "")
require.NoError(t, err)
require.Len(t, apps, 1)
assert.Equal(t, "app1", apps[0].Metadata.Id.Name)
}

func TestGetReplicas(t *testing.T) {
s := testScheme(t)
pod := &corev1.Pod{
Expand All @@ -348,7 +311,7 @@ func TestGetReplicas(t *testing.T) {
fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build()
c := &AppK8sClient{
k8sClient: fc,
cfg: &config.AppConfig{},
cfg: &config.InternalAppConfig{},
}

id := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}
Expand All @@ -370,7 +333,7 @@ func TestDeleteReplica(t *testing.T) {
fc := fake.NewClientBuilder().WithScheme(s).WithObjects(pod).Build()
c := &AppK8sClient{
k8sClient: fc,
cfg: &config.AppConfig{},
cfg: &config.InternalAppConfig{},
}

replicaID := &flyteapp.ReplicaIdentifier{
Expand Down
6 changes: 3 additions & 3 deletions app/internal/service/internal_app_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
k8serrors "k8s.io/apimachinery/pkg/api/errors"

appconfig "github.com/flyteorg/flyte/v2/app/config"
appconfig "github.com/flyteorg/flyte/v2/app/internal/config"
appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
Expand All @@ -22,11 +22,11 @@ import (
type InternalAppService struct {
appconnect.UnimplementedAppServiceHandler
k8s appk8s.AppK8sClientInterface
cfg *appconfig.AppConfig
cfg *appconfig.InternalAppConfig
}

// NewInternalAppService creates a new InternalAppService.
func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.AppConfig) *InternalAppService {
func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.InternalAppConfig) *InternalAppService {
return &InternalAppService{k8s: k8s, cfg: cfg}
}

Expand Down
6 changes: 3 additions & 3 deletions app/internal/service/internal_app_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"

appconfig "github.com/flyteorg/flyte/v2/app/config"
appconfig "github.com/flyteorg/flyte/v2/app/internal/config"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect"
flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
Expand Down Expand Up @@ -77,8 +77,8 @@ func (m *mockAppK8sClient) Watch(ctx context.Context, project, domain, appName s

// --- helpers ---

func testCfg() *appconfig.AppConfig {
return &appconfig.AppConfig{
func testCfg() *appconfig.InternalAppConfig {
return &appconfig.InternalAppConfig{
Enabled: true,
BaseDomain: "apps.example.com",
DefaultRequestTimeout: 5 * time.Minute,
Expand Down
4 changes: 2 additions & 2 deletions app/internal/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
stdlibapp "github.com/flyteorg/flyte/v2/flytestdlib/app"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"

appconfig "github.com/flyteorg/flyte/v2/app/config"
appconfig "github.com/flyteorg/flyte/v2/app/internal/config"
appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s"
"github.com/flyteorg/flyte/v2/app/internal/service"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect"
Expand All @@ -17,7 +17,7 @@ import (
// Setup registers the InternalAppService handler on the SetupContext mux.
// It is mounted at /internal<path> to avoid collision with the control plane
// AppService, which shares the same proto service definition.
func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.AppConfig) error {
func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.InternalAppConfig) error {
if !cfg.Enabled {
logger.Infof(ctx, "InternalAppService disabled (apps.enabled=false), skipping setup")
return nil
Expand Down
141 changes: 141 additions & 0 deletions app/service/app_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package service

import (
"context"
"fmt"
"time"

"connectrpc.com/connect"
"github.com/hashicorp/golang-lru/v2/expirable"

flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect"
)

// AppService is the control plane implementation of AppServiceHandler.
// It proxies all RPCs to InternalAppService (data plane) and maintains a
// per-instance TTL cache to reduce cross-plane RPC calls on Get.
type AppService struct {
appconnect.UnimplementedAppServiceHandler
internalClient appconnect.AppServiceClient
// cache is nil when cacheTTL=0 (caching disabled).
cache *expirable.LRU[string, *flyteapp.App]
}

// NewAppService creates a new AppService.
// cacheTTL=0 disables caching (every Get calls InternalAppService).
func NewAppService(internalClient appconnect.AppServiceClient, cacheTTL time.Duration) *AppService {
var cache *expirable.LRU[string, *flyteapp.App]
if cacheTTL > 0 {
cache = expirable.NewLRU[string, *flyteapp.App](0, nil, cacheTTL)
}
return &AppService{
internalClient: internalClient,
cache: cache,
}
}

// Ensure AppService satisfies the generated handler interface.
var _ appconnect.AppServiceHandler = (*AppService)(nil)

// Create forwards to InternalAppService and invalidates the cache entry.
func (s *AppService) Create(
ctx context.Context,
req *connect.Request[flyteapp.CreateRequest],
) (*connect.Response[flyteapp.CreateResponse], error) {
resp, err := s.internalClient.Create(ctx, req)
if err != nil {
return nil, err
}
if s.cache != nil {
s.cache.Remove(cacheKey(req.Msg.GetApp().GetMetadata().GetId()))
}
return resp, nil
}

// Get returns the app, using the cache on hit and calling InternalAppService on miss.
func (s *AppService) Get(
ctx context.Context,
req *connect.Request[flyteapp.GetRequest],
) (*connect.Response[flyteapp.GetResponse], error) {
appID, ok := req.Msg.GetIdentifier().(*flyteapp.GetRequest_AppId)
if ok && appID.AppId != nil && s.cache != nil {
if app, hit := s.cache.Get(cacheKey(appID.AppId)); hit {
return connect.NewResponse(&flyteapp.GetResponse{App: app}), nil
}
}

resp, err := s.internalClient.Get(ctx, req)
if err != nil {
return nil, err
}
if ok && appID.AppId != nil && s.cache != nil {
s.cache.Add(cacheKey(appID.AppId), resp.Msg.GetApp())
}
return resp, nil
}

// Update forwards to InternalAppService and invalidates the cache entry.
func (s *AppService) Update(
ctx context.Context,
req *connect.Request[flyteapp.UpdateRequest],
) (*connect.Response[flyteapp.UpdateResponse], error) {
resp, err := s.internalClient.Update(ctx, req)
if err != nil {
return nil, err
}
if s.cache != nil {
s.cache.Remove(cacheKey(req.Msg.GetApp().GetMetadata().GetId()))
}
return resp, nil
}

// Delete forwards to InternalAppService and invalidates the cache entry.
func (s *AppService) Delete(
ctx context.Context,
req *connect.Request[flyteapp.DeleteRequest],
) (*connect.Response[flyteapp.DeleteResponse], error) {
resp, err := s.internalClient.Delete(ctx, req)
if err != nil {
return nil, err
}
if s.cache != nil {
s.cache.Remove(cacheKey(req.Msg.GetAppId()))
}
return resp, nil
}

// List always forwards to InternalAppService — results vary by filter/pagination.
func (s *AppService) List(
ctx context.Context,
req *connect.Request[flyteapp.ListRequest],
) (*connect.Response[flyteapp.ListResponse], error) {
return s.internalClient.List(ctx, req)
}

// Watch proxies the server-streaming Watch RPC to InternalAppService.
func (s *AppService) Watch(
ctx context.Context,
req *connect.Request[flyteapp.WatchRequest],
stream *connect.ServerStream[flyteapp.WatchResponse],
) error {
clientStream, err := s.internalClient.Watch(ctx, req)
if err != nil {
return connect.NewError(connect.CodeInternal, err)
}
defer clientStream.Close()
for clientStream.Receive() {
if err := stream.Send(clientStream.Msg()); err != nil {
return err
}
}
return clientStream.Err()
}

// cacheKey returns a stable string key for an app identifier.
func cacheKey(id *flyteapp.Identifier) string {
if id == nil {
return ""
}
return fmt.Sprintf("%s/%s/%s", id.GetProject(), id.GetDomain(), id.GetName())
}
Loading
Loading