Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 162 additions & 16 deletions app/internal/k8s/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -172,51 +174,195 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e
return nil
}

// watchBackoff controls the reconnect timing for Watch. Declared as vars so
// tests can override them without sleeping.
var (
watchBackoffInitial = 1 * time.Second
watchBackoffMax = 30 * time.Second
watchBackoffFactor = 2.0
)

// watchState holds mutable reconnect state for a single Watch call.
// It is goroutine-local — no mutex needed.
type watchState struct {
	// lastResourceVersion is the most recent RV seen from any event or Bookmark.
	// Passed to openWatch on reconnect so K8s resumes from exactly where we left off.
	lastResourceVersion string
	// backoff is the delay to apply on the next error-triggered reconnect.
	// Zero is treated as watchBackoffInitial by nextBackoff.
	backoff time.Duration
	// consecutiveErrors counts error-triggered reconnects since the last
	// successful event delivery; used for logging the attempt number.
	consecutiveErrors int
}

// nextBackoff returns the delay to wait before the current reconnect attempt
// and escalates the stored delay for the next one, clamped at watchBackoffMax.
// A zero stored delay is treated as watchBackoffInitial.
func (s *watchState) nextBackoff() time.Duration {
	current := s.backoff
	if current == 0 {
		current = watchBackoffInitial
	}
	next := time.Duration(float64(current) * watchBackoffFactor)
	if next > watchBackoffMax {
		next = watchBackoffMax
	}
	s.backoff = next
	return current
}

// resetBackoff returns the reconnect state to its healthy baseline: the error
// streak is cleared and the next error-triggered delay starts from the initial value.
func (s *watchState) resetBackoff() {
	s.consecutiveErrors = 0
	s.backoff = watchBackoffInitial
}

// Watch returns a channel of WatchResponse events for KServices in the given
// project/domain scope. If appName is non-empty, only events for that specific
// app are returned. The channel is closed when ctx is cancelled or the
// underlying watch terminates.
// app are returned. The channel is closed only when ctx is cancelled.
//
// The goroutine reconnects transparently when the underlying K8s watch closes
// unexpectedly, tracking resourceVersion to resume without gaps or replays.
func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) {
ns := appNamespace(project, domain)

labels := map[string]string{labelAppManaged: "true"}
if appName != "" {
labels[labelAppName] = strings.ToLower(appName)
}

watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{},
// Open the first watcher eagerly so initial errors (RBAC, missing CRD) are
// returned synchronously before spawning the goroutine.
watcher, err := c.openWatch(ctx, ns, labels, "")
if err != nil {
return nil, err
}

ch := make(chan *flyteapp.WatchResponse, 64)
go c.watchLoop(ctx, ns, labels, watcher, ch)
return ch, nil
}

// openWatch starts a K8s watch from resourceVersion (empty = watch from now).
// AllowWatchBookmarks is always set so K8s sends Bookmark events on every session,
// keeping lastResourceVersion current even when no objects change.
func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[string]string, resourceVersion string) (k8swatch.Interface, error) {
rawOpts := &metav1.ListOptions{AllowWatchBookmarks: true}
if resourceVersion != "" {
rawOpts.ResourceVersion = resourceVersion
}
opts := []client.ListOption{
client.InNamespace(ns),
client.MatchingLabels(labels),
)
&client.ListOptions{Raw: rawOpts},
}
watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, opts...)
if err != nil {
return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err)
}
return watcher, nil
}

ch := make(chan *flyteapp.WatchResponse, 64)
go func() {
defer close(ch)
defer watcher.Stop()
for {
// watchLoop is the reconnect loop. It drains watcher until it closes, then
// reopens. Exponential backoff is applied only on K8s Error events; normal
// watch timeouts (clean channel close) reconnect immediately. Closes ch only
// when ctx is cancelled.
func (c *AppK8sClient) watchLoop(
ctx context.Context,
ns string,
labels map[string]string,
watcher k8swatch.Interface,
ch chan<- *flyteapp.WatchResponse,
) {
defer close(ch)
defer watcher.Stop()

state := &watchState{backoff: watchBackoffInitial}

for {
reconnect, isError := c.drainWatcher(ctx, watcher, ch, state)
if !reconnect {
return // ctx cancelled
}

watcher.Stop()

if isError {
state.consecutiveErrors++
delay := state.nextBackoff()
logger.Warnf(ctx, "KService watch in namespace %s closed with error (attempt %d); reconnecting in %v",
ns, state.consecutiveErrors, delay)
select {
case <-ctx.Done():
return
case event, ok := <-watcher.ResultChan():
if !ok {
return
case <-time.After(delay):
}
} else {
// Normal K8s watch timeout — reconnect immediately, no backoff.
state.resetBackoff()
logger.Debugf(ctx, "KService watch in namespace %s timed out naturally; reconnecting", ns)
}

newWatcher, err := c.openWatch(ctx, ns, labels, state.lastResourceVersion)
if err != nil {
logger.Errorf(ctx, "Failed to reopen KService watch in namespace %s: %v", ns, err)
// Use an immediately-closed watcher so the loop retries with further backoff.
watcher = k8swatch.NewEmptyWatch()
continue
}
watcher = newWatcher
}
}

// drainWatcher processes events from watcher until the channel closes or ctx is done.
// Returns (reconnect, isError): reconnect=false means ctx was cancelled (stop the loop);
// isError=true means a K8s Error event triggered the close and backoff should be applied.
func (c *AppK8sClient) drainWatcher(
ctx context.Context,
watcher k8swatch.Interface,
ch chan<- *flyteapp.WatchResponse,
state *watchState,
) (bool, bool) {
for {
select {
case <-ctx.Done():
return false, false
case event, ok := <-watcher.ResultChan():
if !ok {
return true, false // normal K8s watch timeout
}

switch event.Type {
case k8swatch.Error:
if status, ok := event.Object.(*metav1.Status); ok {
logger.Warnf(ctx, "KService watch received error event (code=%d reason=%s): %s; will reconnect",
status.Code, status.Reason, status.Message)
} else {
logger.Warnf(ctx, "KService watch received error event (type %T); will reconnect", event.Object)
}
return true, true // error — apply backoff on reconnect
case k8swatch.Bookmark:
// Update RV immediately — there is no delivery to confirm.
c.updateResourceVersion(event, state)
state.resetBackoff()
default:
resp := c.kserviceEventToWatchResponse(ctx, event)
if resp == nil {
continue
}
select {
case ch <- resp:
// Advance RV only after confirmed delivery so a failed send
// doesn't silently skip the event on the next reconnect.
c.updateResourceVersion(event, state)
state.resetBackoff()
case <-ctx.Done():
return
return false, false
}
}
}
}()
return ch, nil
}
}

// updateResourceVersion extracts and stores the latest resourceVersion from a watch event.
// For Bookmark events it is called immediately; for data events only after successful delivery.
func (c *AppK8sClient) updateResourceVersion(event k8swatch.Event, state *watchState) {
	// Only event types that carry a meaningful object RV are considered.
	switch event.Type {
	case k8swatch.Added, k8swatch.Modified, k8swatch.Deleted, k8swatch.Bookmark:
	default:
		return
	}
	ksvc, ok := event.Object.(*servingv1.Service)
	if !ok {
		return
	}
	if rv := ksvc.GetResourceVersion(); rv != "" {
		state.lastResourceVersion = rv
	}
}

// kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse.
Expand Down
Loading
Loading