From ebe64080a27e87555754776a7e1eac15045f71b8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 10:50:55 +0000 Subject: [PATCH 1/6] Initial plan From 8d23be74785c11add073080aa74796cd0e82a0bb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 10:58:51 +0000 Subject: [PATCH 2/6] Publish project.created NATS event on project creation - Add EventProjectCreated constant and ProjectEvent schema - Add ProjectEventsProcess subject function - Add StreamProjects and ConsumerProjectEventsProcess constants - Bootstrap project events stream and consumer - Add pubsub.Publisher to ProjectsController - Publish project.created event in CreateProject handler - Add test for NATS event publication on project creation Co-authored-by: jeroenrinzema <3440116+jeroenrinzema@users.noreply.github.com> --- .../controllers/v1/management/controller.go | 2 +- .../controllers/v1/management/projects.go | 24 +++++- .../v1/management/projects_test.go | 80 +++++++++++++++++-- internal/pubsub/consumer/bootstrap.go | 17 ++++ internal/pubsub/consumer/consumer.go | 2 + internal/pubsub/schemas/events.go | 15 ++++ 6 files changed, 133 insertions(+), 7 deletions(-) diff --git a/internal/http/controllers/v1/management/controller.go b/internal/http/controllers/v1/management/controller.go index 11f9936f..7da5c4f5 100644 --- a/internal/http/controllers/v1/management/controller.go +++ b/internal/http/controllers/v1/management/controller.go @@ -20,7 +20,7 @@ func NewController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB webhookCaller := webhook.NewCaller(logger.Named("webhook"), cfg.Webhook) controller := &Controller{ - ProjectsController: NewProjectsController(logger, managementDB, usersDB, journeyDB, webhookCaller), + ProjectsController: NewProjectsController(logger, managementDB, usersDB, journeyDB, webhookCaller, pub), CampaignsController: NewCampaignsController(logger, managementDB, usersDB), TemplatesController: NewTemplatesController(logger, managementDB), AdminsController: NewAdminsController(logger, managementDB), diff --git a/internal/http/controllers/v1/management/projects.go b/internal/http/controllers/v1/management/projects.go index 367cccd0..1a019a8d 100644 --- a/internal/http/controllers/v1/management/projects.go +++ b/internal/http/controllers/v1/management/projects.go @@ -14,6 +14,8 @@ import ( "github.com/lunogram/platform/internal/http/controllers/v1/management/oapi" "github.com/lunogram/platform/internal/http/json" "github.com/lunogram/platform/internal/http/problem" + "github.com/lunogram/platform/internal/pubsub" + "github.com/lunogram/platform/internal/pubsub/schemas" "github.com/lunogram/platform/internal/store" "github.com/lunogram/platform/internal/store/journey" "github.com/lunogram/platform/internal/store/management" @@ -25,7 +27,7 @@ import ( "golang.org/x/text/language/display" ) -func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB, webhookCaller *webhook.Caller) *ProjectsController { +func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB, webhookCaller *webhook.Caller, pub pubsub.Publisher) *ProjectsController { return &ProjectsController{ logger: logger, managementDB: managementDB, @@ -33,6 +35,7 @@ func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB journey: journey.NewState(journeyDB), users: subjects.NewState(usersDB), webhook: webhookCaller, + pub: pub, } } @@ -43,6 +46,7 @@ type ProjectsController struct { journey *journey.State users *subjects.State webhook *webhook.Caller + pub pubsub.Publisher } func (srv *ProjectsController) loadProjectCounts(ctx context.Context, project *management.Project) { @@ -270,6 +274,24 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ return } + if srv.pub != nil { + err = srv.pub.Publish(ctx, schemas.ProjectEventsProcess(*project.OrganizationID), schemas.ProjectEvent{ + ID: project.ID, + Name: schemas.EventProjectCreated, + OrganizationID: *project.OrganizationID, + Data: map[string]any{ + "name": project.Name, + "timezone": project.Timezone, + "locale": project.Locale, + }, + }) + if err != nil { + logger.Error("failed to publish project created event", zap.Error(err)) + oapi.WriteProblem(w, err) + return + } + } + json.Write(w, http.StatusCreated, project.OAPI()) } diff --git a/internal/http/controllers/v1/management/projects_test.go b/internal/http/controllers/v1/management/projects_test.go index 202daa76..29437e57 100644 --- a/internal/http/controllers/v1/management/projects_test.go +++ b/internal/http/controllers/v1/management/projects_test.go @@ -2,6 +2,7 @@ package v1 import ( "bytes" + "context" "encoding/json" "net/http" "net/http/httptest" @@ -13,6 +14,7 @@ import ( "github.com/lunogram/platform/internal/config" "github.com/lunogram/platform/internal/http/controllers/v1/management/oapi" + "github.com/lunogram/platform/internal/pubsub/schemas" "github.com/lunogram/platform/internal/store/management" teststore "github.com/lunogram/platform/internal/store/test" "github.com/lunogram/platform/internal/webhook" @@ -21,6 +23,24 @@ import ( "go.uber.org/zap/zaptest" ) +type publishedMessage struct { + subject schemas.Subject + data []byte +} + +type recordingPublisher struct { + messages []publishedMessage +} + +func (r *recordingPublisher) Publish(_ context.Context, subject schemas.Subject, v any) error { + data, err := json.Marshal(v) + if err != nil { + return err + } + r.messages = append(r.messages, publishedMessage{subject: subject, data: data}) + return nil +} + func TestCreateProject(t *testing.T) { t.Parallel() @@ -43,7 +63,7 @@ func TestCreateProject(t *testing.T) { admin, err := admins.GetAdmin(ctx, adminID) require.NoError(t, err) - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) type test struct { body oapi.CreateProjectJSONRequestBody @@ -136,7 +156,7 @@ func TestListProjects(t *testing.T) { require.NoError(t, err) } - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) res := httptest.NewRecorder() req := httptest.NewRequest("GET", "/api/admin/projects", nil) @@ -198,7 +218,7 @@ func TestGetProject(t *testing.T) { err = projectStore.AddProjectAdmin(ctx, projectID, adminID, "admin") require.NoError(t, err) - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) res := httptest.NewRecorder() req := httptest.NewRequest("GET", "/api/admin/projects/"+projectID.String(), nil) @@ -253,7 +273,7 @@ func TestUpdateProject(t *testing.T) { err = projectStore.AddProjectAdmin(ctx, projectID, adminID, "admin") require.NoError(t, err) - projects := NewProjectsController(logger, mgmt, usrs, jrny, nil) + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil) type test struct { body oapi.UpdateProjectJSONRequestBody @@ -349,7 +369,7 @@ func TestCreateProjectWebhook(t *testing.T) { ProjectCreatedURL: webhookServer.URL, }) - projects := NewProjectsController(logger, mgmt, usrs, jrny, caller) + projects := NewProjectsController(logger, mgmt, usrs, jrny, caller, nil) body := oapi.CreateProjectJSONRequestBody{ Name: "Webhook Test Project", @@ -378,3 +398,53 @@ func TestCreateProjectWebhook(t *testing.T) { require.Equal(t, orgID, receivedEvent.Project.OrganizationId) require.NotEqual(t, uuid.Nil, receivedEvent.Project.Id) } + +func TestCreateProjectPublishesNATSEvent(t *testing.T) { + t.Parallel() + + logger := zaptest.NewLogger(t) + ctx := t.Context() + mgmt, usrs, jrny := teststore.RunPostgreSQL(t) + + orgs := management.NewOrganizationsStore(mgmt) + orgID, err := orgs.CreateOrganization(ctx, "Test Organization") + require.NoError(t, err) + + pub := &recordingPublisher{} + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, pub) + + body := oapi.CreateProjectJSONRequestBody{ + Name: "NATS Test Project", + Timezone: "America/New_York", + Locale: "en-US", + } + + bb, err := json.Marshal(body) + require.NoError(t, err) + + res := httptest.NewRecorder() + req := httptest.NewRequest("POST", "/api/admin/projects", bytes.NewReader(bb)) + + claimAdmin := &rbac.Scope{ + OrganizationID: orgID, + } + req = req.WithContext(rbac.WithScope(req.Context(), claimAdmin)) + + projects.CreateProject(res, req) + + require.Equal(t, http.StatusCreated, res.Code, res.Body.String()) + require.Len(t, pub.messages, 1, "expected one NATS event to be published") + + expectedSubject := schemas.ProjectEventsProcess(orgID) + require.Equal(t, expectedSubject, pub.messages[0].subject) + + var event schemas.ProjectEvent + err = json.Unmarshal(pub.messages[0].data, &event) + require.NoError(t, err) + require.Equal(t, schemas.EventProjectCreated, event.Name) + require.Equal(t, orgID, event.OrganizationID) + require.NotEqual(t, uuid.Nil, event.ID) + require.Equal(t, "NATS Test Project", event.Data["name"]) + require.Equal(t, "America/New_York", event.Data["timezone"]) + require.Equal(t, "en-US", event.Data["locale"]) +} diff --git a/internal/pubsub/consumer/bootstrap.go b/internal/pubsub/consumer/bootstrap.go index ad69a1f0..b80c5766 100644 --- a/internal/pubsub/consumer/bootstrap.go +++ b/internal/pubsub/consumer/bootstrap.go @@ -206,6 +206,23 @@ func Bootstrap(ctx graceful.Context, logger *zap.Logger, jet jetstream.JetStream MaxDeliver: 5, }) + bootstrap.EnsureStream(ctx, jetstream.StreamConfig{ + Name: ns.Stream(StreamProjects), + Description: "Project event processing", + Subjects: []string{ns.Subject("projects.events.>")}, + Discard: jetstream.DiscardOld, + MaxAge: 24 * time.Hour, + Replicas: 1, + }) + + bootstrap.EnsureConsumer(ctx, ns.Stream(StreamProjects), jetstream.ConsumerConfig{ + Name: ns.Consumer(ConsumerProjectEventsProcess), + Description: "Processes incoming project events", + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: ns.Subject("projects.events.>"), + MaxDeliver: 5, + }) + return bootstrap.Error() } diff --git a/internal/pubsub/consumer/consumer.go b/internal/pubsub/consumer/consumer.go index b1bd3438..8757b19a 100644 --- a/internal/pubsub/consumer/consumer.go +++ b/internal/pubsub/consumer/consumer.go @@ -24,6 +24,7 @@ const ( StreamOrganizationUsers = "organizations-users" StreamOrganizationEvents = "organizations-events" StreamActions = "actions" + StreamProjects = "projects" ) // Subscription subjects for NATS core subscribers. @@ -47,6 +48,7 @@ const ( ConsumerOrganizationEventsProcess = "organizations-events-process" ConsumerOrganizationEventsSchema = "organizations-events-schema" ConsumerActionsSchema = "actions-schema" + ConsumerProjectEventsProcess = "projects-events-process" ) // Serve starts all JetStream consumers and registers their handlers. diff --git a/internal/pubsub/schemas/events.go b/internal/pubsub/schemas/events.go index a0854c01..488881bb 100644 --- a/internal/pubsub/schemas/events.go +++ b/internal/pubsub/schemas/events.go @@ -20,6 +20,8 @@ const ( EventOrganizationUserAdded = "organization.user.added" EventOrganizationUserUpdated = "organization.user.updated" EventOrganizationUserRemoved = "organization.user.removed" + + EventProjectCreated = "project.created" ) // UserEvent represents a tracked event with associated user and project information. @@ -243,3 +245,16 @@ type ActionSchema struct { func ActionsSchema(projectID uuid.UUID) Subject { return Subject(fmt.Sprintf("actions.schema.%s", projectID)) } + +// ProjectEvent represents an event that occurs on a project. +type ProjectEvent struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` + OrganizationID uuid.UUID `json:"organization_id"` + Data map[string]any `json:"data"` +} + +// ProjectEventsProcess returns the NATS subject for project event processing. +func ProjectEventsProcess(organizationID uuid.UUID) Subject { + return Subject(fmt.Sprintf("projects.events.%s", organizationID)) +} From 09899e5946872b21171f0ebd13b07daf58db92c1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 10:59:50 +0000 Subject: [PATCH 3/6] Make NATS event publish non-blocking on failure The project is already committed to the database when the NATS event is published, so a publish failure should not fail the HTTP response. Co-authored-by: jeroenrinzema <3440116+jeroenrinzema@users.noreply.github.com> --- internal/http/controllers/v1/management/projects.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/http/controllers/v1/management/projects.go b/internal/http/controllers/v1/management/projects.go index 1a019a8d..347d245a 100644 --- a/internal/http/controllers/v1/management/projects.go +++ b/internal/http/controllers/v1/management/projects.go @@ -287,8 +287,6 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ }) if err != nil { logger.Error("failed to publish project created event", zap.Error(err)) - oapi.WriteProblem(w, err) - return } } From b84949221916f188d76e765c4d17821e320216a9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:01:43 +0000 Subject: [PATCH 4/6] Publish project.created NATS event on project creation Co-authored-by: jeroenrinzema <3440116+jeroenrinzema@users.noreply.github.com> --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 5544e516..2969e7a0 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( go.opentelemetry.io/otel v1.38.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.18.0 + golang.org/x/text v0.30.0 ) require ( @@ -309,7 +310,6 @@ require ( golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.28.0 // indirect golang.org/x/sys v0.38.0 // indirect - golang.org/x/text v0.30.0 // indirect golang.org/x/time v0.12.0 // indirect golang.org/x/tools v0.37.0 // indirect golang.org/x/tools/go/expect v0.1.1-deprecated // indirect From dd26baa6da3ded015205ce157ab46ffd6f14dfb7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:48:04 +0000 Subject: [PATCH 5/6] Move NATS publish before tx.Commit so failures rollback the transaction The publish now happens inside the transaction scope. If it fails, the handler returns an error and the deferred tx.Rollback() cleans up. Added TestCreateProjectRollbackOnPublishFailure to verify this behavior. Co-authored-by: jeroenrinzema <3440116+jeroenrinzema@users.noreply.github.com> --- .../controllers/v1/management/projects.go | 34 +++++++------ .../v1/management/projects_test.go | 50 +++++++++++++++++++ 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/internal/http/controllers/v1/management/projects.go b/internal/http/controllers/v1/management/projects.go index 347d245a..d67e48fb 100644 --- a/internal/http/controllers/v1/management/projects.go +++ b/internal/http/controllers/v1/management/projects.go @@ -243,6 +243,24 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ return } + if srv.pub != nil { + err = srv.pub.Publish(ctx, schemas.ProjectEventsProcess(scope.OrganizationID), schemas.ProjectEvent{ + ID: projectID, + Name: schemas.EventProjectCreated, + OrganizationID: scope.OrganizationID, + Data: map[string]any{ + "name": body.Name, + "timezone": body.Timezone, + "locale": body.Locale, + }, + }) + if err != nil { + logger.Error("failed to publish project created event", zap.Error(err)) + oapi.WriteProblem(w, err) + return + } + } + err = tx.Commit() if err != nil { logger.Error("unexpected error while attempting to commit transaction", zap.Error(err)) @@ -274,22 +292,6 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ return } - if srv.pub != nil { - err = srv.pub.Publish(ctx, schemas.ProjectEventsProcess(*project.OrganizationID), schemas.ProjectEvent{ - ID: project.ID, - Name: schemas.EventProjectCreated, - OrganizationID: *project.OrganizationID, - Data: map[string]any{ - "name": project.Name, - "timezone": project.Timezone, - "locale": project.Locale, - }, - }) - if err != nil { - logger.Error("failed to publish project created event", zap.Error(err)) - } - } - json.Write(w, http.StatusCreated, project.OAPI()) } diff --git a/internal/http/controllers/v1/management/projects_test.go b/internal/http/controllers/v1/management/projects_test.go index 29437e57..7999f502 100644 --- a/internal/http/controllers/v1/management/projects_test.go +++ b/internal/http/controllers/v1/management/projects_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "net/http" "net/http/httptest" "sync/atomic" @@ -15,6 +16,7 @@ import ( "github.com/lunogram/platform/internal/http/controllers/v1/management/oapi" "github.com/lunogram/platform/internal/pubsub/schemas" + "github.com/lunogram/platform/internal/store" "github.com/lunogram/platform/internal/store/management" teststore "github.com/lunogram/platform/internal/store/test" "github.com/lunogram/platform/internal/webhook" @@ -41,6 +43,12 @@ func (r *recordingPublisher) Publish(_ context.Context, subject schemas.Subject, return nil } +type failingPublisher struct{} + +func (f *failingPublisher) Publish(_ context.Context, _ schemas.Subject, _ any) error { + return errors.New("publish failed") +} + func TestCreateProject(t *testing.T) { t.Parallel() @@ -448,3 +456,45 @@ func TestCreateProjectPublishesNATSEvent(t *testing.T) { require.Equal(t, "America/New_York", event.Data["timezone"]) require.Equal(t, "en-US", event.Data["locale"]) } + +func TestCreateProjectRollbackOnPublishFailure(t *testing.T) { + t.Parallel() + + logger := zaptest.NewLogger(t) + ctx := t.Context() + mgmt, usrs, jrny := teststore.RunPostgreSQL(t) + + orgs := management.NewOrganizationsStore(mgmt) + orgID, err := orgs.CreateOrganization(ctx, "Test Organization") + require.NoError(t, err) + + pub := &failingPublisher{} + projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, pub) + + body := oapi.CreateProjectJSONRequestBody{ + Name: "Rollback Test Project", + Timezone: "America/New_York", + Locale: "en-US", + } + + bb, err := json.Marshal(body) + require.NoError(t, err) + + res := httptest.NewRecorder() + req := httptest.NewRequest("POST", "/api/admin/projects", bytes.NewReader(bb)) + + claimAdmin := &rbac.Scope{ + OrganizationID: orgID, + } + req = req.WithContext(rbac.WithScope(req.Context(), claimAdmin)) + + projects.CreateProject(res, req) + + require.Equal(t, http.StatusInternalServerError, res.Code, "request should fail when publish fails") + + // Verify the project was not persisted (transaction was rolled back) + mgmtState := management.NewState(mgmt) + _, total, err := mgmtState.ListProjects(ctx, orgID, store.Pagination{Limit: 10, Offset: 0}, "") + require.NoError(t, err) + require.Equal(t, 0, total, "project should not exist after rollback") +} From 02bb262bd24e8a13d16a4d728e21fc71c64581bf Mon Sep 17 00:00:00 2001 From: Jeroen Rinzema Date: Fri, 6 Mar 2026 13:47:51 +0100 Subject: [PATCH 6/6] feat: include more data in project.created event --- internal/http/controllers/v1/management/projects.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/http/controllers/v1/management/projects.go b/internal/http/controllers/v1/management/projects.go index d67e48fb..9f7aab68 100644 --- a/internal/http/controllers/v1/management/projects.go +++ b/internal/http/controllers/v1/management/projects.go @@ -249,9 +249,11 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ Name: schemas.EventProjectCreated, OrganizationID: scope.OrganizationID, Data: map[string]any{ - "name": body.Name, - "timezone": body.Timezone, - "locale": body.Locale, + "id": projectID, + "organization_id": scope.OrganizationID, + "name": body.Name, + "timezone": body.Timezone, + "locale": body.Locale, }, }) if err != nil {