Skip to content

Commit 00a734e

Browse files
committed
feat(cache): add configurable JetStream KV replica count
NATS KV buckets in pkg/cache were created with the default replica count of 1, causing inconsistent reads in clustered NATS deployments. Add a WithReplicas option to pkg/cache and thread the replica count from Helm values through proto config, natsconn, and bundle caches.

Signed-off-by: Jose I. Paris <jiparis@chainloop.dev>
1 parent a17d344 commit 00a734e

12 files changed

Lines changed: 101 additions & 15 deletions

File tree

app/controlplane/cmd/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,15 @@ func newNatsConfig(c *conf.Bootstrap_NatsServer) *natsconn.Config {
221221
return nil
222222
}
223223

224+
replicas := int(c.GetReplicas())
225+
if replicas < 1 {
226+
replicas = 1
227+
}
228+
224229
cfg := &natsconn.Config{
225-
URI: uri,
226-
Name: "chainloop-controlplane",
230+
URI: uri,
231+
Name: "chainloop-controlplane",
232+
Replicas: replicas,
227233
}
228234

229235
if c.GetToken() != "" {

app/controlplane/internal/conf/controlplane/config/v1/conf.pb.go

Lines changed: 17 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/controlplane/internal/conf/controlplane/config/v1/conf.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//
2-
// Copyright 2024-2025 The Chainloop Authors.
2+
// Copyright 2024-2026 The Chainloop Authors.
33
//
44
// Licensed under the Apache License, Version 2.0 (the "License");
55
// you may not use this file except in compliance with the License.
@@ -101,6 +101,9 @@ message Bootstrap {
101101
// Token based authentication
102102
string token = 2 [(buf.validate.field).string.min_len = 1];
103103
}
104+
// Number of replicas for JetStream KV buckets.
105+
// Defaults to 1. Set to 3 for production clusters.
106+
int32 replicas = 3;
104107
}
105108

106109
// External URL of the platform UI, if available

deployment/chainloop/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ description: Chainloop is an open source software supply chain control plane, a
77

88
type: application
99
# Bump the patch (not minor, not major) version on each change in the Chart Source code
10-
version: 1.364.0
10+
version: 1.364.1
1111
# Do not update appVersion, this is handled automatically by the release process
1212
appVersion: v1.90.0
1313

deployment/chainloop/templates/controlplane/secret-config.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,14 @@ stringData:
9696
{{- end }}
9797
9898
{{- if and .Values.controlplane.nats.enabled }}
99-
nats_server:
99+
nats_server:
100100
uri: {{ include "controlplane.nats.connection_string" . | quote }}
101101
{{- if ne .Values.controlplane.nats.token "" }}
102102
token: {{ .Values.controlplane.nats.token | quote }}
103103
{{- end }}
104+
{{- if .Values.controlplane.nats.replicas }}
105+
replicas: {{ .Values.controlplane.nats.replicas }}
106+
{{- end }}
104107
{{- end }}
105108
106109
credentials_service: {{- include "chainloop.credentials_service_settings" . | indent 6 }}

deployment/chainloop/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,13 @@ controlplane:
188188
## @param controlplane.nats.host NATS Host
189189
## @param controlplane.nats.port NATS Port
190190
## @param controlplane.nats.token NATS Client authentication token
191+
## @param controlplane.nats.replicas Number of JetStream KV replicas. Set to 3 for clustered NATS deployments.
191192
nats:
192193
enabled: false
193194
host: ""
194195
port: 4222
195196
token: ""
197+
replicas: 1
196198

197199
## @extra controlplane.onboarding.name Name of the organization to onboard
198200
## @extra controlplane.onboarding.role Role of the organization to onboard

pkg/cache/attestationbundle/attestationbundle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func New(ctx context.Context, rc *natsconn.ReloadableConnection, logger log.Logg
5252
if rc != nil {
5353
opts = append(opts, cache.WithNATS(rc.Conn, bucket))
5454
opts = append(opts, cache.WithReconnect(rc.Subscribe(ctx)))
55+
opts = append(opts, cache.WithReplicas(rc.Replicas))
5556
}
5657

5758
c, err := cache.New[[]byte](opts...)

pkg/cache/cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Logger interface {
4343
type config struct {
4444
ttl time.Duration
4545
maxBytes int64
46+
replicas int
4647
logger Logger
4748
natsConn *nats.Conn
4849
bucketName string
@@ -82,6 +83,13 @@ func WithDescription(desc string) Option {
8283
return func(c *config) { c.description = desc }
8384
}
8485

86+
// WithReplicas sets the number of JetStream KV replicas for the NATS bucket.
87+
// Defaults to 1 if not set. Set to match the cluster size (e.g. 3) for
88+
// production NATS clusters. Ignored for in-memory backend.
89+
func WithReplicas(n int) Option {
90+
return func(c *config) { c.replicas = n }
91+
}
92+
8593
// WithReconnect provides a channel that signals NATS reconnection events.
8694
func WithReconnect(ch <-chan struct{}) Option {
8795
return func(c *config) { c.reconnCh = ch }

pkg/cache/natskv.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func (c *natsKVCache[T]) initBucket() error {
6767
Description: c.cfg.description,
6868
TTL: c.cfg.ttl,
6969
MaxBytes: c.cfg.maxBytes,
70+
Replicas: c.cfg.replicas,
7071
Storage: jetstream.MemoryStorage,
7172
})
7273
if err != nil {

pkg/cache/natskv_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
natsserver "github.com/nats-io/nats-server/v2/server"
2525
"github.com/nats-io/nats.go"
26+
"github.com/nats-io/nats.go/jetstream"
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829
)
@@ -250,6 +251,53 @@ func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) {
250251
assert.False(t, ok, "oldest entry should have been evicted")
251252
}
252253

254+
func TestNATSKV_WithReplicas(t *testing.T) {
255+
nc := startEmbeddedNATS(t)
256+
257+
tests := []struct {
258+
name string
259+
replicas int
260+
wantRep int
261+
}{
262+
{"no WithReplicas defaults to 1", 0, 1},
263+
{"explicit 1 replica", 1, 1},
264+
}
265+
266+
for _, tt := range tests {
267+
t.Run(tt.name, func(t *testing.T) {
268+
bucket := sanitizeBucketName("test-replicas-" + tt.name)
269+
opts := []Option{
270+
WithTTL(5 * time.Second),
271+
WithNATS(nc, bucket),
272+
}
273+
if tt.replicas > 0 {
274+
opts = append(opts, WithReplicas(tt.replicas))
275+
}
276+
c, err := New[string](opts...)
277+
require.NoError(t, err)
278+
279+
// Verify replica count via the backing stream config
280+
nkv := c.(*natsKVCache[string])
281+
js, err := jetstream.New(nc)
282+
require.NoError(t, err)
283+
stream, err := js.Stream(context.Background(), "KV_"+nkv.bucket)
284+
require.NoError(t, err)
285+
assert.Equal(t, tt.wantRep, stream.CachedInfo().Config.Replicas)
286+
})
287+
}
288+
289+
// Replicas > 1 requires a multi-node NATS cluster. Verify that the option
290+
// is actually passed through by confirming that a single-node server rejects it.
291+
t.Run("replicas 3 rejected by single-node server", func(t *testing.T) {
292+
_, err := New[string](
293+
WithTTL(5*time.Second),
294+
WithNATS(nc, "test-replicas-3"),
295+
WithReplicas(3),
296+
)
297+
require.Error(t, err, "single-node NATS should reject replicas > 1")
298+
})
299+
}
300+
253301
func TestNew_WithNATSReturnsNATSBackend(t *testing.T) {
254302
nc := startEmbeddedNATS(t)
255303
c, err := New[string](

0 commit comments

Comments
 (0)