Skip to content

Commit a491504

Browse files
committed
fix(cache): make NATS KV bucket init idempotent and fail-open
The control plane panicked on startup when UpdateStream on the KV backing stream hit the NATS default request timeout. initBucket now skips UpdateStream when Discard is already DiscardOld and logs a warning instead of returning an error when the JetStream metadata calls fail, matching the fail-open pattern used by the cache's runtime operations. Signed-off-by: Miguel Martinez <miguel@chainloop.dev>
1 parent d290a51 commit a491504

2 files changed

Lines changed: 100 additions & 16 deletions

File tree

pkg/cache/natskv.go

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ import (
2222
"errors"
2323
"fmt"
2424
"sync"
25+
"time"
2526

2627
"github.com/nats-io/nats.go"
2728
"github.com/nats-io/nats.go/jetstream"
2829
)
2930

31+
const streamUpdateTimeout = 5 * time.Second
32+
3033
type natsKVCache[T any] struct {
3134
mu sync.RWMutex
3235
kv jetstream.KeyValue
@@ -74,28 +77,42 @@ func (c *natsKVCache[T]) initBucket() error {
7477
return err
7578
}
7679

77-
// NATS KV hardcodes DiscardNew on the backing stream, which rejects writes
78-
// when MaxBytes is reached. For cache use-cases we want DiscardOld so that
79-
// the oldest entries are evicted automatically to make room for new ones.
80-
if c.cfg.maxBytes > 0 {
81-
streamName := fmt.Sprintf("KV_%s", c.bucket)
82-
stream, err := js.Stream(context.Background(), streamName)
83-
if err != nil {
84-
return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
85-
}
86-
cfg := stream.CachedInfo().Config
87-
cfg.Discard = jetstream.DiscardOld
88-
if _, err := js.UpdateStream(context.Background(), cfg); err != nil {
89-
return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
90-
}
91-
}
92-
9380
c.mu.Lock()
9481
c.kv = kv
9582
c.mu.Unlock()
83+
84+
// NATS KV hardcodes DiscardNew; we want DiscardOld so the cache evicts
85+
// oldest entries at MaxBytes. Fail-open to avoid crashing on NATS slowness.
86+
if c.cfg.maxBytes > 0 {
87+
c.ensureDiscardOld(js)
88+
}
89+
9690
return nil
9791
}
9892

93+
func (c *natsKVCache[T]) ensureDiscardOld(js jetstream.JetStream) {
94+
streamName := fmt.Sprintf("KV_%s", c.bucket)
95+
96+
ctx, cancel := context.WithTimeout(context.Background(), streamUpdateTimeout)
97+
defer cancel()
98+
99+
stream, err := js.Stream(ctx, streamName)
100+
if err != nil {
101+
c.logger.Warnw("msg", "cache: failed to get backing stream, skipping discard policy update", "stream", streamName, "error", err)
102+
return
103+
}
104+
105+
cfg := stream.CachedInfo().Config
106+
if cfg.Discard == jetstream.DiscardOld {
107+
return
108+
}
109+
cfg.Discard = jetstream.DiscardOld
110+
111+
if _, err := js.UpdateStream(ctx, cfg); err != nil {
112+
c.logger.Warnw("msg", "cache: failed to set DiscardOld on stream, continuing without it", "stream", streamName, "error", err)
113+
}
114+
}
115+
99116
func (c *natsKVCache[T]) watchReconnect(ch <-chan struct{}) {
100117
for range ch {
101118
c.logger.Infow("msg", "cache: NATS reconnected, reinitializing bucket", "bucket", c.bucket)

pkg/cache/natskv_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,73 @@ func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) {
251251
assert.False(t, ok, "oldest entry should have been evicted")
252252
}
253253

254+
func TestNATSKV_InitBucketIsIdempotent(t *testing.T) {
255+
// Simulates a pod restart: first New() sets DiscardOld on the backing
256+
// stream, second New() against the same NATS sees it's already correct
257+
// and must not issue another UpdateStream.
258+
nc := startEmbeddedNATS(t)
259+
bucket := "test-idempotent"
260+
261+
c1, err := New[[]byte](
262+
WithTTL(time.Minute),
263+
WithNATS(nc, bucket),
264+
WithMaxBytes(10*1024),
265+
)
266+
require.NoError(t, err)
267+
_ = c1
268+
269+
js, err := jetstream.New(nc)
270+
require.NoError(t, err)
271+
streamName := "KV_" + bucket
272+
stream, err := js.Stream(context.Background(), streamName)
273+
require.NoError(t, err)
274+
info, err := stream.Info(context.Background())
275+
require.NoError(t, err)
276+
firstUpdate := info.Created
277+
firstDiscard := info.Config.Discard
278+
require.Equal(t, jetstream.DiscardOld, firstDiscard, "first init must set DiscardOld")
279+
280+
// Second init against the same bucket must be a no-op for the update path.
281+
c2, err := New[[]byte](
282+
WithTTL(time.Minute),
283+
WithNATS(nc, bucket),
284+
WithMaxBytes(10*1024),
285+
)
286+
require.NoError(t, err)
287+
_ = c2
288+
289+
info2, err := stream.Info(context.Background())
290+
require.NoError(t, err)
291+
assert.Equal(t, firstUpdate, info2.Created, "stream should not have been recreated")
292+
assert.Equal(t, jetstream.DiscardOld, info2.Config.Discard)
293+
}
294+
295+
func TestNATSKV_InitBucketFailsOpenOnStreamUpdateError(t *testing.T) {
296+
// If the JetStream metadata calls time out or fail, init must not crash
297+
// the process — the runtime Get/Set already fail-open, so the cache
298+
// simply degrades. We exercise this by cancelling the context in the
299+
// middle of ensureDiscardOld via a NATS connection that's already closed.
300+
nc := startEmbeddedNATS(t)
301+
bucket := "test-failopen"
302+
303+
c, err := New[[]byte](
304+
WithTTL(time.Minute),
305+
WithNATS(nc, bucket),
306+
WithMaxBytes(10*1024),
307+
)
308+
require.NoError(t, err)
309+
310+
// Now close the NATS connection and call ensureDiscardOld directly —
311+
// it must return without panicking or returning an error.
312+
nc.Close()
313+
nkv := c.(*natsKVCache[[]byte])
314+
js, err := jetstream.New(nkv.conn)
315+
require.NoError(t, err)
316+
require.NotPanics(t, func() {
317+
nkv.ensureDiscardOld(js)
318+
})
319+
}
320+
254321
func TestNATSKV_WithReplicas(t *testing.T) {
255322
nc := startEmbeddedNATS(t)
256323

0 commit comments

Comments
 (0)