Skip to content

Add inbox messaging system for organizations and users#238

Open
jeroenrinzema wants to merge 1 commit into
mainfrom
feat/inbox
Open

Add inbox messaging system for organizations and users#238
jeroenrinzema wants to merge 1 commit into
mainfrom
feat/inbox

Conversation

@jeroenrinzema
Copy link
Copy Markdown
Contributor

Introduce a full inbox feature allowing messages to be sent, scheduled, and managed for both organizations and users. Includes client and management API endpoints, pubsub consumers for async delivery across push (APNS, FCM, WebPush), email, and SMS channels, a scheduler for timed messages, RBAC permissions, database migrations, and a console UI for viewing inbox messages.

@jeroenrinzema jeroenrinzema requested a review from Copilot May 26, 2026 11:24
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Introduce a full inbox feature allowing messages to be sent, scheduled,
and managed for both organizations and users. Includes client and
management API endpoints, pubsub consumers for async delivery across
push (APNS, FCM, WebPush), email, and SMS channels, a scheduler for
timed messages, RBAC permissions, database migrations, and a console UI
for viewing inbox messages.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 118 out of 121 changed files in this pull request and generated 9 comments.

Comment on lines +122 to 127
if event.InboxMessageID == uuid.Nil {
logger.Warn("webhook event has no inbox message ID, skipping",
zap.String("event_name", event.EventName.String()),
)
continue
}
Comment on lines +54 to +56
"target": []map[string]any{{"external_id": "user_123"}},
"channel": "push",
"content": map[string]any{"title": "Hello"},
Comment on lines +769 to 777
var events []DueScheduledEvent
err := s.db.SelectContext(ctx, &events, stmt, limit)
if err != nil {
return 0, err
}
defer rows.Close()

var n int
for rows.Next() {
var event DueScheduledEvent
if err := rows.StructScan(&event); err != nil {
return n, err
}
for n, event := range events {
if err := fn(event); err != nil {
return n, err
Comment on lines +886 to +902
ORDER BY us.scheduled_at ASC
LIMIT $1
FOR UPDATE OF us SKIP LOCKED`

rows, err := s.db.QueryxContext(ctx, stmt)
var schedules []UserSchedule
err := s.db.SelectContext(ctx, &schedules, stmt, limit)
if err != nil {
return 0, err
}
defer rows.Close()

var n int
for rows.Next() {
var us UserSchedule
if err := rows.StructScan(&us); err != nil {
return n, err
}
for n, us := range schedules {
if err := fn(us); err != nil {
return n, err
}
n++
}

return n, rows.Err()
return len(schedules), nil
Comment on lines +1368 to +1384
ORDER BY se.fire_at ASC
LIMIT $1
FOR UPDATE OF se SKIP LOCKED`

rows, err := s.db.QueryxContext(ctx, stmt)
var events []DueOrgScheduledEvent
err := s.db.SelectContext(ctx, &events, stmt, limit)
if err != nil {
return 0, err
}
defer rows.Close()

var n int
for rows.Next() {
var event DueOrgScheduledEvent
if err := rows.StructScan(&event); err != nil {
return n, err
}
for n, event := range events {
if err := fn(event); err != nil {
return n, err
}
n++
}

return n, rows.Err()
return len(events), nil
Comment on lines +1534 to +1550
ORDER BY os.scheduled_at ASC
LIMIT $1
FOR UPDATE OF os SKIP LOCKED`

rows, err := s.db.QueryxContext(ctx, stmt)
var schedules []OrganizationSchedule
err := s.db.SelectContext(ctx, &schedules, stmt, limit)
if err != nil {
return 0, err
}
defer rows.Close()

var n int
for rows.Next() {
var os OrganizationSchedule
if err := rows.StructScan(&os); err != nil {
return n, err
}
for n, os := range schedules {
if err := fn(os); err != nil {
return n, err
}
n++
}

return n, rows.Err()
return len(schedules), nil
Comment on lines 1133 to +1155
query := `
SELECT jus.id, j.project_id, jus.journey_id, jus.user_id, jus.pinned_version_id, jus.occurrence, jus.entered_at, jus.resume_at, jus.completed_at, COALESCE(jus.data, '{}'::jsonb) AS data, jus.updated_at, jus.journey_entry_id, jus.external_step_id
FROM journey_user_state jus
JOIN journeys j ON j.id = jus.journey_id
WHERE jus.completed_at IS NULL
AND jus.resume_at <= NOW()`
AND jus.resume_at <= NOW()
ORDER BY jus.resume_at ASC
LIMIT $1
FOR UPDATE OF jus SKIP LOCKED`

rows, err := s.db.QueryxContext(ctx, query)
var states []JourneyUserState
err := s.db.SelectContext(ctx, &states, query, limit)
if err != nil {
return 0, err
}
defer rows.Close()

var n int
for rows.Next() {
var state JourneyUserState
if err := rows.StructScan(&state); err != nil {
return n, err
}
for n, state := range states {
if err := fn(state); err != nil {
return n, err
}
n++
}

return n, rows.Err()
return len(states), nil
Comment on lines 812 to +836
@@ -822,10 +827,13 @@ func (s *ListsStore) SelectListsDueForTimeReconciliation(ctx context.Context) ([
AND (
l.last_recomputed_at IS NULL
OR l.last_recomputed_at + r.recompute_interval <= NOW()
)`
)
ORDER BY l.last_recomputed_at ASC NULLS FIRST, l.created_at ASC
LIMIT $1
FOR UPDATE OF l SKIP LOCKED`

var results []ListDueForReconciliation
err := s.db.SelectContext(ctx, &results, query)
err := s.db.SelectContext(ctx, &results, query, limit)
Comment thread internal/pubsub/README.md
Comment on lines +25 to +32
Client -->|Send Broadcast| BroadcastSubject["broadcasts.process.{project_id}.{broadcast_id}"]
Client -->|User Inbox Message| UserInboxSubject["users.inbox.process.{project_id}"]
Client -->|User Inbox Opened| UserInboxOpenedSubject["users.inbox.opened.{project_id}"]
Client -->|User Inbox Archived| UserInboxArchivedSubject["users.inbox.archived.{project_id}"]
Client -->|Org Inbox Message| OrgInboxSubject["organizations.inbox.process.{project_id}"]
Client -->|Org Inbox Opened| OrgInboxOpenedSubject["organizations.inbox.opened.{project_id}"]
Client -->|Org Inbox Archived| OrgInboxArchivedSubject["organizations.inbox.archived.{project_id}"]
Client -->|Project Event| ProjectEventSubject["projects.events.{organization_id}"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants