Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/controlplane/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@ func main() {

app, cleanup, err := wireApp(ctx, &bc, credsWriter, logger, availablePlugins)
if err != nil {
panic(err)
_ = logger.Log(log.LevelError, "msg", "failed to initialize control plane", "error", err.Error())
// Invoke critical deferred cleanups explicitly since os.Exit skips defers.
cancel()
availablePlugins.Cleanup()
flush()
os.Exit(1) //nolint:gocritic // deferred cleanups called explicitly above
}
defer cleanup()

Expand Down
98 changes: 84 additions & 14 deletions pkg/cache/natskv.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

const (
streamUpdateTimeout = 5 * time.Second
// initMaxWait covers several nats.ReconnectWait (2s) cycles so a transient
// drop during boot can heal before we give up.
initMaxWait = 30 * time.Second
initRetryPeriod = 2 * time.Second
)

type natsKVCache[T any] struct {
mu sync.RWMutex
kv jetstream.KeyValue
Expand All @@ -44,7 +53,7 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) {
cfg: cfg,
}

if err := c.initBucket(); err != nil {
if err := c.initBucketWithRetry(initMaxWait, initRetryPeriod); err != nil {
return nil, err
}

Expand All @@ -56,6 +65,48 @@ func newNATSKV[T any](cfg *config) (*natsKVCache[T], error) {
return c, nil
}

// initBucketWithRetry runs initBucket with a bounded retry loop so transient
// NATS disconnects (handled by the client's background auto-reconnect) don't
// cause the whole service to refuse startup on a momentary blip. Retries only
// apply to connectivity errors; configuration errors fail fast.
func (c *natsKVCache[T]) initBucketWithRetry(maxWait, period time.Duration) error {
deadline := time.Now().Add(maxWait)
for attempt := 1; ; attempt++ {
err := c.initBucket()
if err == nil {
return nil
}
if !isRetryableInitError(err) {
return err
}
if time.Now().After(deadline) {
return fmt.Errorf("cache: bucket %q init failed after %s (%d attempts): %w", c.bucket, maxWait, attempt, err)
}
c.logger.Warnw("msg", "cache: bucket init failed, retrying", "bucket", c.bucket, "attempt", attempt, "error", err)
time.Sleep(period)
}
}

// isRetryableInitError reports whether err looks like a transient NATS
// connectivity issue worth retrying. Config errors (e.g. unsupported replica
// count) fail fast so misconfigured deployments don't spin for the full budget.
func isRetryableInitError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
if errors.Is(err, nats.ErrConnectionClosed) ||
errors.Is(err, nats.ErrConnectionDraining) ||
errors.Is(err, nats.ErrNoServers) ||
errors.Is(err, nats.ErrTimeout) ||
errors.Is(err, nats.ErrDisconnected) {
return true
}
return false
}

func (c *natsKVCache[T]) initBucket() error {
js, err := jetstream.New(c.conn)
if err != nil {
Expand All @@ -74,32 +125,51 @@ func (c *natsKVCache[T]) initBucket() error {
return err
}

// NATS KV hardcodes DiscardNew on the backing stream, which rejects writes
// when MaxBytes is reached. For cache use-cases we want DiscardOld so that
// the oldest entries are evicted automatically to make room for new ones.
// NATS KV hardcodes DiscardNew; we want DiscardOld so the cache evicts
// oldest entries when MaxBytes is reached.
if c.cfg.maxBytes > 0 {
streamName := fmt.Sprintf("KV_%s", c.bucket)
stream, err := js.Stream(context.Background(), streamName)
if err != nil {
return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
}
cfg := stream.CachedInfo().Config
cfg.Discard = jetstream.DiscardOld
if _, err := js.UpdateStream(context.Background(), cfg); err != nil {
return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
if err := c.ensureDiscardOld(js); err != nil {
return err
}
}

c.mu.Lock()
c.kv = kv
c.mu.Unlock()

return nil
}

func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) error {
streamName := fmt.Sprintf("KV_%s", c.bucket)

ctx, cancel := context.WithTimeout(context.Background(), streamUpdateTimeout)
defer cancel()

stream, err := js.Stream(ctx, streamName)
if err != nil {
return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
}

cfg := stream.CachedInfo().Config
if cfg.Discard == jetstream.DiscardOld {
return nil
}
cfg.Discard = jetstream.DiscardOld

if _, err := js.UpdateStream(ctx, cfg); err != nil {
return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
}
return nil
}

func (c *natsKVCache[T]) watchReconnect(ch <-chan struct{}) {
for range ch {
c.logger.Infow("msg", "cache: NATS reconnected, reinitializing bucket", "bucket", c.bucket)
if err := c.initBucket(); err != nil {
// Share the initial-boot retry budget: a reconnect may race with
// NATS leader election / cluster settle, so one shot isn't enough.
// Runtime ops already fail-open, so this is best-effort.
if err := c.initBucketWithRetry(initMaxWait, initRetryPeriod); err != nil {
c.logger.Warnw("msg", "cache: failed to reinitialize bucket after reconnect", "bucket", c.bucket, "error", err)
}
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/cache/natskv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,82 @@ func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) {
assert.False(t, ok, "oldest entry should have been evicted")
}

func TestNATSKV_EnsureDiscardOldSkipsWhenAlreadySet(t *testing.T) {
// Exercise both branches of ensureDiscardOld directly: when the stream's
// Discard policy already matches, UpdateStream must not be called.
nc := startEmbeddedNATS(t)
bucket := "test-idempotent"

// First New creates the bucket and sets DiscardOld.
_, err := New[[]byte](
WithTTL(time.Minute),
WithNATS(nc, bucket),
WithMaxBytes(10*1024),
)
require.NoError(t, err)

// Flip the backing stream back to DiscardNew so we can observe the update branch.
js, err := jetstream.New(nc)
require.NoError(t, err)
streamName := "KV_" + bucket
stream, err := js.Stream(context.Background(), streamName)
require.NoError(t, err)
cfg := stream.CachedInfo().Config
cfg.Discard = jetstream.DiscardNew
_, err = js.UpdateStream(context.Background(), cfg)
require.NoError(t, err)

c := &natsKVCache[[]byte]{
logger: nopLogger{},
conn: nc,
bucket: bucket,
cfg: &config{logger: nopLogger{}, bucketName: bucket, maxBytes: 10 * 1024},
}

// Update branch: must flip Discard back to DiscardOld.
require.NoError(t, c.ensureDiscardOld(js))
stream, err = js.Stream(context.Background(), streamName)
require.NoError(t, err)
require.Equal(t, jetstream.DiscardOld, stream.CachedInfo().Config.Discard)

// Skip branch: with DiscardOld already set, ensureDiscardOld must not
// issue an UpdateStream call. Measure outbound NATS request count across
// a call — one Stream() lookup, zero UpdateStream() calls => 1 request.
before := nc.Stats().OutMsgs
require.NoError(t, c.ensureDiscardOld(js))
delta := nc.Stats().OutMsgs - before
assert.LessOrEqual(t, delta, uint64(1), "skip path must not issue an UpdateStream request")
}

func TestNATSKV_InitBucketRetriesOnTransientError(t *testing.T) {
// Verify the retry wrapper gives up cleanly (returns an error, no panic)
// when the NATS connection is unusable for the full budget. A closed
// connection is a deterministic way to make every attempt fail.
nc := startEmbeddedNATS(t)
nc.Close()

c := &natsKVCache[[]byte]{
logger: nopLogger{},
conn: nc,
bucket: "test-retry-exhausted",
cfg: &config{
logger: nopLogger{},
bucketName: "test-retry-exhausted",
ttl: time.Minute,
maxBytes: 10 * 1024,
replicas: 1,
},
}

start := time.Now()
err := c.initBucketWithRetry(200*time.Millisecond, 50*time.Millisecond)
elapsed := time.Since(start)

require.Error(t, err)
assert.GreaterOrEqual(t, elapsed, 200*time.Millisecond, "should have retried for at least the budget")
assert.Less(t, elapsed, 2*time.Second, "should not have hung beyond the budget")
}

func TestNATSKV_WithReplicas(t *testing.T) {
nc := startEmbeddedNATS(t)

Expand Down
Loading