diff --git a/app/controlplane/cmd/main.go b/app/controlplane/cmd/main.go
index 5fb2805ae..14c63be4c 100644
--- a/app/controlplane/cmd/main.go
+++ b/app/controlplane/cmd/main.go
@@ -146,7 +146,12 @@ func main() {
 
 	app, cleanup, err := wireApp(ctx, &bc, credsWriter, logger, availablePlugins)
 	if err != nil {
-		panic(err)
+		_ = logger.Log(log.LevelError, "msg", "failed to initialize control plane", "error", err.Error())
+		// Invoke critical deferred cleanups explicitly since os.Exit skips defers.
+		cancel()
+		availablePlugins.Cleanup()
+		flush()
+		os.Exit(1) //nolint:gocritic // deferred cleanups called explicitly above
 	}
 	defer cleanup()
 
diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go
index a0e8fd163..4d4c33d6f 100644
--- a/pkg/cache/natskv.go
+++ b/pkg/cache/natskv.go
@@ -22,11 +22,20 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/nats-io/nats.go"
 	"github.com/nats-io/nats.go/jetstream"
 )
 
+const (
+	streamUpdateTimeout = 5 * time.Second
+	// initMaxWait covers several nats.DefaultReconnectWait (2s) cycles so a
+	// transient drop during boot can heal before we give up.
+	initMaxWait     = 30 * time.Second
+	initRetryPeriod = 2 * time.Second
+)
+
 type natsKVCache[T any] struct {
 	mu     sync.RWMutex
 	kv     jetstream.KeyValue
@@ -44,7 +53,7 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) {
 		cfg:    cfg,
 	}
 
-	if err := c.initBucket(); err != nil {
+	if err := c.initBucketWithRetry(initMaxWait, initRetryPeriod); err != nil {
 		return nil, err
 	}
 
@@ -56,6 +65,61 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) {
 	return c, nil
 }
 
+// initBucketWithRetry runs initBucket with a bounded retry loop so transient
+// NATS disconnects (handled by the client's background auto-reconnect) don't
+// cause the whole service to refuse startup on a momentary blip. Retries only
+// apply to connectivity errors; configuration errors fail fast. The sleep
+// between attempts is clamped to the time left, so the total wait exceeds
+// maxWait only by the duration of the final initBucket attempt.
+func (c *natsKVCache[T]) initBucketWithRetry(maxWait, period time.Duration) error {
+	deadline := time.Now().Add(maxWait)
+	for attempt := 1; ; attempt++ {
+		err := c.initBucket()
+		if err == nil {
+			return nil
+		}
+		if !isRetryableInitError(err) {
+			return err
+		}
+		remaining := time.Until(deadline)
+		if remaining <= 0 {
+			return fmt.Errorf("cache: bucket %q init failed after %s (%d attempts): %w", c.bucket, maxWait, attempt, err)
+		}
+		c.logger.Warnw("msg", "cache: bucket init failed, retrying", "bucket", c.bucket, "attempt", attempt, "error", err)
+		// Never sleep past the deadline: a full period here could overshoot
+		// the budget by almost one whole retry interval.
+		wait := period
+		if remaining < wait {
+			wait = remaining
+		}
+		time.Sleep(wait)
+	}
+}
+
+// isRetryableInitError reports whether err looks like a transient NATS
+// connectivity issue worth retrying. Config errors (e.g. unsupported replica
+// count) fail fast so misconfigured deployments don't spin for the full budget.
+//
+// NOTE(review): nats.ErrConnectionClosed is reported for a connection that has
+// been closed for good (the client does not reconnect it) - confirm that
+// spending the whole budget on it is intended.
+func isRetryableInitError(err error) bool {
+	if err == nil {
+		return false
+	}
+	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+		return true
+	}
+	if errors.Is(err, nats.ErrConnectionClosed) ||
+		errors.Is(err, nats.ErrConnectionDraining) ||
+		errors.Is(err, nats.ErrNoServers) ||
+		errors.Is(err, nats.ErrTimeout) ||
+		errors.Is(err, nats.ErrDisconnected) {
+		return true
+	}
+	return false
+}
+
 func (c *natsKVCache[T]) initBucket() error {
 	js, err := jetstream.New(c.conn)
 	if err != nil {
@@ -74,32 +138,54 @@ func (c *natsKVCache[T]) initBucket() error {
 		return err
 	}
 
-	// NATS KV hardcodes DiscardNew on the backing stream, which rejects writes
-	// when MaxBytes is reached. For cache use-cases we want DiscardOld so that
-	// the oldest entries are evicted automatically to make room for new ones.
+	// NATS KV hardcodes DiscardNew; we want DiscardOld so the cache evicts
+	// oldest entries when MaxBytes is reached.
 	if c.cfg.maxBytes > 0 {
-		streamName := fmt.Sprintf("KV_%s", c.bucket)
-		stream, err := js.Stream(context.Background(), streamName)
-		if err != nil {
-			return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
-		}
-		cfg := stream.CachedInfo().Config
-		cfg.Discard = jetstream.DiscardOld
-		if _, err := js.UpdateStream(context.Background(), cfg); err != nil {
-			return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
+		if err := c.ensureDiscardOld(js); err != nil {
+			return err
 		}
 	}
 
 	c.mu.Lock()
 	c.kv = kv
 	c.mu.Unlock()
+
+	return nil
+}
+
+// ensureDiscardOld switches the bucket's backing stream ("KV_<bucket>") to the
+// DiscardOld policy, skipping the update when it is already set. The lookup
+// and the update share a single streamUpdateTimeout deadline.
+func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) error {
+	streamName := fmt.Sprintf("KV_%s", c.bucket)
+
+	ctx, cancel := context.WithTimeout(context.Background(), streamUpdateTimeout)
+	defer cancel()
+
+	stream, err := js.Stream(ctx, streamName)
+	if err != nil {
+		return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
+	}
+
+	cfg := stream.CachedInfo().Config
+	if cfg.Discard == jetstream.DiscardOld {
+		return nil
+	}
+	cfg.Discard = jetstream.DiscardOld
+
+	if _, err := js.UpdateStream(ctx, cfg); err != nil {
+		return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
+	}
 	return nil
 }
 
 func (c *natsKVCache[T]) watchReconnect(ch <-chan struct{}) {
 	for range ch {
 		c.logger.Infow("msg", "cache: NATS reconnected, reinitializing bucket", "bucket", c.bucket)
-		if err := c.initBucket(); err != nil {
+		// Share the initial-boot retry budget: a reconnect may race with
+		// NATS leader election / cluster settle, so one shot isn't enough.
+		// Runtime ops already fail-open, so this is best-effort.
+		if err := c.initBucketWithRetry(initMaxWait, initRetryPeriod); err != nil {
 			c.logger.Warnw("msg", "cache: failed to reinitialize bucket after reconnect", "bucket", c.bucket, "error", err)
 		}
 	}
diff --git a/pkg/cache/natskv_test.go b/pkg/cache/natskv_test.go
index 6c3f4edcb..014cc5e07 100644
--- a/pkg/cache/natskv_test.go
+++ b/pkg/cache/natskv_test.go
@@ -251,6 +251,82 @@ func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) {
 	assert.False(t, ok, "oldest entry should have been evicted")
 }
 
+func TestNATSKV_EnsureDiscardOldSkipsWhenAlreadySet(t *testing.T) {
+	// Exercise both branches of ensureDiscardOld directly: when the stream's
+	// Discard policy already matches, UpdateStream must not be called.
+	nc := startEmbeddedNATS(t)
+	bucket := "test-idempotent"
+
+	// First New creates the bucket and sets DiscardOld.
+	_, err := New[[]byte](
+		WithTTL(time.Minute),
+		WithNATS(nc, bucket),
+		WithMaxBytes(10*1024),
+	)
+	require.NoError(t, err)
+
+	// Flip the backing stream back to DiscardNew so we can observe the update branch.
+	js, err := jetstream.New(nc)
+	require.NoError(t, err)
+	streamName := "KV_" + bucket
+	stream, err := js.Stream(context.Background(), streamName)
+	require.NoError(t, err)
+	cfg := stream.CachedInfo().Config
+	cfg.Discard = jetstream.DiscardNew
+	_, err = js.UpdateStream(context.Background(), cfg)
+	require.NoError(t, err)
+
+	c := &natsKVCache[[]byte]{
+		logger: nopLogger{},
+		conn:   nc,
+		bucket: bucket,
+		cfg:    &config{logger: nopLogger{}, bucketName: bucket, maxBytes: 10 * 1024},
+	}
+
+	// Update branch: must flip Discard back to DiscardOld.
+	require.NoError(t, c.ensureDiscardOld(js))
+	stream, err = js.Stream(context.Background(), streamName)
+	require.NoError(t, err)
+	require.Equal(t, jetstream.DiscardOld, stream.CachedInfo().Config.Discard)
+
+	// Skip branch: with DiscardOld already set, ensureDiscardOld must not
+	// issue an UpdateStream call. Measure outbound NATS request count across
+	// a call — one Stream() lookup, zero UpdateStream() calls => 1 request.
+	before := nc.Stats().OutMsgs
+	require.NoError(t, c.ensureDiscardOld(js))
+	delta := nc.Stats().OutMsgs - before
+	assert.LessOrEqual(t, delta, uint64(1), "skip path must not issue an UpdateStream request")
+}
+
+func TestNATSKV_InitBucketWithRetryGivesUpAfterBudget(t *testing.T) {
+	// Verify the retry wrapper gives up cleanly (returns an error, no panic)
+	// when the NATS connection is unusable for the full budget. A closed
+	// connection is a deterministic way to make every attempt fail.
+	nc := startEmbeddedNATS(t)
+	nc.Close()
+
+	c := &natsKVCache[[]byte]{
+		logger: nopLogger{},
+		conn:   nc,
+		bucket: "test-retry-exhausted",
+		cfg: &config{
+			logger:     nopLogger{},
+			bucketName: "test-retry-exhausted",
+			ttl:        time.Minute,
+			maxBytes:   10 * 1024,
+			replicas:   1,
+		},
+	}
+
+	start := time.Now()
+	err := c.initBucketWithRetry(200*time.Millisecond, 50*time.Millisecond)
+	elapsed := time.Since(start)
+
+	require.Error(t, err)
+	assert.GreaterOrEqual(t, elapsed, 200*time.Millisecond, "should have retried for at least the budget")
+	assert.Less(t, elapsed, 2*time.Second, "should not have hung beyond the budget")
+}
+
 func TestNATSKV_WithReplicas(t *testing.T) {
 	nc := startEmbeddedNATS(t)
 