From 280b63ed3bb292d1c7932b35e13268970b539f09 Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Fri, 22 May 2026 10:10:51 -0700 Subject: [PATCH 1/2] feat: support JetStream options Signed-off-by: Joe Riddle --- eventstore_test.go | 37 +++++++++++++++++++++++++++++++++++++ manager.go | 33 +++++++++++++++++++++++++++------ testutil/nats.go | 9 +++++++++ 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/eventstore_test.go b/eventstore_test.go index b93901c..4e3b42a 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/synadia-labs/rita/id" "github.com/synadia-labs/rita/testutil" "github.com/synadia-labs/rita/types" @@ -140,6 +141,42 @@ func TestEventStoreNoRegistry(t *testing.T) { is.Equal(events[0].Data, []byte("hello")) } +func TestWithJetStreamOpts_APIPrefix(t *testing.T) { + is := testutil.NewIs(t) + + srv := testutil.NewNatsServerWithDomain(t, "test") + defer testutil.ShutdownNatsServer(srv) + + nc, err := nats.Connect(srv.ClientURL()) + is.NoErr(err) + + m, err := New(nc, WithJetStreamOpts(func(opts *jetstream.JetStreamOptions) error { + opts.APIPrefix = "$JS.test.API" + return nil + })) + is.NoErr(err) + + ctx := context.Background() + es, err := m.CreateEventStore(ctx, EventStoreConfig{ + Name: "store", + }) + is.NoErr(err) + + seq, err := es.Append(ctx, []*Event{{ + Entity: "order.1", + Type: "foo", + Data: []byte("hello"), + }}) + is.NoErr(err) + is.Equal(seq, uint64(1)) + + var events eventSlice + _, err = es.Evolve(ctx, &events) + is.NoErr(err) + is.Equal(events[0].Type, "foo") + is.Equal(events[0].Data, []byte("hello")) +} + func TestEventStoreWithRegistry(t *testing.T) { is := testutil.NewIs(t) diff --git a/manager.go b/manager.go index 3cd6a52..2f0756b 100644 --- a/manager.go +++ b/manager.go @@ -60,6 +60,13 @@ func WithLogger(logger *slog.Logger) ManagerOption { }) } +func WithJetStreamOpts(jsOpts ...jetstream.JetStreamOpt) ManagerOption { + return managerOption(func(o *Manager) error { + o.jsOpts = jsOpts + return nil + }) +} + func eventSubject(name string, event *Event) string { return fmt.Sprintf(eventStoreSubjectTmpl+"%s.%s", name, event.Entity, event.Type) } @@ -83,6 +90,7 @@ type Manager struct { logger *slog.Logger nc *nats.Conn js jetstream.JetStream + jsOpts []jetstream.JetStreamOpt types *types.Registry id id.ID clock clock.Clock @@ -188,15 +196,9 @@ func (m *Manager) DeleteEventStore(ctx context.Context, name string) error { // New initializes a new Manager instance with a NATS connection. func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) { - js, err := jetstream.New(nc) - if err != nil { - return nil, err - } - m := &Manager{ nc: nc, logger: slog.Default(), - js: js, id: id.NUID, clock: clock.Time, } @@ -207,5 +209,24 @@ func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) { } } + var jsOpts jetstream.JetStreamOptions + for _, opt := range m.jsOpts { + if err := opt(&jsOpts); err != nil { + return nil, err + } + } + + var js jetstream.JetStream + var err error + if jsOpts.APIPrefix != "" { + js, err = jetstream.NewWithAPIPrefix(nc, jsOpts.APIPrefix, m.jsOpts...) + } else { + js, err = jetstream.New(nc, m.jsOpts...) + } + if err != nil { + return nil, err + } + m.js = js + return m, nil } diff --git a/testutil/nats.go b/testutil/nats.go index afcb36e..5175846 100644 --- a/testutil/nats.go +++ b/testutil/nats.go @@ -19,6 +19,15 @@ func NewNatsServer(tb testing.TB) *server.Server { return NewNatsServerWithDir(tb.TempDir()) } +func NewNatsServerWithDomain(tb testing.TB, domain string) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + opts.JetStreamDomain = domain + opts.StoreDir = tb.TempDir() + return natsserver.RunServer(&opts) +} + func ShutdownNatsServer(s *server.Server) { s.Shutdown() s.WaitForShutdown() From d76ca1f96e68a2ada317379970171c9fd1116398 Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Fri, 22 May 2026 10:21:20 -0700 Subject: [PATCH 2/2] refactor: simplify jetstream options Signed-off-by: Joe Riddle --- eventstore_test.go | 8 ++------ manager.go | 20 +++++++------------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/eventstore_test.go b/eventstore_test.go index 4e3b42a..9351fd2 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "github.com/synadia-labs/rita/id" "github.com/synadia-labs/rita/testutil" "github.com/synadia-labs/rita/types" @@ -141,7 +140,7 @@ func TestEventStoreNoRegistry(t *testing.T) { is.Equal(events[0].Data, []byte("hello")) } -func TestWithJetStreamOpts_APIPrefix(t *testing.T) { +func TestWithAPIPrefix(t *testing.T) { is := testutil.NewIs(t) srv := testutil.NewNatsServerWithDomain(t, "test") @@ -150,10 +149,7 @@ func TestWithJetStreamOpts_APIPrefix(t *testing.T) { nc, err := nats.Connect(srv.ClientURL()) is.NoErr(err) - m, err := New(nc, WithJetStreamOpts(func(opts *jetstream.JetStreamOptions) error { - opts.APIPrefix = "$JS.test.API" - return nil - })) + m, err := New(nc, WithAPIPrefix("$JS.test.API")) is.NoErr(err) ctx := context.Background() diff --git a/manager.go b/manager.go index 2f0756b..c0f5000 100644 --- a/manager.go +++ b/manager.go @@ -60,9 +60,10 @@ func WithLogger(logger *slog.Logger) ManagerOption { }) } -func WithJetStreamOpts(jsOpts ...jetstream.JetStreamOpt) ManagerOption { +// WithAPIPrefix sets a custom JetStream API prefix on the NATS connection. +func WithAPIPrefix(apiPrefix string) ManagerOption { return managerOption(func(o *Manager) error { - o.jsOpts = jsOpts + o.apiPrefix = apiPrefix return nil }) } @@ -90,7 +91,7 @@ type Manager struct { logger *slog.Logger nc *nats.Conn js jetstream.JetStream - jsOpts []jetstream.JetStreamOpt + apiPrefix string types *types.Registry id id.ID clock clock.Clock @@ -209,19 +210,12 @@ func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) { } } - var jsOpts jetstream.JetStreamOptions - for _, opt := range m.jsOpts { - if err := opt(&jsOpts); err != nil { - return nil, err - } - } - var js jetstream.JetStream var err error - if jsOpts.APIPrefix != "" { - js, err = jetstream.NewWithAPIPrefix(nc, jsOpts.APIPrefix, m.jsOpts...) + if m.apiPrefix != "" { + js, err = jetstream.NewWithAPIPrefix(nc, m.apiPrefix) } else { - js, err = jetstream.New(nc, m.jsOpts...) + js, err = jetstream.New(nc) } if err != nil { return nil, err