From c0c16c9bb2d24cb458857eaccc5354c57ba71112 Mon Sep 17 00:00:00 2001 From: Jeroen Rinzema Date: Tue, 26 May 2026 15:45:03 +0200 Subject: [PATCH] feat: add inbox messaging system for organizations and users Introduce a full inbox feature allowing messages to be sent, scheduled, and managed for both organizations and users. Includes client and management API endpoints, pubsub consumers for async delivery across push (APNS, FCM, WebPush), email, and SMS channels, a scheduler for timed messages, RBAC permissions, database migrations, and a console UI for viewing inbox messages. --- console/public/locales/en.json | 37 +- console/src/components/inbox-detail-table.tsx | 980 +++++ console/src/oapi/management.generated.ts | 773 +++- .../organizations/OrganizationDetail.tsx | 2 + .../organizations/OrganizationDetailInbox.tsx | 9 + console/src/views/router.tsx | 10 + console/src/views/users/UserDetail.tsx | 2 + console/src/views/users/UserDetailInbox.tsx | 9 + .../scheduler/broadcasts_enterprise.go | 2 +- internal/cluster/scheduler/inbox_messages.go | 99 + .../cluster/scheduler/journey_resumptions.go | 2 +- .../cluster/scheduler/list_recomputation.go | 2 +- .../organization_scheduled_events.go | 4 +- internal/cluster/scheduler/scheduler.go | 43 +- .../scheduler/user_scheduled_events.go | 4 +- internal/config/config.go | 12 +- .../http/controllers/v1/client/controller.go | 2 + .../http/controllers/v1/client/devices.go | 18 +- internal/http/controllers/v1/client/events.go | 36 +- internal/http/controllers/v1/client/inbox.go | 351 ++ .../http/controllers/v1/client/inbox_test.go | 253 ++ .../v1/client/oapi/external_ids.go | 5 + .../http/controllers/v1/client/oapi/inbox.go | 79 + .../controllers/v1/client/oapi/resources.yml | 623 +++ .../v1/client/oapi/resources_gen.go | 3568 +++++++++++++--- .../controllers/v1/client/organizations.go | 73 +- .../http/controllers/v1/client/scheduled.go | 72 +- internal/http/controllers/v1/client/users.go | 37 +- .../controllers/v1/management/apikeys_test.go | 15 +- .../v1/management/broadcasts_enterprise.go | 32 +- .../management/broadcasts_enterprise_test.go | 9 +- .../v1/management/campaigns_test.go | 7 +- .../controllers/v1/management/controller.go | 2 +- .../v1/management/documents_test.go | 21 +- .../controllers/v1/management/http_test.go | 4 - .../http/controllers/v1/management/inbox.go | 50 + .../controllers/v1/management/lists_test.go | 23 +- .../v1/management/oapi/resources.yml | 931 ++++- .../v1/management/oapi/resources_gen.go | 3717 ++++++++++++++++- .../v1/management/organizations.go | 346 +- .../v1/management/organizations_test.go | 61 +- .../controllers/v1/management/projects.go | 9 +- .../v1/management/projects_test.go | 7 +- .../v1/management/providers_test.go | 7 +- .../v1/management/scheduled_test.go | 3 +- .../v1/management/sender_identities_test.go | 33 +- .../v1/management/subscriptions_test.go | 5 +- .../controllers/v1/management/tags_test.go | 15 +- .../controllers/v1/management/templates.go | 5 +- .../v1/management/templates_test.go | 5 +- .../http/controllers/v1/management/users.go | 325 +- .../controllers/v1/management/users_test.go | 39 +- internal/http/controllers/v1/webhooks.go | 30 +- internal/http/params/params.go | 24 + internal/journeys/campaign.go | 10 +- internal/journeys/delay_test.go | 17 +- internal/journeys/gate_test.go | 49 +- internal/journeys/journeys_test.go | 4 - internal/providers/channels/email_test.go | 17 +- internal/providers/channels/sms_test.go | 13 +- internal/ptr/ptr.go | 25 + internal/pubsub/README.md | 47 +- internal/pubsub/consumer/bootstrap.go | 114 +- internal/pubsub/consumer/campaigns.go | 609 +-- internal/pubsub/consumer/campaigns_render.go | 420 ++ internal/pubsub/consumer/campaigns_test.go | 156 + internal/pubsub/consumer/consumer.go | 25 +- internal/pubsub/consumer/inbox.go | 300 ++ internal/pubsub/consumer/inbox_broadcast.go | 79 + .../pubsub/consumer/inbox_organization.go | 370 ++ internal/pubsub/consumer/inbox_test.go | 95 + internal/pubsub/consumer/inbox_user.go | 417 ++ internal/pubsub/schemas/events.go | 192 +- internal/pubsub/schemas/events_origin_test.go | 96 + internal/rbac/engine.go | 43 +- internal/rbac/model.go | 1 + internal/store/journey/journeys.go | 31 +- internal/store/journey/journeys_test.go | 29 +- internal/store/management/admins_test.go | 7 +- internal/store/management/broadcasts.go | 64 +- internal/store/management/campaigns.go | 10 +- .../1764116037_broadcast_sent.down.sql | 1 + .../1764116037_broadcast_sent.up.sql | 1 + internal/store/management/projects_test.go | 7 +- internal/store/management/store_test.go | 4 - internal/store/store.go | 11 + internal/store/subjects/campaign_sends.go | 144 - .../store/subjects/campaign_sends_test.go | 140 - internal/store/subjects/devices.go | 7 +- internal/store/subjects/inbox.go | 1098 +++++ internal/store/subjects/inbox_test.go | 213 + internal/store/subjects/lists.go | 14 +- internal/store/subjects/lists_test.go | 7 +- .../1784400000_inbox_messages.down.sql | 9 + .../1784400000_inbox_messages.up.sql | 149 + internal/store/subjects/organizations_test.go | 29 +- internal/store/subjects/scheduled.go | 104 +- internal/store/subjects/scheduled_test.go | 15 +- internal/store/subjects/store.go | 8 +- internal/store/subjects/users_test.go | 49 +- internal/wasm/test/provider.wasm | Bin 600545 -> 614434 bytes modules/apns/main.go | 45 +- modules/apns/payload/payload.go | 51 + modules/apns/payload/payload_test.go | 129 + modules/fcm/main.go | 68 +- modules/fcm/payload/payload.go | 89 + modules/fcm/payload/payload_test.go | 113 + modules/logger/main.go | 8 +- modules/resend/main.go | 17 +- modules/resend/main_test.go | 111 +- modules/resend/resend.go | 54 +- modules/twilio/main.go | 19 +- modules/twilio/provider/provider.go | 58 + modules/twilio/provider/provider_test.go | 130 + modules/webpush/main.go | 22 +- modules/webpush/payload/payload.go | 46 + modules/webpush/payload/payload_test.go | 105 + pkg/modules/channel.go | 1 + pkg/modules/providers/channel.go | 3 +- pkg/modules/providers/request.go | 12 + pkg/modules/providers/webhook.go | 15 +- 121 files changed, 16823 insertions(+), 2355 deletions(-) create mode 100644 console/src/components/inbox-detail-table.tsx create mode 100644 console/src/views/organizations/OrganizationDetailInbox.tsx create mode 100644 console/src/views/users/UserDetailInbox.tsx create mode 100644 internal/cluster/scheduler/inbox_messages.go create mode 100644 internal/http/controllers/v1/client/inbox.go create mode 100644 internal/http/controllers/v1/client/inbox_test.go create mode 100644 internal/http/controllers/v1/client/oapi/inbox.go create mode 100644 internal/http/controllers/v1/management/inbox.go create mode 100644 internal/http/params/params.go create mode 100644 internal/ptr/ptr.go create mode 100644 internal/pubsub/consumer/campaigns_render.go create mode 100644 internal/pubsub/consumer/inbox.go create mode 100644 internal/pubsub/consumer/inbox_broadcast.go create mode 100644 internal/pubsub/consumer/inbox_organization.go create mode 100644 internal/pubsub/consumer/inbox_test.go create mode 100644 internal/pubsub/consumer/inbox_user.go create mode 100644 internal/pubsub/schemas/events_origin_test.go create mode 100644 internal/store/management/migrations/1764116037_broadcast_sent.down.sql create mode 100644 internal/store/management/migrations/1764116037_broadcast_sent.up.sql delete mode 100644 internal/store/subjects/campaign_sends.go delete mode 100644 internal/store/subjects/campaign_sends_test.go create mode 100644 internal/store/subjects/inbox.go create mode 100644 internal/store/subjects/inbox_test.go create mode 100644 internal/store/subjects/migrations/1784400000_inbox_messages.down.sql create mode 100644 internal/store/subjects/migrations/1784400000_inbox_messages.up.sql create mode 100644 modules/apns/payload/payload.go create mode 100644 modules/apns/payload/payload_test.go create mode 100644 modules/fcm/payload/payload.go create mode 100644 modules/fcm/payload/payload_test.go create mode 100644 modules/webpush/payload/payload.go create mode 100644 modules/webpush/payload/payload_test.go diff --git a/console/public/locales/en.json b/console/public/locales/en.json index f178c847b..6d9d64852 100644 --- a/console/public/locales/en.json +++ b/console/public/locales/en.json @@ -560,7 +560,7 @@ "onboarding_project-getting-started_journey": "Journeys automate your user flows with scheduled or event-based messages.", "onboarding_project-getting-started_campaign": "Campaigns handle one-time sends, or plug into journeys to make them even smarter.", "open_rate": "Open Rate", - "opened_at": "Opened At", + "opened_at": "Read At", "options": "Options", "organizations": "Organizations", "organization_data": "Organization Data", @@ -704,7 +704,6 @@ "status": "Status", "state": "State", "static": "Static", - "status": "Status", "step_date": "Step Date", "sticky": "Sticky Note", "sticky_desc": "Add a sticky note that will be visible in the journey editor.", @@ -924,5 +923,37 @@ "emptyTitle": "No images yet", "emptyDescription": "Drag and drop images here, or click Upload to get started.", "imageCount": "{{count}} image(s)" - } + }, + "all": "All", + "creating": "Creating...", + "from_address": "From address", + "from_number": "From number", + "inbox_body_placeholder": "Write your message...", + "inbox_channel_help": "Choose how the message is delivered.", + "inbox_from_address_help": "Optional. Overrides the default sender address.", + "inbox_from_number_help": "Optional. Overrides the default sender number.", + "inbox_message_archived": "Message archived", + "inbox_message_create_failed": "Failed to create message", + "inbox_message_created": "Message created", + "inbox_message_opened": "Message marked as read", + "inbox_message_scheduled": "Message scheduled", + "inbox_message_update_failed": "Failed to update message", + "inbox_tags_help": "Comma separated. Duplicates are removed.", + "inbox_tags_placeholder": "billing, onboarding", + "inbox_title_placeholder": "Welcome to Lunogram", + "mark_opened": "Mark as read", + "messages": "messages", + "new_inbox_message": "New inbox message", + "new_message": "New message", + "next_page": "Next page", + "no_inbox_messages": "No inbox messages", + "no_inbox_messages_description": "There are no messages in this inbox yet. Create a new one to get started.", + "open_menu": "Open menu", + "opened": "Read", + "previous_page": "Previous page", + "scheduled_at": "Scheduled at", + "search_inbox": "Search inbox...", + "sender": "Sender", + "sms": "SMS", + "unread": "Unread" } diff --git a/console/src/components/inbox-detail-table.tsx b/console/src/components/inbox-detail-table.tsx new file mode 100644 index 000000000..ab3df6e1d --- /dev/null +++ b/console/src/components/inbox-detail-table.tsx @@ -0,0 +1,980 @@ +import { useCallback, useContext, useRef, useState } from "react" +import { useTranslation } from "react-i18next" +import type { TFunction } from "i18next" +import { + Archive, + ArchiveRestore, + Bell, + CalendarClock, + Check, + ChevronLeft, + ChevronRight, + EyeOff, + Inbox, + Info, + Mail, + MessageSquare, + MoreHorizontal, + Plus, + Search, +} from "lucide-react" +import { toast } from "sonner" +import { ProjectContext } from "../contexts" +import { PreferencesContext } from "@/contexts/PreferencesContext" +import { useResolver } from "../hooks" +import { formatDate, getPageNumbers } from "../utils" +import oapiClient from "../oapi/client" +import type { components } from "../oapi/management.generated" + +import { Badge } from "@/components/ui/badge" +import { Button } from "@/components/ui/button" +import { SenderIdentityCombobox } from "@/components/sender-identity-combobox" +import { Input } from "@/components/ui/input" +import { Label } from "@/components/ui/label" +import { Skeleton } from "@/components/ui/skeleton" +import { Textarea } from "@/components/ui/textarea" +import { DateTimeEdit } from "@/components/ui/datetime-edit" +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog" +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from "@/components/ui/dropdown-menu" +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select" +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "@/components/ui/table" + +type InboxMessage = components["schemas"]["InboxMessage"] +type InboxStatus = "all" | "unread" | "opened" | "archived" +type InboxChannel = components["schemas"]["Channel"] + +interface InboxDetailTableProps { + subjectId: string + subjectType: "users" | "organizations" +} + +const limit = 15 + +export default function InboxDetailTable({ subjectId, subjectType }: InboxDetailTableProps) { + const { t } = useTranslation() + const [project] = useContext(ProjectContext) + const [preferences] = useContext(PreferencesContext) + const [page, setPage] = useState(1) + const [status, setStatus] = useState("all") + const [searchQuery, setSearchQuery] = useState("") + const [debouncedQuery, setDebouncedQuery] = useState("") + const [isCreateOpen, setIsCreateOpen] = useState(false) + const [isCreating, setIsCreating] = useState(false) + const [channel, setChannel] = useState("inbox") + const [senderIdentityId, setSenderIdentityId] = useState("") + const [title, setTitle] = useState("") + const [body, setBody] = useState("") + const [tags, setTags] = useState("") + const [scheduledAt, setScheduledAt] = useState("") + const searchTimeoutRef = useRef>() + + const handleSearch = (value: string) => { + setSearchQuery(value) + setPage(1) + clearTimeout(searchTimeoutRef.current) + searchTimeoutRef.current = setTimeout(() => { + setDebouncedQuery(value) + }, 300) + } + + const resetForm = () => { + setChannel("inbox") + setSenderIdentityId("") + setTitle("") + setBody("") + setTags("") + setScheduledAt("") + } + + const [result, , reload] = useResolver( + useCallback(async () => { + const params = { + limit, + offset: (page - 1) * limit, + search: debouncedQuery || undefined, + status: status === "all" ? undefined : status, + include_archived: status === "all" ? true : undefined, + include_scheduled: true, + } + + if (subjectType === "users") { + const { data, error } = await oapiClient.GET( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox", + { + params: { + path: { projectID: project.id, userID: subjectId }, + query: params, + }, + }, + ) + if (error) throw error + return data ?? { results: [], total: 0 } + } + + const { data, error } = await oapiClient.GET( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox", + { + params: { + path: { projectID: project.id, organizationID: subjectId }, + query: params, + }, + }, + ) + if (error) throw error + return data ?? { results: [], total: 0 } + }, [page, project.id, debouncedQuery, status, subjectId, subjectType]), + ) + + const createMessage = async () => { + if (!title.trim()) return + if ((channel === "email" || channel === "sms") && !senderIdentityId) return + + setIsCreating(true) + try { + const dedupedTags = Array.from( + new Set( + tags + .split(",") + .map((tag) => tag.trim()) + .filter(Boolean), + ), + ) + + const payload = { + channel, + sender_identity_id: channel === "push" || channel === "inbox" ? undefined : senderIdentityId, + content: { + title: title.trim(), + body: body.trim() || undefined, + }, + tags: dedupedTags, + scheduled_at: scheduledAt || undefined, + } + + if (subjectType === "users") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox", + { + params: { path: { projectID: project.id, userID: subjectId } }, + body: payload, + }, + ) + if (error) throw error + } else { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox", + { + params: { path: { projectID: project.id, organizationID: subjectId } }, + body: payload, + }, + ) + if (error) throw error + } + + toast.success(t("inbox_message_created", "Inbox message created")) + resetForm() + setIsCreateOpen(false) + await reload() + } catch { + toast.error(t("inbox_message_create_failed", "Failed to create inbox message")) + } finally { + setIsCreating(false) + } + } + + const updateMessage = async ( + message: InboxMessage, + event: "opened" | "archived" | "scheduled" | "unarchived" | "unread", + newScheduledAt?: string, + ) => { + try { + if (subjectType === "users") { + if (event === "scheduled") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox/{messageID}/schedule", + { + params: { + path: { + projectID: project.id, + userID: subjectId, + messageID: message.id, + }, + }, + body: { scheduled_at: newScheduledAt ?? "" }, + }, + ) + if (error) throw error + } else if (event === "opened") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox/{messageID}/open", + { + params: { + path: { + projectID: project.id, + userID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } else if (event === "archived") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox/{messageID}/archive", + { + params: { + path: { + projectID: project.id, + userID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } else if (event === "unarchived") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox/{messageID}/unarchive", + { + params: { + path: { + projectID: project.id, + userID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } else if (event === "unread") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/users/{userID}/inbox/{messageID}/unread", + { + params: { + path: { + projectID: project.id, + userID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } + } else { + if (event === "scheduled") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox/{messageID}/schedule", + { + params: { + path: { + projectID: project.id, + organizationID: subjectId, + messageID: message.id, + }, + }, + body: { scheduled_at: newScheduledAt ?? "" }, + }, + ) + if (error) throw error + } else if (event === "opened") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox/{messageID}/open", + { + params: { + path: { + projectID: project.id, + organizationID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } else if (event === "archived") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox/{messageID}/archive", + { + params: { + path: { + projectID: project.id, + organizationID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } else if (event === "unarchived") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox/{messageID}/unarchive", + { + params: { + path: { + projectID: project.id, + organizationID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } else if (event === "unread") { + const { error } = await oapiClient.POST( + "/api/admin/projects/{projectID}/subjects/organizations/{organizationID}/inbox/{messageID}/unread", + { + params: { + path: { + projectID: project.id, + organizationID: subjectId, + messageID: message.id, + }, + }, + }, + ) + if (error) throw error + } + } + + toast.success( + event === "scheduled" + ? t("inbox_message_scheduled", "Message schedule updated") + : event === "archived" + ? t("inbox_message_archived", "Message archived") + : event === "unarchived" + ? t("inbox_message_unarchived", "Message unarchived") + : event === "unread" + ? t("inbox_message_unread", "Message marked unread") + : t("inbox_message_opened", "Message marked opened"), + ) + await reload() + } catch { + toast.error(t("inbox_message_update_failed", "Failed to update inbox message")) + } + } + + const total = result?.total ?? 0 + const totalPages = result ? Math.ceil(total / limit) : 0 + const hasPrevPage = page > 1 + const hasNextPage = page < totalPages + + return ( +
+ {/* Search + Status + Create */} +
+
+ +
+
+ + + +
+
+ + {/* Inbox Table */} +
+ + + + {t("message", "Message")} + {t("channel", "Channel")} + {t("status", "Status")} + {t("tags", "Tags")} + {t("scheduled", "Scheduled")} + + + + + {result === null ? ( + Array.from({ length: 5 }).map((_, index) => ( + + +
+ + +
+
+ + + + + + + + + + + + + + + +
+ )) + ) : result.results.length === 0 ? ( + + +
+
+
+

+ {t("no_inbox_messages", "No inbox messages yet")} +

+

+ {t( + "no_inbox_messages_description", + "Inbox messages will appear here when they are created", + )} +

+
+
+
+ ) : ( + result.results.map((message) => { + const visible = isMessageVisible(message) + const channelMeta = getChannelMeta(message.channel, t) + const ChannelIcon = channelMeta.icon + const messageTitle = + typeof message.content?.title === "string" + ? message.content.title + : "" + const messageBody = + typeof message.content?.body === "string" + ? message.content.body + : "" + + return ( + + +
+
{messageTitle}
+ {messageBody && ( +
+ {messageBody} +
+ )} +
+
+ + + + + {statusBadge(message, t)} + + {message.tags.length > 0 ? ( +
+ {message.tags.map((tag) => ( + + {tag} + + ))} +
+ ) : ( + + )} +
+ + {new Date(message.scheduled_at) > new Date() ? ( + + updateMessage(message, "scheduled", newIso) + } + > + + + + ) : ( + + + )} + + + + + + + + {!message.opened_at && + !message.archived_at && + visible && ( + + updateMessage( + message, + "opened", + ).catch(console.error) + } + > + + )} + {!message.archived_at && visible && ( + + updateMessage( + message, + "archived", + ).catch(console.error) + } + > + + )} + {message.archived_at && visible && ( + + updateMessage( + message, + "unarchived", + ).catch(console.error) + } + > + + )} + {message.opened_at && + !message.archived_at && + visible && ( + + updateMessage( + message, + "unread", + ).catch(console.error) + } + > + + )} + + + +
+ ) + }) + )} +
+
+ + {/* Pagination */} +
+

+ {total} {t("messages", "messages")} +

+ {totalPages > 1 && ( +
+ + + {getPageNumbers(page, totalPages).map((pageNum, idx) => + pageNum === "..." ? ( + + ... + + ) : ( + + ), + )} + + +
+ )} +
+
+ + {/* Create Inbox Message Dialog */} + { + setIsCreateOpen(open) + if (!open) resetForm() + }} + > + + + {t("new_inbox_message", "New inbox message")} + + {t( + "new_inbox_message_description", + "Create a message that appears in this subject inbox.", + )} + + +
+
+
+ + +

+ {t( + "inbox_channel_help", + "Delivery channel the message represents.", + )} +

+
+
+ {channel === "push" || channel === "inbox" ? ( + <> + +
+
+ + ) : ( + <> + + +

+ {channel === "email" + ? t( + "inbox_from_address_help", + "Verified sender to use as the from address.", + ) + : t( + "inbox_from_number_help", + "Verified sender number to use.", + )} +

+ + )} +
+
+ +
+ + setTitle(event.target.value)} + placeholder={ + channel === "email" + ? t("inbox_subject_placeholder", "Message subject") + : t("inbox_title_placeholder", "Message title") + } + /> +
+ +
+ +