Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
go.opentelemetry.io/otel v1.38.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.18.0
golang.org/x/text v0.30.0
)

require (
Expand Down Expand Up @@ -309,7 +310,6 @@ require (
golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/tools v0.37.0 // indirect
golang.org/x/tools/go/expect v0.1.1-deprecated // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/http/controllers/v1/management/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB
webhookCaller := webhook.NewCaller(logger.Named("webhook"), cfg.Webhook)

controller := &Controller{
ProjectsController: NewProjectsController(logger, managementDB, usersDB, journeyDB, webhookCaller),
ProjectsController: NewProjectsController(logger, managementDB, usersDB, journeyDB, webhookCaller, pub),
CampaignsController: NewCampaignsController(logger, managementDB, usersDB),
TemplatesController: NewTemplatesController(logger, managementDB),
AdminsController: NewAdminsController(logger, managementDB),
Expand Down
26 changes: 25 additions & 1 deletion internal/http/controllers/v1/management/projects.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/lunogram/platform/internal/http/controllers/v1/management/oapi"
"github.com/lunogram/platform/internal/http/json"
"github.com/lunogram/platform/internal/http/problem"
"github.com/lunogram/platform/internal/pubsub"
"github.com/lunogram/platform/internal/pubsub/schemas"
"github.com/lunogram/platform/internal/store"
"github.com/lunogram/platform/internal/store/journey"
"github.com/lunogram/platform/internal/store/management"
Expand All @@ -25,14 +27,15 @@ import (
"golang.org/x/text/language/display"
)

func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB, webhookCaller *webhook.Caller) *ProjectsController {
func NewProjectsController(logger *zap.Logger, managementDB, usersDB, journeyDB *sqlx.DB, webhookCaller *webhook.Caller, pub pubsub.Publisher) *ProjectsController {
return &ProjectsController{
logger: logger,
managementDB: managementDB,
store: management.NewState(managementDB),
journey: journey.NewState(journeyDB),
users: subjects.NewState(usersDB),
webhook: webhookCaller,
pub: pub,
}
}

Expand All @@ -43,6 +46,7 @@ type ProjectsController struct {
journey *journey.State
users *subjects.State
webhook *webhook.Caller
pub pubsub.Publisher
}

func (srv *ProjectsController) loadProjectCounts(ctx context.Context, project *management.Project) {
Expand Down Expand Up @@ -239,6 +243,26 @@ func (srv *ProjectsController) CreateProject(w http.ResponseWriter, r *http.Requ
return
}

if srv.pub != nil {
err = srv.pub.Publish(ctx, schemas.ProjectEventsProcess(scope.OrganizationID), schemas.ProjectEvent{
ID: projectID,
Name: schemas.EventProjectCreated,
OrganizationID: scope.OrganizationID,
Data: map[string]any{
"id": projectID,
"organization_id": scope.OrganizationID,
"name": body.Name,
"timezone": body.Timezone,
"locale": body.Locale,
},
})
if err != nil {
logger.Error("failed to publish project created event", zap.Error(err))
oapi.WriteProblem(w, err)
return
}
}

err = tx.Commit()
if err != nil {
logger.Error("unexpected error while attempting to commit transaction", zap.Error(err))
Expand Down
130 changes: 125 additions & 5 deletions internal/http/controllers/v1/management/projects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package v1

import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"sync/atomic"
Expand All @@ -13,6 +15,8 @@ import (
"github.com/lunogram/platform/internal/config"

"github.com/lunogram/platform/internal/http/controllers/v1/management/oapi"
"github.com/lunogram/platform/internal/pubsub/schemas"
"github.com/lunogram/platform/internal/store"
"github.com/lunogram/platform/internal/store/management"
teststore "github.com/lunogram/platform/internal/store/test"
"github.com/lunogram/platform/internal/webhook"
Expand All @@ -21,6 +25,30 @@ import (
"go.uber.org/zap/zaptest"
)

type publishedMessage struct {
subject schemas.Subject
data []byte
}

type recordingPublisher struct {
messages []publishedMessage
}

func (r *recordingPublisher) Publish(_ context.Context, subject schemas.Subject, v any) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
r.messages = append(r.messages, publishedMessage{subject: subject, data: data})
return nil
}

type failingPublisher struct{}

func (f *failingPublisher) Publish(_ context.Context, _ schemas.Subject, _ any) error {
return errors.New("publish failed")
}

func TestCreateProject(t *testing.T) {
t.Parallel()

Expand All @@ -43,7 +71,7 @@ func TestCreateProject(t *testing.T) {
admin, err := admins.GetAdmin(ctx, adminID)
require.NoError(t, err)

projects := NewProjectsController(logger, mgmt, usrs, jrny, nil)
projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil)

type test struct {
body oapi.CreateProjectJSONRequestBody
Expand Down Expand Up @@ -136,7 +164,7 @@ func TestListProjects(t *testing.T) {
require.NoError(t, err)
}

projects := NewProjectsController(logger, mgmt, usrs, jrny, nil)
projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil)

res := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/api/admin/projects", nil)
Expand Down Expand Up @@ -198,7 +226,7 @@ func TestGetProject(t *testing.T) {
err = projectStore.AddProjectAdmin(ctx, projectID, adminID, "admin")
require.NoError(t, err)

projects := NewProjectsController(logger, mgmt, usrs, jrny, nil)
projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil)

res := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/api/admin/projects/"+projectID.String(), nil)
Expand Down Expand Up @@ -253,7 +281,7 @@ func TestUpdateProject(t *testing.T) {
err = projectStore.AddProjectAdmin(ctx, projectID, adminID, "admin")
require.NoError(t, err)

projects := NewProjectsController(logger, mgmt, usrs, jrny, nil)
projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, nil)

type test struct {
body oapi.UpdateProjectJSONRequestBody
Expand Down Expand Up @@ -349,7 +377,7 @@ func TestCreateProjectWebhook(t *testing.T) {
ProjectCreatedURL: webhookServer.URL,
})

projects := NewProjectsController(logger, mgmt, usrs, jrny, caller)
projects := NewProjectsController(logger, mgmt, usrs, jrny, caller, nil)

body := oapi.CreateProjectJSONRequestBody{
Name: "Webhook Test Project",
Expand Down Expand Up @@ -378,3 +406,95 @@ func TestCreateProjectWebhook(t *testing.T) {
require.Equal(t, orgID, receivedEvent.Project.OrganizationId)
require.NotEqual(t, uuid.Nil, receivedEvent.Project.Id)
}

func TestCreateProjectPublishesNATSEvent(t *testing.T) {
t.Parallel()

logger := zaptest.NewLogger(t)
ctx := t.Context()
mgmt, usrs, jrny := teststore.RunPostgreSQL(t)

orgs := management.NewOrganizationsStore(mgmt)
orgID, err := orgs.CreateOrganization(ctx, "Test Organization")
require.NoError(t, err)

pub := &recordingPublisher{}
projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, pub)

body := oapi.CreateProjectJSONRequestBody{
Name: "NATS Test Project",
Timezone: "America/New_York",
Locale: "en-US",
}

bb, err := json.Marshal(body)
require.NoError(t, err)

res := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/api/admin/projects", bytes.NewReader(bb))

claimAdmin := &rbac.Scope{
OrganizationID: orgID,
}
req = req.WithContext(rbac.WithScope(req.Context(), claimAdmin))

projects.CreateProject(res, req)

require.Equal(t, http.StatusCreated, res.Code, res.Body.String())
require.Len(t, pub.messages, 1, "expected one NATS event to be published")

expectedSubject := schemas.ProjectEventsProcess(orgID)
require.Equal(t, expectedSubject, pub.messages[0].subject)

var event schemas.ProjectEvent
err = json.Unmarshal(pub.messages[0].data, &event)
require.NoError(t, err)
require.Equal(t, schemas.EventProjectCreated, event.Name)
require.Equal(t, orgID, event.OrganizationID)
require.NotEqual(t, uuid.Nil, event.ID)
require.Equal(t, "NATS Test Project", event.Data["name"])
require.Equal(t, "America/New_York", event.Data["timezone"])
require.Equal(t, "en-US", event.Data["locale"])
}

func TestCreateProjectRollbackOnPublishFailure(t *testing.T) {
t.Parallel()

logger := zaptest.NewLogger(t)
ctx := t.Context()
mgmt, usrs, jrny := teststore.RunPostgreSQL(t)

orgs := management.NewOrganizationsStore(mgmt)
orgID, err := orgs.CreateOrganization(ctx, "Test Organization")
require.NoError(t, err)

pub := &failingPublisher{}
projects := NewProjectsController(logger, mgmt, usrs, jrny, nil, pub)

body := oapi.CreateProjectJSONRequestBody{
Name: "Rollback Test Project",
Timezone: "America/New_York",
Locale: "en-US",
}

bb, err := json.Marshal(body)
require.NoError(t, err)

res := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/api/admin/projects", bytes.NewReader(bb))

claimAdmin := &rbac.Scope{
OrganizationID: orgID,
}
req = req.WithContext(rbac.WithScope(req.Context(), claimAdmin))

projects.CreateProject(res, req)

require.Equal(t, http.StatusInternalServerError, res.Code, "request should fail when publish fails")

// Verify the project was not persisted (transaction was rolled back)
mgmtState := management.NewState(mgmt)
_, total, err := mgmtState.ListProjects(ctx, orgID, store.Pagination{Limit: 10, Offset: 0}, "")
require.NoError(t, err)
require.Equal(t, 0, total, "project should not exist after rollback")
}
17 changes: 17 additions & 0 deletions internal/pubsub/consumer/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ func Bootstrap(ctx graceful.Context, logger *zap.Logger, jet jetstream.JetStream
MaxDeliver: 5,
})

bootstrap.EnsureStream(ctx, jetstream.StreamConfig{
Name: ns.Stream(StreamProjects),
Description: "Project event processing",
Subjects: []string{ns.Subject("projects.events.>")},
Discard: jetstream.DiscardOld,
MaxAge: 24 * time.Hour,
Replicas: 1,
})

bootstrap.EnsureConsumer(ctx, ns.Stream(StreamProjects), jetstream.ConsumerConfig{
Name: ns.Consumer(ConsumerProjectEventsProcess),
Description: "Processes incoming project events",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: ns.Subject("projects.events.>"),
MaxDeliver: 5,
})

return bootstrap.Error()
}

Expand Down
2 changes: 2 additions & 0 deletions internal/pubsub/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
StreamOrganizationUsers = "organizations-users"
StreamOrganizationEvents = "organizations-events"
StreamActions = "actions"
StreamProjects = "projects"
)

// Subscription subjects for NATS core subscribers.
Expand All @@ -47,6 +48,7 @@ const (
ConsumerOrganizationEventsProcess = "organizations-events-process"
ConsumerOrganizationEventsSchema = "organizations-events-schema"
ConsumerActionsSchema = "actions-schema"
ConsumerProjectEventsProcess = "projects-events-process"
)

// Serve starts all JetStream consumers and registers their handlers.
Expand Down
15 changes: 15 additions & 0 deletions internal/pubsub/schemas/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
EventOrganizationUserAdded = "organization.user.added"
EventOrganizationUserUpdated = "organization.user.updated"
EventOrganizationUserRemoved = "organization.user.removed"

EventProjectCreated = "project.created"
)

// UserEvent represents a tracked event with associated user and project information.
Expand Down Expand Up @@ -243,3 +245,16 @@ type ActionSchema struct {
func ActionsSchema(projectID uuid.UUID) Subject {
return Subject(fmt.Sprintf("actions.schema.%s", projectID))
}

// ProjectEvent represents an event that occurs on a project.
type ProjectEvent struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
OrganizationID uuid.UUID `json:"organization_id"`
Data map[string]any `json:"data"`
}

// ProjectEventsProcess returns the NATS subject for project event processing.
func ProjectEventsProcess(organizationID uuid.UUID) Subject {
return Subject(fmt.Sprintf("projects.events.%s", organizationID))
}
Loading