diff --git a/eventstore_test.go b/eventstore_test.go index b93901c..9351fd2 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -140,6 +140,39 @@ func TestEventStoreNoRegistry(t *testing.T) { is.Equal(events[0].Data, []byte("hello")) } +func TestWithAPIPrefix(t *testing.T) { + is := testutil.NewIs(t) + + srv := testutil.NewNatsServerWithDomain(t, "test") + defer testutil.ShutdownNatsServer(srv) + + nc, err := nats.Connect(srv.ClientURL()) + is.NoErr(err) + + m, err := New(nc, WithAPIPrefix("$JS.test.API")) + is.NoErr(err) + + ctx := context.Background() + es, err := m.CreateEventStore(ctx, EventStoreConfig{ + Name: "store", + }) + is.NoErr(err) + + seq, err := es.Append(ctx, []*Event{{ + Entity: "order.1", + Type: "foo", + Data: []byte("hello"), + }}) + is.NoErr(err) + is.Equal(seq, uint64(1)) + + var events eventSlice + _, err = es.Evolve(ctx, &events) + is.NoErr(err) + is.Equal(events[0].Type, "foo") + is.Equal(events[0].Data, []byte("hello")) +} + func TestEventStoreWithRegistry(t *testing.T) { is := testutil.NewIs(t) diff --git a/manager.go b/manager.go index 3cd6a52..c0f5000 100644 --- a/manager.go +++ b/manager.go @@ -60,6 +60,14 @@ func WithLogger(logger *slog.Logger) ManagerOption { }) } +// WithAPIPrefix sets a custom JetStream API prefix on the NATS connection. +func WithAPIPrefix(apiPrefix string) ManagerOption { + return managerOption(func(o *Manager) error { + o.apiPrefix = apiPrefix + return nil + }) +} + func eventSubject(name string, event *Event) string { return fmt.Sprintf(eventStoreSubjectTmpl+"%s.%s", name, event.Entity, event.Type) } @@ -83,6 +91,7 @@ type Manager struct { logger *slog.Logger nc *nats.Conn js jetstream.JetStream + apiPrefix string types *types.Registry id id.ID clock clock.Clock @@ -188,15 +197,9 @@ func (m *Manager) DeleteEventStore(ctx context.Context, name string) error { // New initializes a new Manager instance with a NATS connection. func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) { - js, err := jetstream.New(nc) - if err != nil { - return nil, err - } - m := &Manager{ nc: nc, logger: slog.Default(), - js: js, id: id.NUID, clock: clock.Time, } @@ -207,5 +210,17 @@ func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) { } } + var js jetstream.JetStream + var err error + if m.apiPrefix != "" { + js, err = jetstream.NewWithAPIPrefix(nc, m.apiPrefix) + } else { + js, err = jetstream.New(nc) + } + if err != nil { + return nil, err + } + m.js = js + return m, nil } diff --git a/testutil/nats.go b/testutil/nats.go index afcb36e..5175846 100644 --- a/testutil/nats.go +++ b/testutil/nats.go @@ -19,6 +19,15 @@ func NewNatsServer(tb testing.TB) *server.Server { return NewNatsServerWithDir(tb.TempDir()) } +func NewNatsServerWithDomain(tb testing.TB, domain string) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + opts.JetStreamDomain = domain + opts.StoreDir = tb.TempDir() + return natsserver.RunServer(&opts) +} + func ShutdownNatsServer(s *server.Server) { s.Shutdown() s.WaitForShutdown()