Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,39 @@ func TestEventStoreNoRegistry(t *testing.T) {
is.Equal(events[0].Data, []byte("hello"))
}

func TestWithAPIPrefix(t *testing.T) {
is := testutil.NewIs(t)

srv := testutil.NewNatsServerWithDomain(t, "test")
defer testutil.ShutdownNatsServer(srv)

nc, err := nats.Connect(srv.ClientURL())
is.NoErr(err)

m, err := New(nc, WithAPIPrefix("$JS.test.API"))
is.NoErr(err)

ctx := context.Background()
es, err := m.CreateEventStore(ctx, EventStoreConfig{
Name: "store",
})
is.NoErr(err)

seq, err := es.Append(ctx, []*Event{{
Entity: "order.1",
Type: "foo",
Data: []byte("hello"),
}})
is.NoErr(err)
is.Equal(seq, uint64(1))

var events eventSlice
_, err = es.Evolve(ctx, &events)
is.NoErr(err)
is.Equal(events[0].Type, "foo")
is.Equal(events[0].Data, []byte("hello"))
}

func TestEventStoreWithRegistry(t *testing.T) {
is := testutil.NewIs(t)

Expand Down
27 changes: 21 additions & 6 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func WithLogger(logger *slog.Logger) ManagerOption {
})
}

// WithAPIPrefix sets a custom JetStream API prefix on the NATS connection.
func WithAPIPrefix(apiPrefix string) ManagerOption {
Comment thread
joeriddles marked this conversation as resolved.
Comment thread
joeriddles marked this conversation as resolved.
return managerOption(func(o *Manager) error {
o.apiPrefix = apiPrefix
return nil
})
}

func eventSubject(name string, event *Event) string {
return fmt.Sprintf(eventStoreSubjectTmpl+"%s.%s", name, event.Entity, event.Type)
}
Expand All @@ -83,6 +91,7 @@ type Manager struct {
logger *slog.Logger
nc *nats.Conn
js jetstream.JetStream
apiPrefix string
Comment thread
joeriddles marked this conversation as resolved.
types *types.Registry
id id.ID
clock clock.Clock
Expand Down Expand Up @@ -188,15 +197,9 @@ func (m *Manager) DeleteEventStore(ctx context.Context, name string) error {

// New initializes a new Manager instance with a NATS connection.
func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) {
js, err := jetstream.New(nc)
if err != nil {
return nil, err
}

m := &Manager{
nc: nc,
logger: slog.Default(),
js: js,
id: id.NUID,
clock: clock.Time,
}
Expand All @@ -207,5 +210,17 @@ func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error) {
}
}

var js jetstream.JetStream
var err error
if m.apiPrefix != "" {
js, err = jetstream.NewWithAPIPrefix(nc, m.apiPrefix)
} else {
js, err = jetstream.New(nc)
}
if err != nil {
return nil, err
}
m.js = js

return m, nil
}
9 changes: 9 additions & 0 deletions testutil/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ func NewNatsServer(tb testing.TB) *server.Server {
return NewNatsServerWithDir(tb.TempDir())
}

func NewNatsServerWithDomain(tb testing.TB, domain string) *server.Server {
opts := natsserver.DefaultTestOptions
opts.Port = -1
opts.JetStream = true
opts.JetStreamDomain = domain
opts.StoreDir = tb.TempDir()
return natsserver.RunServer(&opts)
}

func ShutdownNatsServer(s *server.Server) {
s.Shutdown()
s.WaitForShutdown()
Expand Down
Loading