From 372378f024d35704bdc21708069d8d3398b717f7 Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Thu, 9 Apr 2026 12:59:59 -0700 Subject: [PATCH 1/2] add: watch reconnect Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/internal/k8s/app_client.go | 174 ++++++++++++++++--- app/internal/k8s/app_client_test.go | 249 ++++++++++++++++++++++++++++ 2 files changed, 404 insertions(+), 19 deletions(-) diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index ad3eeca95e..9b018269e6 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -5,7 +5,9 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "math" "strings" + "time" "google.golang.org/protobuf/proto" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -172,51 +174,185 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e return nil } +// watchBackoff controls the reconnect timing for Watch. Declared as vars so +// tests can override them without sleeping. +var ( + watchBackoffInitial = 1 * time.Second + watchBackoffMax = 30 * time.Second + watchBackoffFactor = 2.0 +) + +// watchState holds mutable reconnect state for a single Watch call. +// It is goroutine-local — no mutex needed. +type watchState struct { + // lastResourceVersion is the most recent RV seen from any event or Bookmark. + // Passed to openWatch on reconnect so K8s resumes from exactly where we left off. + lastResourceVersion string + backoff time.Duration + consecutiveErrors int +} + +func (s *watchState) nextBackoff() time.Duration { + d := s.backoff + if d == 0 { + d = watchBackoffInitial + } + s.backoff = time.Duration(math.Min(float64(d)*watchBackoffFactor, float64(watchBackoffMax))) + return d +} + +func (s *watchState) resetBackoff() { + s.backoff = watchBackoffInitial + s.consecutiveErrors = 0 +} + // Watch returns a channel of WatchResponse events for KServices in the given // project/domain scope. If appName is non-empty, only events for that specific -// app are returned. The channel is closed when ctx is cancelled or the -// underlying watch terminates. +// app are returned. The channel is closed only when ctx is cancelled. +// +// The goroutine reconnects transparently when the underlying K8s watch closes +// unexpectedly, tracking resourceVersion to resume without gaps or replays. func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) { ns := appNamespace(project, domain) - labels := map[string]string{labelAppManaged: "true"} if appName != "" { labels[labelAppName] = strings.ToLower(appName) } - watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, + // Open the first watcher eagerly so initial errors (RBAC, missing CRD) are + // returned synchronously before spawning the goroutine. + watcher, err := c.openWatch(ctx, ns, labels, "") + if err != nil { + return nil, err + } + + ch := make(chan *flyteapp.WatchResponse, 64) + go c.watchLoop(ctx, ns, labels, watcher, ch) + return ch, nil +} + +// openWatch starts a K8s watch from resourceVersion (empty = watch from now). +func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[string]string, resourceVersion string) (k8swatch.Interface, error) { + opts := []client.ListOption{ client.InNamespace(ns), client.MatchingLabels(labels), - ) + } + if resourceVersion != "" { + opts = append(opts, &client.ListOptions{ + Raw: &metav1.ListOptions{ + ResourceVersion: resourceVersion, + AllowWatchBookmarks: true, + }, + }) + } + watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, opts...) if err != nil { return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err) } + return watcher, nil +} - ch := make(chan *flyteapp.WatchResponse, 64) - go func() { - defer close(ch) - defer watcher.Stop() - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - return +// watchLoop is the reconnect loop. It drains watcher until it closes, then +// reopens with exponential backoff. Closes ch only when ctx is cancelled. +func (c *AppK8sClient) watchLoop( + ctx context.Context, + ns string, + labels map[string]string, + watcher k8swatch.Interface, + ch chan<- *flyteapp.WatchResponse, +) { + defer close(ch) + defer watcher.Stop() + + state := &watchState{backoff: watchBackoffInitial} + + for { + reconnect := c.drainWatcher(ctx, watcher, ch, state) + if !reconnect { + return // ctx cancelled + } + + watcher.Stop() + state.consecutiveErrors++ + delay := state.nextBackoff() + logger.Warnf(ctx, "KService watch in namespace %s closed unexpectedly (attempt %d); reconnecting in %v", + ns, state.consecutiveErrors, delay) + + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + + newWatcher, err := c.openWatch(ctx, ns, labels, state.lastResourceVersion) + if err != nil { + logger.Errorf(ctx, "Failed to reopen KService watch in namespace %s: %v", ns, err) + // Use an immediately-closed watcher so the loop retries with further backoff. + watcher = k8swatch.NewEmptyWatch() + continue + } + watcher = newWatcher + } +} + +// drainWatcher processes events from watcher until the channel closes or ctx is done. +// Returns true if reconnect is needed, false if ctx was cancelled. +func (c *AppK8sClient) drainWatcher( + ctx context.Context, + watcher k8swatch.Interface, + ch chan<- *flyteapp.WatchResponse, + state *watchState, +) bool { + for { + select { + case <-ctx.Done(): + return false + case event, ok := <-watcher.ResultChan(): + if !ok { + return true + } + + c.updateResourceVersion(event, state) + + switch event.Type { + case k8swatch.Error: + if status, ok := event.Object.(*metav1.Status); ok { + logger.Warnf(ctx, "KService watch received error event (code=%d reason=%s): %s; will reconnect", + status.Code, status.Reason, status.Message) + } else { + logger.Warnf(ctx, "KService watch received error event (type %T); will reconnect", event.Object) } + return true + case k8swatch.Bookmark: + // resourceVersion already updated — nothing to forward. + state.resetBackoff() + default: resp := c.kserviceEventToWatchResponse(ctx, event) if resp == nil { continue } + state.resetBackoff() select { case ch <- resp: case <-ctx.Done(): - return + return false } } } - }() - return ch, nil + } +} + +// updateResourceVersion extracts and stores the latest resourceVersion from a watch event. +// Called before event type dispatch so both normal events and Bookmarks checkpoint the position. +func (c *AppK8sClient) updateResourceVersion(event k8swatch.Event, state *watchState) { + switch event.Type { + case k8swatch.Added, k8swatch.Modified, k8swatch.Deleted, k8swatch.Bookmark: + if ksvc, ok := event.Object.(*servingv1.Service); ok { + if rv := ksvc.GetResourceVersion(); rv != "" { + state.lastResourceVersion = rv + } + } + } } // kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse. diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 05e9f9d7d4..13ee9d0d2e 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -4,6 +4,8 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" + "sync" "testing" "time" @@ -500,3 +502,250 @@ func TestPodDeploymentStatus(t *testing.T) { }) } } + +// --- Watch reconnect tests --- + +// watchCall is a pre-programmed response for one call to multiWatchClient.Watch. +type watchCall struct { + watcher k8swatch.Interface + err error +} + +// multiWatchClient wraps the fake client but intercepts Watch() calls, returning +// pre-programmed watchers in sequence. All other methods delegate to the embedded fake. +type multiWatchClient struct { + client.WithWatch + mu sync.Mutex + calls []watchCall + callIdx int + // capturedRVs records the ResourceVersion passed to each Watch() call (for assertions). + capturedRVs []string +} + +func (m *multiWatchClient) Watch(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (k8swatch.Interface, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Extract ResourceVersion from Raw options if present. + lo := &client.ListOptions{} + for _, o := range opts { + o.ApplyToList(lo) + } + rv := "" + if lo.Raw != nil { + rv = lo.Raw.ResourceVersion + } + m.capturedRVs = append(m.capturedRVs, rv) + + if m.callIdx >= len(m.calls) { + // No more programmed calls — return a watcher that blocks until ctx is cancelled. + return k8swatch.NewFakeWithChanSize(0, false), nil + } + c := m.calls[m.callIdx] + m.callIdx++ + if c.err != nil { + return nil, c.err + } + return c.watcher, nil +} + +// newMultiClient builds an AppK8sClient backed by a multiWatchClient. +func newMultiClient(t *testing.T, calls []watchCall, objs ...client.Object) (*AppK8sClient, *multiWatchClient) { + t.Helper() + s := testScheme(t) + base := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build() + mwc := &multiWatchClient{WithWatch: base, calls: calls} + return &AppK8sClient{ + k8sClient: mwc, + cfg: &config.InternalAppConfig{}, + }, mwc +} + +// testKsvc builds a minimal KService that kserviceToApp can parse. +func testKsvc(name, ns, rv string) *servingv1.Service { + return &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + ResourceVersion: rv, + Annotations: map[string]string{annotationAppID: "proj/dev/" + name}, + Labels: map[string]string{labelAppManaged: "true"}, + }, + } +} + +func TestWatch_ReconnectsOnChannelClose(t *testing.T) { + watchBackoffInitial = 0 + t.Cleanup(func() { watchBackoffInitial = 1 * time.Second }) + + w1 := k8swatch.NewFake() + w2 := k8swatch.NewFake() + + c, _ := newMultiClient(t, []watchCall{{watcher: w1}, {watcher: w2}}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ch, err := c.Watch(ctx, "proj", "dev", "") + require.NoError(t, err) + + // Send one event on w1 then close it (simulates K8s timeout/disconnect). + w1.Add(testKsvc("myapp", "proj-dev", "100")) + w1.Stop() + + recv1 := <-ch + require.NotNil(t, recv1.GetCreateEvent(), "expected CreateEvent from first watcher") + + // After reconnect, send an event on w2. + go func() { + time.Sleep(10 * time.Millisecond) + w2.Add(testKsvc("myapp", "proj-dev", "200")) + }() + + recv2 := <-ch + require.NotNil(t, recv2.GetCreateEvent(), "expected CreateEvent from second watcher after reconnect") +} + +func TestWatch_ReconnectsOnErrorEvent(t *testing.T) { + watchBackoffInitial = 0 + t.Cleanup(func() { watchBackoffInitial = 1 * time.Second }) + + w1 := k8swatch.NewFake() + w2 := k8swatch.NewFake() + + c, _ := newMultiClient(t, []watchCall{{watcher: w1}, {watcher: w2}}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ch, err := c.Watch(ctx, "proj", "dev", "") + require.NoError(t, err) + + // Send a K8s Error event — should trigger reconnect. + w1.Error(&metav1.Status{Code: 410, Reason: metav1.StatusReasonExpired, Message: "too old resource version"}) + + go func() { + time.Sleep(10 * time.Millisecond) + w2.Add(testKsvc("myapp", "proj-dev", "300")) + }() + + resp := <-ch + require.NotNil(t, resp.GetCreateEvent(), "expected CreateEvent from second watcher after error-triggered reconnect") +} + +func TestWatch_BookmarkUpdatesResourceVersion(t *testing.T) { + watchBackoffInitial = 0 + t.Cleanup(func() { watchBackoffInitial = 1 * time.Second }) + + w1 := k8swatch.NewFake() + w2 := k8swatch.NewFake() + + c, mwc := newMultiClient(t, []watchCall{{watcher: w1}, {watcher: w2}}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ch, err := c.Watch(ctx, "proj", "dev", "") + require.NoError(t, err) + + // Send a Bookmark with RV=999 then close w1. + w1.Action(k8swatch.Bookmark, &servingv1.Service{ + ObjectMeta: metav1.ObjectMeta{ResourceVersion: "999"}, + }) + w1.Stop() + + // Deliver an event from w2 after reconnect. + go func() { + time.Sleep(10 * time.Millisecond) + w2.Add(testKsvc("myapp", "proj-dev", "1000")) + }() + + resp := <-ch + require.NotNil(t, resp.GetCreateEvent()) + + // The second Watch() call should have been made with ResourceVersion="999". + mwc.mu.Lock() + rvs := append([]string(nil), mwc.capturedRVs...) + mwc.mu.Unlock() + + require.GreaterOrEqual(t, len(rvs), 2, "expected at least 2 Watch calls") + assert.Equal(t, "", rvs[0], "first Watch call should have no resourceVersion") + assert.Equal(t, "999", rvs[1], "second Watch call should use Bookmark resourceVersion") +} + +func TestWatch_ExponentialBackoff(t *testing.T) { + watchBackoffInitial = 10 * time.Millisecond + watchBackoffMax = 80 * time.Millisecond + watchBackoffFactor = 2.0 + t.Cleanup(func() { + watchBackoffInitial = 1 * time.Second + watchBackoffMax = 30 * time.Second + }) + + // Four watchers that each close immediately. + calls := []watchCall{ + {watcher: k8swatch.NewFake()}, + {watcher: k8swatch.NewFake()}, + {watcher: k8swatch.NewFake()}, + {watcher: k8swatch.NewFake()}, + } + for _, wc := range calls { + wc.watcher.(*k8swatch.FakeWatcher).Stop() + } + + c, mwc := newMultiClient(t, calls) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + ch, err := c.Watch(ctx, "proj", "dev", "") + require.NoError(t, err) + + // Wait until at least 4 Watch() calls have been made. + require.Eventually(t, func() bool { + mwc.mu.Lock() + defer mwc.mu.Unlock() + return len(mwc.capturedRVs) >= 4 + }, 2*time.Second, 5*time.Millisecond) + + elapsed := time.Since(start) + // With 10ms+20ms+40ms backoffs before 4th call, minimum elapsed ≈ 70ms. + assert.GreaterOrEqual(t, elapsed, 60*time.Millisecond, "backoff should accumulate across reconnects") + + cancel() + // Drain the channel to let the goroutine exit. + for range ch { + } +} + +func TestWatch_ContextCancelStopsLoop(t *testing.T) { + watchBackoffInitial = 0 + t.Cleanup(func() { watchBackoffInitial = 1 * time.Second }) + + w1 := k8swatch.NewFake() + c, _ := newMultiClient(t, []watchCall{{watcher: w1}}) + + ctx, cancel := context.WithCancel(context.Background()) + ch, err := c.Watch(ctx, "proj", "dev", "") + require.NoError(t, err) + + cancel() + + select { + case _, ok := <-ch: + assert.False(t, ok, "channel should be closed after ctx cancel") + case <-time.After(500 * time.Millisecond): + t.Fatal("channel not closed within 500ms of ctx cancel") + } +} + +func TestWatch_InitialWatchErrorReturnsError(t *testing.T) { + c, _ := newMultiClient(t, []watchCall{ + {watcher: nil, err: fmt.Errorf("RBAC denied")}, + }) + + ch, err := c.Watch(context.Background(), "proj", "dev", "") + require.Error(t, err) + assert.Nil(t, ch) +} From c534e7954b2df4246a0eada536802cd2d000aede Mon Sep 17 00:00:00 2001 From: "M. Adil Fayyaz" <62440954+AdilFayyaz@users.noreply.github.com> Date: Tue, 21 Apr 2026 13:09:08 -0700 Subject: [PATCH 2/2] fix: comments Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com> --- app/internal/k8s/app_client.go | 68 +++++++++++++++++------------ app/internal/k8s/app_client_test.go | 56 ++++++++++++++++++++---- 2 files changed, 87 insertions(+), 37 deletions(-) diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 9b018269e6..474823529e 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -232,18 +232,17 @@ func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName strin } // openWatch starts a K8s watch from resourceVersion (empty = watch from now). +// AllowWatchBookmarks is always set so K8s sends Bookmark events on every session, +// keeping lastResourceVersion current even when no objects change. func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[string]string, resourceVersion string) (k8swatch.Interface, error) { + rawOpts := &metav1.ListOptions{AllowWatchBookmarks: true} + if resourceVersion != "" { + rawOpts.ResourceVersion = resourceVersion + } opts := []client.ListOption{ client.InNamespace(ns), client.MatchingLabels(labels), - } - if resourceVersion != "" { - opts = append(opts, &client.ListOptions{ - Raw: &metav1.ListOptions{ - ResourceVersion: resourceVersion, - AllowWatchBookmarks: true, - }, - }) + &client.ListOptions{Raw: rawOpts}, } watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, opts...) if err != nil { @@ -253,7 +252,9 @@ func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[stri } // watchLoop is the reconnect loop. It drains watcher until it closes, then -// reopens with exponential backoff. Closes ch only when ctx is cancelled. +// reopens. Exponential backoff is applied only on K8s Error events; normal +// watch timeouts (clean channel close) reconnect immediately. Closes ch only +// when ctx is cancelled. func (c *AppK8sClient) watchLoop( ctx context.Context, ns string, @@ -267,21 +268,27 @@ func (c *AppK8sClient) watchLoop( state := &watchState{backoff: watchBackoffInitial} for { - reconnect := c.drainWatcher(ctx, watcher, ch, state) + reconnect, isError := c.drainWatcher(ctx, watcher, ch, state) if !reconnect { return // ctx cancelled } watcher.Stop() - state.consecutiveErrors++ - delay := state.nextBackoff() - logger.Warnf(ctx, "KService watch in namespace %s closed unexpectedly (attempt %d); reconnecting in %v", - ns, state.consecutiveErrors, delay) - select { - case <-ctx.Done(): - return - case <-time.After(delay): + if isError { + state.consecutiveErrors++ + delay := state.nextBackoff() + logger.Warnf(ctx, "KService watch in namespace %s closed with error (attempt %d); reconnecting in %v", + ns, state.consecutiveErrors, delay) + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + } else { + // Normal K8s watch timeout — reconnect immediately, no backoff. + state.resetBackoff() + logger.Debugf(ctx, "KService watch in namespace %s timed out naturally; reconnecting", ns) } newWatcher, err := c.openWatch(ctx, ns, labels, state.lastResourceVersion) @@ -296,24 +303,23 @@ func (c *AppK8sClient) watchLoop( } // drainWatcher processes events from watcher until the channel closes or ctx is done. -// Returns true if reconnect is needed, false if ctx was cancelled. +// Returns (reconnect, isError): reconnect=false means ctx was cancelled (stop the loop); +// isError=true means a K8s Error event triggered the close and backoff should be applied. func (c *AppK8sClient) drainWatcher( ctx context.Context, watcher k8swatch.Interface, ch chan<- *flyteapp.WatchResponse, state *watchState, -) bool { +) (bool, bool) { for { select { case <-ctx.Done(): - return false + return false, false case event, ok := <-watcher.ResultChan(): if !ok { - return true + return true, false // normal K8s watch timeout } - c.updateResourceVersion(event, state) - switch event.Type { case k8swatch.Error: if status, ok := event.Object.(*metav1.Status); ok { @@ -322,20 +328,24 @@ func (c *AppK8sClient) drainWatcher( } else { logger.Warnf(ctx, "KService watch received error event (type %T); will reconnect", event.Object) } - return true + return true, true // error — apply backoff on reconnect case k8swatch.Bookmark: - // resourceVersion already updated — nothing to forward. + // Update RV immediately — there is no delivery to confirm. + c.updateResourceVersion(event, state) state.resetBackoff() default: resp := c.kserviceEventToWatchResponse(ctx, event) if resp == nil { continue } - state.resetBackoff() select { case ch <- resp: + // Advance RV only after confirmed delivery so a failed send + // doesn't silently skip the event on the next reconnect. + c.updateResourceVersion(event, state) + state.resetBackoff() case <-ctx.Done(): - return false + return false, false } } } @@ -343,7 +353,7 @@ func (c *AppK8sClient) drainWatcher( } // updateResourceVersion extracts and stores the latest resourceVersion from a watch event. -// Called before event type dispatch so both normal events and Bookmarks checkpoint the position. +// For Bookmark events it is called immediately; for data events only after successful delivery. func (c *AppK8sClient) updateResourceVersion(event k8swatch.Event, state *watchState) { switch event.Type { case k8swatch.Added, k8swatch.Modified, k8swatch.Deleted, k8swatch.Bookmark: diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 13ee9d0d2e..cb04428a29 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -682,12 +682,54 @@ func TestWatch_ExponentialBackoff(t *testing.T) { watchBackoffMax = 30 * time.Second }) - // Four watchers that each close immediately. + // Four watchers that each emit an Error event — only Error events trigger backoff. + // NewFakeWithChanSize(1,...) gives a buffer of 1 so pre-sends don't block before + // the consumer goroutine starts (NewFake() is unbuffered). + calls := make([]watchCall, 4) + for i := range calls { + w := k8swatch.NewFakeWithChanSize(1, false) + calls[i] = watchCall{watcher: w} + w.Error(&metav1.Status{Code: 410, Reason: metav1.StatusReasonExpired, Message: "resource version too old"}) + } + + c, mwc := newMultiClient(t, calls) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + ch, err := c.Watch(ctx, "proj", "dev", "") + require.NoError(t, err) + + // Wait until at least 4 Watch() calls have been made. + require.Eventually(t, func() bool { + mwc.mu.Lock() + defer mwc.mu.Unlock() + return len(mwc.capturedRVs) >= 4 + }, 2*time.Second, 5*time.Millisecond) + + elapsed := time.Since(start) + // With 10ms+20ms+40ms backoffs before 4th call, minimum elapsed ≈ 70ms. + assert.GreaterOrEqual(t, elapsed, 60*time.Millisecond, "backoff should accumulate across error reconnects") + + cancel() + for range ch { + } +} + +func TestWatch_CleanCloseNoBackoff(t *testing.T) { + watchBackoffInitial = 50 * time.Millisecond + watchBackoffMax = 200 * time.Millisecond + t.Cleanup(func() { + watchBackoffInitial = 1 * time.Second + watchBackoffMax = 30 * time.Second + }) + + // Three watchers that close immediately (clean channel close, no Error event). calls := []watchCall{ {watcher: k8swatch.NewFake()}, {watcher: k8swatch.NewFake()}, {watcher: k8swatch.NewFake()}, - {watcher: k8swatch.NewFake()}, } for _, wc := range calls { wc.watcher.(*k8swatch.FakeWatcher).Stop() @@ -702,19 +744,17 @@ func TestWatch_ExponentialBackoff(t *testing.T) { ch, err := c.Watch(ctx, "proj", "dev", "") require.NoError(t, err) - // Wait until at least 4 Watch() calls have been made. require.Eventually(t, func() bool { mwc.mu.Lock() defer mwc.mu.Unlock() - return len(mwc.capturedRVs) >= 4 - }, 2*time.Second, 5*time.Millisecond) + return len(mwc.capturedRVs) >= 3 + }, 500*time.Millisecond, 5*time.Millisecond) elapsed := time.Since(start) - // With 10ms+20ms+40ms backoffs before 4th call, minimum elapsed ≈ 70ms. - assert.GreaterOrEqual(t, elapsed, 60*time.Millisecond, "backoff should accumulate across reconnects") + // Clean closes must not apply backoff — all 3 reconnects should be nearly instant. + assert.Less(t, elapsed, 30*time.Millisecond, "clean closes should not apply backoff delay") cancel() - // Drain the channel to let the goroutine exit. for range ch { } }