Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ type AppConfig struct {
// Enabled controls whether the app deployment controller is started.
Enabled bool `json:"enabled" pflag:",Enable app deployment controller"`

// BaseDomain is the base domain used to generate public URLs for apps.
// Apps are exposed at "{name}-{project}-{domain}.{base_domain}".
BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"`

// DefaultRequestTimeout is the request timeout applied to apps that don't specify one.
DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"`

Expand Down
65 changes: 42 additions & 23 deletions app/internal/k8s/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ type AppK8sClientInterface interface {
// Returns a not-found error (checkable with k8serrors.IsNotFound) if the KService does not exist.
GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error)

// List returns all apps (spec + live status) for the given project/domain scope.
List(ctx context.Context, project, domain string) ([]*flyteapp.App, error)
// List returns apps for the given project/domain scope with optional pagination.
// If appName is non-empty, only the app with that name is returned.
// limit=0 means no limit. token is the K8s continue token from a previous call.
// Returns the apps, the continue token for the next page (empty if last page), and any error.
List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error)

// Delete removes the KService CRD entirely. The app must be re-created from scratch.
// Use Stop to scale to zero while preserving the KService.
Expand All @@ -66,8 +69,9 @@ type AppK8sClientInterface interface {
DeleteReplica(ctx context.Context, replicaID *flyteapp.ReplicaIdentifier) error

// Watch returns a channel of WatchResponse events for KServices matching the
// given project/domain scope. The channel is closed when ctx is cancelled.
Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error)
// given project/domain scope. If appName is non-empty, only events for that
// specific app are returned. The channel is closed when ctx is cancelled.
Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error)
}

// AppK8sClient implements AppK8sClientInterface using controller-runtime.
Expand Down Expand Up @@ -169,13 +173,20 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e
}

// Watch returns a channel of WatchResponse events for KServices in the given
// project/domain scope. The channel is closed when ctx is cancelled or the
// project/domain scope. If appName is non-empty, only events for that specific
// app are returned. The channel is closed when ctx is cancelled or the
// underlying watch terminates.
func (c *AppK8sClient) Watch(ctx context.Context, project, domain string) (<-chan *flyteapp.WatchResponse, error) {
func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) {
ns := appNamespace(project, domain)

labels := map[string]string{labelAppManaged: "true"}
if appName != "" {
labels[labelAppName] = strings.ToLower(appName)
}

watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{},
client.InNamespace(ns),
client.MatchingLabels{labelAppManaged: "true"},
client.MatchingLabels(labels),
)
if err != nil {
return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err)
Expand Down Expand Up @@ -258,16 +269,28 @@ func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier
return c.kserviceToStatus(ctx, ksvc), nil
}

// List returns all apps for the given project/domain by listing KServices in the
// project/domain namespace.
func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*flyteapp.App, error) {
// List returns apps for the given project/domain scope with optional pagination.
func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) {
ns := appNamespace(project, domain)
list := &servingv1.ServiceList{}
if err := c.k8sClient.List(ctx, list,

matchLabels := client.MatchingLabels{labelAppManaged: "true"}
if appName != "" {
matchLabels[labelAppName] = strings.ToLower(appName)
}
listOpts := []client.ListOption{
client.InNamespace(ns),
client.MatchingLabels{labelAppManaged: "true"},
); err != nil {
return nil, fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err)
matchLabels,
}
if limit > 0 {
listOpts = append(listOpts, client.Limit(int64(limit)))
}
if token != "" {
listOpts = append(listOpts, client.Continue(token))
}

list := &servingv1.ServiceList{}
if err := c.k8sClient.List(ctx, list, listOpts...); err != nil {
return nil, "", fmt.Errorf("failed to list KServices for %s/%s: %w", project, domain, err)
}

apps := make([]*flyteapp.App, 0, len(list.Items))
Expand All @@ -279,7 +302,7 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string) ([]*fly
}
apps = append(apps, a)
}
return apps, nil
return apps, list.Continue, nil
}

// --- Helpers ---
Expand All @@ -302,7 +325,7 @@ func kserviceName(id *flyteapp.Identifier) string {

// specSHA computes a SHA256 digest of the serialized App Spec proto.
func specSHA(spec *flyteapp.Spec) (string, error) {
b, err := proto.MarshalOptions{Deterministic: true}.Marshal(spec)
b, err := proto.Marshal(spec)
if err != nil {
return "", fmt.Errorf("failed to marshal spec: %w", err)
}
Expand Down Expand Up @@ -358,10 +381,6 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err
Template: servingv1.RevisionTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: templateAnnotations,
Labels: map[string]string{
labelAppManaged: "true",
labelAppName: appID.GetName(),
},
},
Spec: servingv1.RevisionSpec{
PodSpec: podSpec,
Expand Down Expand Up @@ -467,8 +486,8 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser
phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE
case ksvc.IsFailed():
phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED
if condition := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); condition != nil {
message = condition.Message
if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil {
message = c.Message
}
case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName:
phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING
Expand Down
40 changes: 39 additions & 1 deletion app/internal/k8s/app_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,51 @@ func TestList(t *testing.T) {
},
}

apps, err := c.List(context.Background(), "proj", "dev")
apps, nextToken, err := c.List(context.Background(), "proj", "dev", "", 0, "")
require.NoError(t, err)
assert.Empty(t, nextToken)
require.Len(t, apps, 1)
assert.Equal(t, "proj", apps[0].Metadata.Id.Project)
assert.Equal(t, "app1", apps[0].Metadata.Id.Name)
}

func TestList_ByAppName(t *testing.T) {
s := testScheme(t)
ksvc1 := &servingv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "app1",
Namespace: "proj-dev",
Labels: map[string]string{
labelAppManaged: "true",
labelProject: "proj",
labelDomain: "dev",
labelAppName: "app1",
},
Annotations: map[string]string{annotationAppID: "proj/dev/app1"},
},
}
ksvc2 := &servingv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "app2",
Namespace: "proj-dev",
Labels: map[string]string{
labelAppManaged: "true",
labelProject: "proj",
labelDomain: "dev",
labelAppName: "app2",
},
Annotations: map[string]string{annotationAppID: "proj/dev/app2"},
},
}
fc := fake.NewClientBuilder().WithScheme(s).WithObjects(ksvc1, ksvc2).Build()
c := &AppK8sClient{k8sClient: fc, cfg: &config.AppConfig{}}

apps, _, err := c.List(context.Background(), "proj", "dev", "app1", 0, "")
require.NoError(t, err)
require.Len(t, apps, 1)
assert.Equal(t, "app1", apps[0].Metadata.Id.Name)
}

func TestGetReplicas(t *testing.T) {
s := testScheme(t)
pod := &corev1.Pod{
Expand Down
Loading
Loading