From a491504dfa7fa474af57dd4992b1bc47ec83bf4d Mon Sep 17 00:00:00 2001 From: Miguel Martinez Date: Thu, 16 Apr 2026 22:33:46 +0200 Subject: [PATCH 1/5] fix(cache): make NATS KV bucket init idempotent and fail-open The control plane panicked on startup when UpdateStream on the KV backing stream hit the NATS default request timeout. initBucket now skips UpdateStream when Discard is already DiscardOld and logs a warning instead of returning an error when the JetStream metadata calls fail, matching the fail-open pattern used by the cache's runtime operations. Signed-off-by: Miguel Martinez --- pkg/cache/natskv.go | 49 +++++++++++++++++++---------- pkg/cache/natskv_test.go | 67 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 16 deletions(-) diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go index a0e8fd163..2ae913a3b 100644 --- a/pkg/cache/natskv.go +++ b/pkg/cache/natskv.go @@ -22,11 +22,14 @@ import ( "errors" "fmt" "sync" + "time" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) +const streamUpdateTimeout = 5 * time.Second + type natsKVCache[T any] struct { mu sync.RWMutex kv jetstream.KeyValue @@ -74,28 +77,42 @@ func (c *natsKVCache[T]) initBucket() error { return err } - // NATS KV hardcodes DiscardNew on the backing stream, which rejects writes - // when MaxBytes is reached. For cache use-cases we want DiscardOld so that - // the oldest entries are evicted automatically to make room for new ones. - if c.cfg.maxBytes > 0 { - streamName := fmt.Sprintf("KV_%s", c.bucket) - stream, err := js.Stream(context.Background(), streamName) - if err != nil { - return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err) - } - cfg := stream.CachedInfo().Config - cfg.Discard = jetstream.DiscardOld - if _, err := js.UpdateStream(context.Background(), cfg); err != nil { - return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err) - } - } - c.mu.Lock() c.kv = kv c.mu.Unlock() + + // NATS KV hardcodes DiscardNew; we want DiscardOld so the cache evicts + // oldest entries at MaxBytes. Fail-open to avoid crashing on NATS slowness. + if c.cfg.maxBytes > 0 { + c.ensureDiscardOld(js) + } + return nil } +func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) { + streamName := fmt.Sprintf("KV_%s", c.bucket) + + ctx, cancel := context.WithTimeout(context.Background(), streamUpdateTimeout) + defer cancel() + + stream, err := js.Stream(ctx, streamName) + if err != nil { + c.logger.Warnw("msg", "cache: failed to get backing stream, skipping discard policy update", "stream", streamName, "error", err) + return + } + + cfg := stream.CachedInfo().Config + if cfg.Discard == jetstream.DiscardOld { + return + } + cfg.Discard = jetstream.DiscardOld + + if _, err := js.UpdateStream(ctx, cfg); err != nil { + c.logger.Warnw("msg", "cache: failed to set DiscardOld on stream, continuing without it", "stream", streamName, "error", err) + } +} + func (c *natsKVCache[T]) watchReconnect(ch <-chan struct{}) { for range ch { c.logger.Infow("msg", "cache: NATS reconnected, reinitializing bucket", "bucket", c.bucket) diff --git a/pkg/cache/natskv_test.go b/pkg/cache/natskv_test.go index 6c3f4edcb..349a97087 100644 --- a/pkg/cache/natskv_test.go +++ b/pkg/cache/natskv_test.go @@ -251,6 +251,73 @@ func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) { assert.False(t, ok, "oldest entry should have been evicted") } +func TestNATSKV_InitBucketIsIdempotent(t *testing.T) { + // Simulates a pod restart: first New() sets DiscardOld on the backing + // stream, second New() against the same NATS sees it's already correct + // and must not issue another UpdateStream. + nc := startEmbeddedNATS(t) + bucket := "test-idempotent" + + c1, err := New[[]byte]( + WithTTL(time.Minute), + WithNATS(nc, bucket), + WithMaxBytes(10*1024), + ) + require.NoError(t, err) + _ = c1 + + js, err := jetstream.New(nc) + require.NoError(t, err) + streamName := "KV_" + bucket + stream, err := js.Stream(context.Background(), streamName) + require.NoError(t, err) + info, err := stream.Info(context.Background()) + require.NoError(t, err) + firstUpdate := info.Created + firstDiscard := info.Config.Discard + require.Equal(t, jetstream.DiscardOld, firstDiscard, "first init must set DiscardOld") + + // Second init against the same bucket must be a no-op for the update path. + c2, err := New[[]byte]( + WithTTL(time.Minute), + WithNATS(nc, bucket), + WithMaxBytes(10*1024), + ) + require.NoError(t, err) + _ = c2 + + info2, err := stream.Info(context.Background()) + require.NoError(t, err) + assert.Equal(t, firstUpdate, info2.Created, "stream should not have been recreated") + assert.Equal(t, jetstream.DiscardOld, info2.Config.Discard) +} + +func TestNATSKV_InitBucketFailsOpenOnStreamUpdateError(t *testing.T) { + // If the JetStream metadata calls time out or fail, init must not crash + // the process — the runtime Get/Set already fail-open, so the cache + // simply degrades. We exercise this by cancelling the context in the + // middle of ensureDiscardOld via a NATS connection that's already closed. + nc := startEmbeddedNATS(t) + bucket := "test-failopen" + + c, err := New[[]byte]( + WithTTL(time.Minute), + WithNATS(nc, bucket), + WithMaxBytes(10*1024), + ) + require.NoError(t, err) + + // Now close the NATS connection and call ensureDiscardOld directly — + // it must return without panicking or returning an error. + nc.Close() + nkv := c.(*natsKVCache[[]byte]) + js, err := jetstream.New(nkv.conn) + require.NoError(t, err) + require.NotPanics(t, func() { + nkv.ensureDiscardOld(js) + }) +} + func TestNATSKV_WithReplicas(t *testing.T) { nc := startEmbeddedNATS(t) From 81033f96335ff821a654a95177bc43f27693a10d Mon Sep 17 00:00:00 2001 From: Miguel Martinez Date: Thu, 16 Apr 2026 22:49:32 +0200 Subject: [PATCH 2/5] fix(cache): retry bucket init on transient NATS disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NATS auto-reconnects (MaxReconnects=-1, ReconnectWait=2s), but the initial cache bucket init had no retry path — a drop mid-init would surface as an immediate error. Wrap initBucket in a bounded retry (30s total, 2s period) that only retries connection-level errors so configuration failures still fail fast. Also skip UpdateStream when the backing stream already has DiscardOld, avoiding redundant metadata calls on every boot, and replace the panic(err) in main.go with a logged exit so a genuinely unavailable NATS fails startup cleanly instead of dumping a stack trace. Signed-off-by: Miguel Martinez --- app/controlplane/cmd/main.go | 7 +++- pkg/cache/natskv.go | 79 ++++++++++++++++++++++++++++++------ pkg/cache/natskv_test.go | 52 ++++++++++++++---------- 3 files changed, 103 insertions(+), 35 deletions(-) diff --git a/app/controlplane/cmd/main.go b/app/controlplane/cmd/main.go index 5fb2805ae..14c63be4c 100644 --- a/app/controlplane/cmd/main.go +++ b/app/controlplane/cmd/main.go @@ -146,7 +146,12 @@ func main() { app, cleanup, err := wireApp(ctx, &bc, credsWriter, logger, availablePlugins) if err != nil { - panic(err) + _ = logger.Log(log.LevelError, "msg", "failed to initialize control plane", "error", err.Error()) + // Invoke critical deferred cleanups explicitly since os.Exit skips defers. + cancel() + availablePlugins.Cleanup() + flush() + os.Exit(1) //nolint:gocritic // deferred cleanups called explicitly above } defer cleanup() diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go index 2ae913a3b..57ee47fa7 100644 --- a/pkg/cache/natskv.go +++ b/pkg/cache/natskv.go @@ -28,7 +28,15 @@ import ( "github.com/nats-io/nats.go/jetstream" ) -const streamUpdateTimeout = 5 * time.Second +const ( + streamUpdateTimeout = 5 * time.Second + // initMaxWait bounds the total time we'll retry bucket init on the initial + // boot path. NATS auto-reconnects every ReconnectWait (2s), so this gives + // the client several reconnect cycles to heal a transient drop before we + // give up and refuse to start. + initMaxWait = 30 * time.Second + initRetryPeriod = 2 * time.Second +) type natsKVCache[T any] struct { mu sync.RWMutex @@ -47,7 +55,7 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) { cfg: cfg, } - if err := c.initBucket(); err != nil { + if err := c.initBucketWithRetry(initMaxWait, initRetryPeriod); err != nil { return nil, err } @@ -59,6 +67,49 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) { return c, nil } +// initBucketWithRetry runs initBucket with a bounded retry loop so transient +// NATS disconnects (handled by the client's background auto-reconnect) don't +// cause the whole service to refuse startup on a momentary blip. Retries only +// apply to connectivity errors; configuration errors fail fast. +func (c *natsKVCache[T]) initBucketWithRetry(maxWait, period time.Duration) error { + deadline := time.Now().Add(maxWait) + var lastErr error + for attempt := 1; ; attempt++ { + lastErr = c.initBucket() + if lastErr == nil { + return nil + } + if !isRetryableInitError(lastErr) { + return lastErr + } + if time.Now().After(deadline) { + return fmt.Errorf("cache: bucket %q init failed after %s (%d attempts): %w", c.bucket, maxWait, attempt, lastErr) + } + c.logger.Warnw("msg", "cache: bucket init failed, retrying", "bucket", c.bucket, "attempt", attempt, "error", lastErr) + time.Sleep(period) + } +} + +// isRetryableInitError reports whether err looks like a transient NATS +// connectivity issue worth retrying. Config errors (e.g. unsupported replica +// count) fail fast so misconfigured deployments don't spin for the full budget. +func isRetryableInitError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return true + } + if errors.Is(err, nats.ErrConnectionClosed) || + errors.Is(err, nats.ErrConnectionDraining) || + errors.Is(err, nats.ErrNoServers) || + errors.Is(err, nats.ErrTimeout) || + errors.Is(err, nats.ErrDisconnected) { + return true + } + return false +} + func (c *natsKVCache[T]) initBucket() error { js, err := jetstream.New(c.conn) if err != nil { @@ -77,20 +128,22 @@ func (c *natsKVCache[T]) initBucket() error { return err } - c.mu.Lock() - c.kv = kv - c.mu.Unlock() - // NATS KV hardcodes DiscardNew; we want DiscardOld so the cache evicts - // oldest entries at MaxBytes. Fail-open to avoid crashing on NATS slowness. + // oldest entries when MaxBytes is reached. if c.cfg.maxBytes > 0 { - c.ensureDiscardOld(js) + if err := c.ensureDiscardOld(js); err != nil { + return err + } } + c.mu.Lock() + c.kv = kv + c.mu.Unlock() + return nil } -func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) { +func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) error { streamName := fmt.Sprintf("KV_%s", c.bucket) ctx, cancel := context.WithTimeout(context.Background(), streamUpdateTimeout) @@ -98,19 +151,19 @@ func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) { stream, err := js.Stream(ctx, streamName) if err != nil { - c.logger.Warnw("msg", "cache: failed to get backing stream, skipping discard policy update", "stream", streamName, "error", err) - return + return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err) } cfg := stream.CachedInfo().Config if cfg.Discard == jetstream.DiscardOld { - return + return nil } cfg.Discard = jetstream.DiscardOld if _, err := js.UpdateStream(ctx, cfg); err != nil { - c.logger.Warnw("msg", "cache: failed to set DiscardOld on stream, continuing without it", "stream", streamName, "error", err) + return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err) } + return nil } func (c *natsKVCache[T]) watchReconnect(ch <-chan struct{}) { diff --git a/pkg/cache/natskv_test.go b/pkg/cache/natskv_test.go index 349a97087..f912fbf65 100644 --- a/pkg/cache/natskv_test.go +++ b/pkg/cache/natskv_test.go @@ -59,6 +59,13 @@ type testStruct struct { Value int `json:"value"` } +type testLogger struct{} + +func (testLogger) Debugw(...any) {} +func (testLogger) Warnw(...any) {} +func (testLogger) Infow(...any) {} +func (testLogger) Errorw(...any) {} + func TestNATSKV_GetSetDelete(t *testing.T) { nc := startEmbeddedNATS(t) @@ -292,30 +299,33 @@ func TestNATSKV_InitBucketIsIdempotent(t *testing.T) { assert.Equal(t, jetstream.DiscardOld, info2.Config.Discard) } -func TestNATSKV_InitBucketFailsOpenOnStreamUpdateError(t *testing.T) { - // If the JetStream metadata calls time out or fail, init must not crash - // the process — the runtime Get/Set already fail-open, so the cache - // simply degrades. We exercise this by cancelling the context in the - // middle of ensureDiscardOld via a NATS connection that's already closed. +func TestNATSKV_InitBucketRetriesOnTransientError(t *testing.T) { + // Verify the retry wrapper gives up cleanly (returns an error, no panic) + // when the NATS connection is unusable for the full budget. A closed + // connection is a deterministic way to make every attempt fail. nc := startEmbeddedNATS(t) - bucket := "test-failopen" + nc.Close() - c, err := New[[]byte]( - WithTTL(time.Minute), - WithNATS(nc, bucket), - WithMaxBytes(10*1024), - ) - require.NoError(t, err) + c := &natsKVCache[[]byte]{ + logger: testLogger{}, + conn: nc, + bucket: "test-retry-exhausted", + cfg: &config{ + logger: testLogger{}, + bucketName: "test-retry-exhausted", + ttl: time.Minute, + maxBytes: 10 * 1024, + replicas: 1, + }, + } - // Now close the NATS connection and call ensureDiscardOld directly — - // it must return without panicking or returning an error. - nc.Close() - nkv := c.(*natsKVCache[[]byte]) - js, err := jetstream.New(nkv.conn) - require.NoError(t, err) - require.NotPanics(t, func() { - nkv.ensureDiscardOld(js) - }) + start := time.Now() + err := c.initBucketWithRetry(200*time.Millisecond, 50*time.Millisecond) + elapsed := time.Since(start) + + require.Error(t, err) + assert.GreaterOrEqual(t, elapsed, 200*time.Millisecond, "should have retried for at least the budget") + assert.Less(t, elapsed, 2*time.Second, "should not have hung beyond the budget") } func TestNATSKV_WithReplicas(t *testing.T) { From 3f9a0230472c122350db6e420698a501bf5375bd Mon Sep 17 00:00:00 2001 From: Miguel Martinez Date: Thu, 16 Apr 2026 23:06:42 +0200 Subject: [PATCH 3/5] fix(cache): retry bucket re-init on NATS reconnect watchReconnect previously ran a single initBucket attempt after each reconnect signal. A reconnect that races with NATS leader election or cluster settle would leave the bucket config stale until the next disconnect fired another signal. Reuse the bounded retry loop so reconnect handling survives an unsettled restart window. Signed-off-by: Miguel Martinez --- pkg/cache/natskv.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go index 57ee47fa7..197e4b470 100644 --- a/pkg/cache/natskv.go +++ b/pkg/cache/natskv.go @@ -169,7 +169,10 @@ func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) error { func (c *natsKVCache[T]) watchReconnect(ch <-chan struct{}) { for range ch { c.logger.Infow("msg", "cache: NATS reconnected, reinitializing bucket", "bucket", c.bucket) - if err := c.initBucket(); err != nil { + // Share the initial-boot retry budget: a reconnect may race with + // NATS leader election / cluster settle, so one shot isn't enough. + // Runtime ops already fail-open, so this is best-effort. + if err := c.initBucketWithRetry(initMaxWait, initRetryPeriod); err != nil { c.logger.Warnw("msg", "cache: failed to reinitialize bucket after reconnect", "bucket", c.bucket, "error", err) } } From 048cdc20ef2481c49950f794912c8479fdacede9 Mon Sep 17 00:00:00 2001 From: Miguel Martinez Date: Thu, 16 Apr 2026 23:41:02 +0200 Subject: [PATCH 4/5] refactor(cache): tighten retry helper after review Reuse nopLogger from pkg/cache instead of duplicating a test stub, drop the redundant lastErr local in the retry loop, and trim the initMaxWait comment to its intent. Signed-off-by: Miguel Martinez --- pkg/cache/natskv.go | 19 ++++++++----------- pkg/cache/natskv_test.go | 11 ++--------- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go index 197e4b470..4d4c33d6f 100644 --- a/pkg/cache/natskv.go +++ b/pkg/cache/natskv.go @@ -30,10 +30,8 @@ import ( const ( streamUpdateTimeout = 5 * time.Second - // initMaxWait bounds the total time we'll retry bucket init on the initial - // boot path. NATS auto-reconnects every ReconnectWait (2s), so this gives - // the client several reconnect cycles to heal a transient drop before we - // give up and refuse to start. + // initMaxWait covers several nats.ReconnectWait (2s) cycles so a transient + // drop during boot can heal before we give up. initMaxWait = 30 * time.Second initRetryPeriod = 2 * time.Second ) @@ -73,19 +71,18 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) { // apply to connectivity errors; configuration errors fail fast. func (c *natsKVCache[T]) initBucketWithRetry(maxWait, period time.Duration) error { deadline := time.Now().Add(maxWait) - var lastErr error for attempt := 1; ; attempt++ { - lastErr = c.initBucket() - if lastErr == nil { + err := c.initBucket() + if err == nil { return nil } - if !isRetryableInitError(lastErr) { - return lastErr + if !isRetryableInitError(err) { + return err } if time.Now().After(deadline) { - return fmt.Errorf("cache: bucket %q init failed after %s (%d attempts): %w", c.bucket, maxWait, attempt, lastErr) + return fmt.Errorf("cache: bucket %q init failed after %s (%d attempts): %w", c.bucket, maxWait, attempt, err) } - c.logger.Warnw("msg", "cache: bucket init failed, retrying", "bucket", c.bucket, "attempt", attempt, "error", lastErr) + c.logger.Warnw("msg", "cache: bucket init failed, retrying", "bucket", c.bucket, "attempt", attempt, "error", err) time.Sleep(period) } } diff --git a/pkg/cache/natskv_test.go b/pkg/cache/natskv_test.go index f912fbf65..d010e4f84 100644 --- a/pkg/cache/natskv_test.go +++ b/pkg/cache/natskv_test.go @@ -59,13 +59,6 @@ type testStruct struct { Value int `json:"value"` } -type testLogger struct{} - -func (testLogger) Debugw(...any) {} -func (testLogger) Warnw(...any) {} -func (testLogger) Infow(...any) {} -func (testLogger) Errorw(...any) {} - func TestNATSKV_GetSetDelete(t *testing.T) { nc := startEmbeddedNATS(t) @@ -307,11 +300,11 @@ func TestNATSKV_InitBucketRetriesOnTransientError(t *testing.T) { nc.Close() c := &natsKVCache[[]byte]{ - logger: testLogger{}, + logger: nopLogger{}, conn: nc, bucket: "test-retry-exhausted", cfg: &config{ - logger: testLogger{}, + logger: nopLogger{}, bucketName: "test-retry-exhausted", ttl: time.Minute, maxBytes: 10 * 1024, From baaf396973fe0d3122fa2312c39c52415480f6a3 Mon Sep 17 00:00:00 2001 From: Miguel Martinez Date: Thu, 16 Apr 2026 23:43:43 +0200 Subject: [PATCH 5/5] test(cache): assert skip branch via NATS request count Previous idempotency test checked info.Created across inits, but Created is creation-time metadata and never changes on UpdateStream, so the assertion was a no-op. Exercise ensureDiscardOld directly on both branches: flip the backing stream to DiscardNew, verify the update branch restores it, then measure nc.Stats().OutMsgs across a second call to confirm UpdateStream is not issued when Discard is already correct. Signed-off-by: Miguel Martinez --- pkg/cache/natskv_test.go | 48 ++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/pkg/cache/natskv_test.go b/pkg/cache/natskv_test.go index d010e4f84..014cc5e07 100644 --- a/pkg/cache/natskv_test.go +++ b/pkg/cache/natskv_test.go @@ -251,45 +251,51 @@ func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) { assert.False(t, ok, "oldest entry should have been evicted") } -func TestNATSKV_InitBucketIsIdempotent(t *testing.T) { - // Simulates a pod restart: first New() sets DiscardOld on the backing - // stream, second New() against the same NATS sees it's already correct - // and must not issue another UpdateStream. +func TestNATSKV_EnsureDiscardOldSkipsWhenAlreadySet(t *testing.T) { + // Exercise both branches of ensureDiscardOld directly: when the stream's + // Discard policy already matches, UpdateStream must not be called. nc := startEmbeddedNATS(t) bucket := "test-idempotent" - c1, err := New[[]byte]( + // First New creates the bucket and sets DiscardOld. + _, err := New[[]byte]( WithTTL(time.Minute), WithNATS(nc, bucket), WithMaxBytes(10*1024), ) require.NoError(t, err) - _ = c1 + // Flip the backing stream back to DiscardNew so we can observe the update branch. js, err := jetstream.New(nc) require.NoError(t, err) streamName := "KV_" + bucket stream, err := js.Stream(context.Background(), streamName) require.NoError(t, err) - info, err := stream.Info(context.Background()) + cfg := stream.CachedInfo().Config + cfg.Discard = jetstream.DiscardNew + _, err = js.UpdateStream(context.Background(), cfg) require.NoError(t, err) - firstUpdate := info.Created - firstDiscard := info.Config.Discard - require.Equal(t, jetstream.DiscardOld, firstDiscard, "first init must set DiscardOld") - // Second init against the same bucket must be a no-op for the update path. - c2, err := New[[]byte]( - WithTTL(time.Minute), - WithNATS(nc, bucket), - WithMaxBytes(10*1024), - ) - require.NoError(t, err) - _ = c2 + c := &natsKVCache[[]byte]{ + logger: nopLogger{}, + conn: nc, + bucket: bucket, + cfg: &config{logger: nopLogger{}, bucketName: bucket, maxBytes: 10 * 1024}, + } - info2, err := stream.Info(context.Background()) + // Update branch: must flip Discard back to DiscardOld. + require.NoError(t, c.ensureDiscardOld(js)) + stream, err = js.Stream(context.Background(), streamName) require.NoError(t, err) - assert.Equal(t, firstUpdate, info2.Created, "stream should not have been recreated") - assert.Equal(t, jetstream.DiscardOld, info2.Config.Discard) + require.Equal(t, jetstream.DiscardOld, stream.CachedInfo().Config.Discard) + + // Skip branch: with DiscardOld already set, ensureDiscardOld must not + // issue an UpdateStream call. Measure outbound NATS request count across + // a call — one Stream() lookup, zero UpdateStream() calls => 1 request. + before := nc.Stats().OutMsgs + require.NoError(t, c.ensureDiscardOld(js)) + delta := nc.Stats().OutMsgs - before + assert.LessOrEqual(t, delta, uint64(1), "skip path must not issue an UpdateStream request") } func TestNATSKV_InitBucketRetriesOnTransientError(t *testing.T) {