diff --git a/go.mod b/go.mod index 5544e516..2969e7a0 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( go.opentelemetry.io/otel v1.38.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.18.0 + golang.org/x/text v0.30.0 ) require ( @@ -309,7 +310,6 @@ require ( golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.28.0 // indirect golang.org/x/sys v0.38.0 // indirect - golang.org/x/text v0.30.0 // indirect golang.org/x/time v0.12.0 // indirect golang.org/x/tools v0.37.0 // indirect golang.org/x/tools/go/expect v0.1.1-deprecated // indirect diff --git a/internal/http/controllers/v1/management/controller.go b/internal/http/controllers/v1/management/controller.go index 11f9936f..7da5c4f5 100644 --- a/internal/http/controllers/v1/management/controller.go +++ b/internal/http/controllers/v1/management/controller.go @@ -20,7 +20,7 @@ func NewController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB webhookCaller := webhook.NewCaller(logger.Named("webhook"), cfg.Webhook) controller := &Controller{ - ProjectsController: NewProjectsController(logger, managementDB, usersDB, journeyDB, webhookCaller), + ProjectsController: NewProjectsController(logger, managementDB, usersDB, journeyDB, webhookCaller, pub), CampaignsController: NewCampaignsController(logger, managementDB, usersDB), TemplatesController: NewTemplatesController(logger, managementDB), AdminsController: NewAdminsController(logger, managementDB), diff --git a/internal/http/controllers/v1/management/projects.go b/internal/http/controllers/v1/management/projects.go index 367cccd0..9f7aab68 100644 --- a/internal/http/controllers/v1/management/projects.go +++ b/internal/http/controllers/v1/management/projects.go @@ -14,6 +14,8 @@ import ( "github.com/lunogram/platform/internal/http/controllers/v1/management/oapi" "github.com/lunogram/platform/internal/http/json" "github.com/lunogram/platform/internal/http/problem" + "github.com/lunogram/platform/internal/pubsub" + "github.com/lunogram/platform/internal/pubsub/schemas" "github.com/lunogram/platform/internal/store" "github.com/lunogram/platform/internal/store/journey" "github.com/lunogram/platform/internal/store/management" @@ -25,7 +27,7 @@ import ( "golang.org/x/text/language/display" ) -func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB, webhookCaller *webhook.Caller) *ProjectsController { +func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB, webhookCaller *webhook.Caller, pub pubsub.Publisher) *ProjectsController { return &ProjectsController{ logger: logger, managementDB: managementDB, @@ -33,6 +35,7 @@ func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB journey: journey.NewState(journeyDB), users: subjects.NewState(usersDB), webhook: webhookCaller, + pub: pub, } } @@ -43,6 +46,7 @@ type ProjectsController struct { journey *journey.State users *subjects.State webhook *webhook.Caller + pub pubsub.Publisher } func (srv *ProjectsController) loadProjectCounts(ctx context.Context, project *management.Project) { @@ -239,6 +243,26 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ return } + if srv.pub != nil { + err = srv.pub.Publish(ctx, schemas.ProjectEventsProcess(scope.OrganizationID), schemas.ProjectEvent{ + ID: projectID, + Name: schemas.EventProjectCreated, + OrganizationID: scope.OrganizationID, + Data: map[string]any{ + "id": projectID, + "organization_id": scope.OrganizationID, + "name": body.Name, + "timezone": body.Timezone, + "locale": body.Locale, + }, + }) + if err != nil { + logger.Error("failed to publish project created event", zap.Error(err)) + oapi.WriteProblem(w, err) + return + } + } + err = tx.Commit() if err != nil { logger.Error("unexpected error while attempting to commit transaction", zap.Error(err)) diff --git a/internal/http/controllers/v1/management/projects_test.go b/internal/http/controllers/v1/management/projects_test.go index 202daa76..7999f502 100644 --- a/internal/http/controllers/v1/management/projects_test.go +++ b/internal/http/controllers/v1/management/projects_test.go @@ -2,7 +2,9 @@ package v1 import ( "bytes" + "context" "encoding/json" + "errors" "net/http" "net/http/httptest" "sync/atomic" @@ -13,6 +15,8 @@ import ( "github.com/lunogram/platform/internal/config" "github.com/lunogram/platform/internal/http/controllers/v1/management/oapi" + "github.com/lunogram/platform/internal/pubsub/schemas" + "github.com/lunogram/platform/internal/store" "github.com/lunogram/platform/internal/store/management" teststore "github.com/lunogram/platform/internal/store/test" "github.com/lunogram/platform/internal/webhook" @@ -21,6 +25,30 @@ import ( "go.uber.org/zap/zaptest" ) +type publishedMessage struct { + subject schemas.Subject + data []byte +} + +type recordingPublisher struct { + messages []publishedMessage +} + +func (r *recordingPublisher) Publish(_ context.Context, subject schemas.Subject, v any) error { + data, err := json.Marshal(v) + if err != nil { + return err + } + r.messages = append(r.messages, publishedMessage{subject: subject, data: data}) + return nil +} + +type failingPublisher struct{} + +func (f *failingPublisher) Publish(_ context.Context, _ schemas.Subject, _ any) error { + return errors.New("publish failed") +} + func TestCreateProject(t *testing.T) { t.Parallel() @@ -43,7 +71,7 @@ func TestCreateProject(t *testing.T) { admin, err := admins.GetAdmin(ctx, adminID) require.NoError(t, err) - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) type test struct { body oapi.CreateProjectJSONRequestBody @@ -136,7 +164,7 @@ func TestListProjects(t *testing.T) { require.NoError(t, err) } - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) res := httptest.NewRecorder() req := httptest.NewRequest("GET", "/api/admin/projects", nil) @@ -198,7 +226,7 @@ func TestGetProject(t *testing.T) { err = projectStore.AddProjectAdmin(ctx, projectID, adminID, "admin") require.NoError(t, err) - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) res := httptest.NewRecorder() req := httptest.NewRequest("GET", "/api/admin/projects/"+projectID.String(), nil) @@ -253,7 +281,7 @@ func TestUpdateProject(t *testing.T) { err = projectStore.AddProjectAdmin(ctx, projectID, adminID, "admin") require.NoError(t, err) - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) type test struct { body oapi.UpdateProjectJSONRequestBody @@ -349,7 +377,7 @@ func TestCreateProjectWebhook(t *testing.T) { ProjectCreatedURL: webhookServer.URL, }) - projects := NewProjectsController(logger, mgmt, usrs, jrny, caller) + projects := NewProjectsController(logger, mgmt, usrs, jrny, caller, nil) body := oapi.CreateProjectJSONRequestBody{ Name: "Webhook Test Project", @@ -378,3 +406,95 @@ func TestCreateProjectWebhook(t *testing.T) { require.Equal(t, orgID, receivedEvent.Project.OrganizationId) require.NotEqual(t, uuid.Nil, receivedEvent.Project.Id) } + +func TestCreateProjectPublishesNATSEvent(t *testing.T) { + t.Parallel() + + logger := zaptest.NewLogger(t) + ctx := t.Context() + mgmt, usrs, jrny := teststore.RunPostgreSQL(t) + + orgs := management.NewOrganizationsStore(mgmt) + orgID, err := orgs.CreateOrganization(ctx, "Test Organization") + require.NoError(t, err) + + pub := &recordingPublisher{} + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, pub) + + body := oapi.CreateProjectJSONRequestBody{ + Name: "NATS Test Project", + Timezone: "America/New_York", + Locale: "en-US", + } + + bb, err := json.Marshal(body) + require.NoError(t, err) + + res := httptest.NewRecorder() + req := httptest.NewRequest("POST", "/api/admin/projects", bytes.NewReader(bb)) + + claimAdmin := &rbac.Scope{ + OrganizationID: orgID, + } + req = req.WithContext(rbac.WithScope(req.Context(), claimAdmin)) + + projects.CreateProject(res, req) + + require.Equal(t, http.StatusCreated, res.Code, res.Body.String()) + require.Len(t, pub.messages, 1, "expected one NATS event to be published") + + expectedSubject := schemas.ProjectEventsProcess(orgID) + require.Equal(t, expectedSubject, pub.messages[0].subject) + + var event schemas.ProjectEvent + err = json.Unmarshal(pub.messages[0].data, &event) + require.NoError(t, err) + require.Equal(t, schemas.EventProjectCreated, event.Name) + require.Equal(t, orgID, event.OrganizationID) + require.NotEqual(t, uuid.Nil, event.ID) + require.Equal(t, "NATS Test Project", event.Data["name"]) + require.Equal(t, "America/New_York", event.Data["timezone"]) + require.Equal(t, "en-US", event.Data["locale"]) +} + +func TestCreateProjectRollbackOnPublishFailure(t *testing.T) { + t.Parallel() + + logger := zaptest.NewLogger(t) + ctx := t.Context() + mgmt, usrs, jrny := teststore.RunPostgreSQL(t) + + orgs := management.NewOrganizationsStore(mgmt) + orgID, err := orgs.CreateOrganization(ctx, "Test Organization") + require.NoError(t, err) + + pub := &failingPublisher{} + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, pub) + + body := oapi.CreateProjectJSONRequestBody{ + Name: "Rollback Test Project", + Timezone: "America/New_York", + Locale: "en-US", + } + + bb, err := json.Marshal(body) + require.NoError(t, err) + + res := httptest.NewRecorder() + req := httptest.NewRequest("POST", "/api/admin/projects", bytes.NewReader(bb)) + + claimAdmin := &rbac.Scope{ + OrganizationID: orgID, + } + req = req.WithContext(rbac.WithScope(req.Context(), claimAdmin)) + + projects.CreateProject(res, req) + + require.Equal(t, http.StatusInternalServerError, res.Code, "request should fail when publish fails") + + // Verify the project was not persisted (transaction was rolled back) + mgmtState := management.NewState(mgmt) + _, total, err := mgmtState.ListProjects(ctx, orgID, store.Pagination{Limit: 10, Offset: 0}, "") + require.NoError(t, err) + require.Equal(t, 0, total, "project should not exist after rollback") +} diff --git a/internal/pubsub/consumer/bootstrap.go b/internal/pubsub/consumer/bootstrap.go index ad69a1f0..b80c5766 100644 --- a/internal/pubsub/consumer/bootstrap.go +++ b/internal/pubsub/consumer/bootstrap.go @@ -206,6 +206,23 @@ func Bootstrap(ctx graceful.Context, logger *zap.Logger, jet jetstream.JetStream MaxDeliver: 5, }) + bootstrap.EnsureStream(ctx, jetstream.StreamConfig{ + Name: ns.Stream(StreamProjects), + Description: "Project event processing", + Subjects: []string{ns.Subject("projects.events.>")}, + Discard: jetstream.DiscardOld, + MaxAge: 24 * time.Hour, + Replicas: 1, + }) + + bootstrap.EnsureConsumer(ctx, ns.Stream(StreamProjects), jetstream.ConsumerConfig{ + Name: ns.Consumer(ConsumerProjectEventsProcess), + Description: "Processes incoming project events", + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: ns.Subject("projects.events.>"), + MaxDeliver: 5, + }) + return bootstrap.Error() } diff --git a/internal/pubsub/consumer/consumer.go b/internal/pubsub/consumer/consumer.go index b1bd3438..8757b19a 100644 --- a/internal/pubsub/consumer/consumer.go +++ b/internal/pubsub/consumer/consumer.go @@ -24,6 +24,7 @@ const ( StreamOrganizationUsers = "organizations-users" StreamOrganizationEvents = "organizations-events" StreamActions = "actions" + StreamProjects = "projects" ) // Subscription subjects for NATS core subscribers. @@ -47,6 +48,7 @@ const ( ConsumerOrganizationEventsProcess = "organizations-events-process" ConsumerOrganizationEventsSchema = "organizations-events-schema" ConsumerActionsSchema = "actions-schema" + ConsumerProjectEventsProcess = "projects-events-process" ) // Serve starts all JetStream consumers and registers their handlers. diff --git a/internal/pubsub/schemas/events.go b/internal/pubsub/schemas/events.go index a0854c01..488881bb 100644 --- a/internal/pubsub/schemas/events.go +++ b/internal/pubsub/schemas/events.go @@ -20,6 +20,8 @@ const ( EventOrganizationUserAdded = "organization.user.added" EventOrganizationUserUpdated = "organization.user.updated" EventOrganizationUserRemoved = "organization.user.removed" + + EventProjectCreated = "project.created" ) // UserEvent represents a tracked event with associated user and project information. @@ -243,3 +245,16 @@ type ActionSchema struct { func ActionsSchema(projectID uuid.UUID) Subject { return Subject(fmt.Sprintf("actions.schema.%s", projectID)) } + +// ProjectEvent represents an event that occurs on a project. +type ProjectEvent struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` + OrganizationID uuid.UUID `json:"organization_id"` + Data map[string]any `json:"data"` +} + +// ProjectEventsProcess returns the NATS subject for project event processing. +func ProjectEventsProcess(organizationID uuid.UUID) Subject { + return Subject(fmt.Sprintf("projects.events.%s", organizationID)) +}