diff --git a/apps/mobile/src/components/home/home-screen.tsx b/apps/mobile/src/components/home/home-screen.tsx index 8975232ffe..cbf474425c 100644 --- a/apps/mobile/src/components/home/home-screen.tsx +++ b/apps/mobile/src/components/home/home-screen.tsx @@ -15,6 +15,7 @@ import { isTransitionalStatus } from '@/components/kiloclaw/status-badge'; import { ProfileAvatarButton } from '@/components/profile-avatar-button'; import { ScreenHeader } from '@/components/screen-header'; import { Skeleton } from '@/components/ui/skeleton'; +import { badgeBucketForInstance } from '@/lib/badge-buckets'; import { useAgentSessions } from '@/lib/hooks/use-agent-sessions'; import { type ClawInstance, useAllKiloClawInstances } from '@/lib/hooks/use-instance-context'; import { useUnreadCounts } from '@/lib/hooks/use-unread-counts'; @@ -79,7 +80,7 @@ export function HomeScreen() { isPending: instancesPending, isError: instancesError, } = useAllKiloClawInstances(pickListPollInterval); - const { byChannel: unreadByChannel } = useUnreadCounts(); + const { byBadgeBucket: unreadByBadgeBucket } = useUnreadCounts(); const { storedSessions, activeSessions, @@ -128,7 +129,7 @@ export function HomeScreen() { {renderKiloClawSlot({ instances: instances ?? [], instancesError, - unreadByChannel, + unreadByBadgeBucket, })} {renderSessionsOrPromo({ @@ -151,7 +152,7 @@ export function HomeScreen() { function renderKiloClawSlot(params: { instances: ClawInstance[]; instancesError: boolean; - unreadByChannel: Map; + unreadByBadgeBucket: Map; }) { if (params.instances.length > 0) { return ( @@ -162,7 +163,9 @@ function renderKiloClawSlot(params: { ))} diff --git a/apps/mobile/src/components/kiloclaw/chat.tsx b/apps/mobile/src/components/kiloclaw/chat.tsx index 626cde8230..e75b60862b 100644 --- a/apps/mobile/src/components/kiloclaw/chat.tsx +++ b/apps/mobile/src/components/kiloclaw/chat.tsx @@ -15,10 +15,15 @@ import { ChatHeader, ChatShell } from '@/components/kiloclaw/chat-shell'; import { useBotOnlineStatus } from '@/components/kiloclaw/chat-hooks'; import { NotificationPrompt } from '@/components/kiloclaw/notification-prompt'; import { useStreamChatTheme } from '@/components/kiloclaw/chat-theme'; +import { badgeBucketForInstance } from '@/lib/badge-buckets'; import { useAppLifecycle } from '@/lib/hooks/use-app-lifecycle'; import { useStreamChatCredentials } from '@/lib/hooks/use-kiloclaw-queries'; import { setLastActiveInstance } from '@/lib/last-active-instance'; -import { parseNotificationData, setActiveChatInstance } from '@/lib/notifications'; +import { + getNotificationSandboxId, + parseNotificationData, + setActiveChatInstance, +} from '@/lib/notifications'; import { useTRPC } from '@/lib/trpc'; type KiloClawChatProps = { @@ -28,7 +33,7 @@ type KiloClawChatProps = { organizationId?: string | null; }; -type UnreadCountsData = { channelId: string; badgeCount: number }[]; +type UnreadCountsData = { badgeBucket: string; badgeCount: number }[]; export function KiloClawChat({ instanceId, @@ -46,11 +51,11 @@ export function KiloClawChat({ const { mutate: markChatRead } = useMutation( trpc.user.markChatRead.mutationOptions({ - onMutate: async ({ channelId }) => { + onMutate: async ({ badgeBucket }) => { await queryClient.cancelQueries({ queryKey: unreadCountsKey }); const previous = queryClient.getQueryData(unreadCountsKey); queryClient.setQueryData(unreadCountsKey, old => - (old ?? []).filter(row => row.channelId !== channelId) + (old ?? []).filter(row => row.badgeBucket !== badgeBucket) ); return { previous }; }, @@ -71,18 +76,19 @@ export function KiloClawChat({ useFocusEffect( useCallback(() => { + const badgeBucket = badgeBucketForInstance(instanceId); isFocusedRef.current = true; setActiveChatInstance(instanceId); setLastActiveInstance(instanceId); - markChatRead({ channelId: instanceId }); + markChatRead({ badgeBucket }); // If a notification for this chat arrives while the screen is already open it is // visually suppressed, but the DO still incremented the server-side count. Clear // it immediately so the badge never drifts above 0 while the user is reading. const subscription = Notifications.addNotificationReceivedListener(notification => { const data = parseNotificationData(notification.request.content.data); - if (data?.type === 'chat' && data.instanceId === instanceId) { - markChatRead({ channelId: instanceId }); + if (data && getNotificationSandboxId(data) === instanceId) { + markChatRead({ badgeBucket }); } }); @@ -100,7 +106,7 @@ export function KiloClawChat({ // not an app-state one), so without this the badge stays stuck after backgrounding. useEffect(() => { if (isActive && isFocusedRef.current) { - markChatRead({ channelId: instanceId }); + markChatRead({ badgeBucket: badgeBucketForInstance(instanceId) }); } }, [isActive, instanceId, markChatRead]); diff --git a/apps/mobile/src/lib/badge-buckets.ts b/apps/mobile/src/lib/badge-buckets.ts new file mode 100644 index 0000000000..cb32814020 --- /dev/null +++ b/apps/mobile/src/lib/badge-buckets.ts @@ -0,0 +1 @@ +export const badgeBucketForInstance = (sandboxId: string) => `kiloclaw:${sandboxId}` as const; diff --git a/apps/mobile/src/lib/hooks/use-unread-counts.ts b/apps/mobile/src/lib/hooks/use-unread-counts.ts index e69e93fdc9..f88f95cabc 100644 --- a/apps/mobile/src/lib/hooks/use-unread-counts.ts +++ b/apps/mobile/src/lib/hooks/use-unread-counts.ts @@ -4,9 +4,8 @@ import { useMemo } from 'react'; import { useTRPC } from '@/lib/trpc'; /** - * Fetches per-channel unread message counts for the current user and returns - * a Map keyed by channelId for O(1) lookup from dashboard cards. For kiloclaw - * chats, `channelId` equals the instance's `sandboxId`. + * Fetches unread message counts for the current user and returns a Map keyed + * by badge bucket for O(1) lookup from dashboard cards. * * Freshness is driven by invalidations, not polling: * - Foreground chat push → invalidate (see `use-unread-counts-invalidation`). @@ -21,13 +20,13 @@ export function useUnreadCounts() { }) ); - const byChannel = useMemo(() => { + const byBadgeBucket = useMemo(() => { const map = new Map(); for (const row of query.data ?? []) { - map.set(row.channelId, row.badgeCount); + map.set(row.badgeBucket, row.badgeCount); } return map; }, [query.data]); - return { byChannel, query }; + return { byBadgeBucket, query }; } diff --git a/apps/mobile/src/lib/notifications.ts b/apps/mobile/src/lib/notifications.ts index c20858c069..7045ad9b22 100644 --- a/apps/mobile/src/lib/notifications.ts +++ b/apps/mobile/src/lib/notifications.ts @@ -24,11 +24,18 @@ export function setActiveChatInstance(instanceId: string | null) { activeChatInstanceId = instanceId; } -// Keep in sync with data field in services/notifications/src/dos/NotificationChannelDO.ts -const notificationDataSchema = z.object({ - type: z.literal('chat'), - instanceId: z.string().min(1), -}); +const notificationDataSchema = z.discriminatedUnion('type', [ + z.object({ + type: z.literal('chat'), + instanceId: z.string().min(1), + }), + z.object({ + type: z.literal('chat.message'), + sandboxId: z.string().min(1), + conversationId: z.string().min(1), + messageId: z.string().min(1), + }), +]); type NotificationData = z.infer; @@ -40,6 +47,10 @@ export function parseNotificationData(data: unknown): NotificationData | null { return parsed.success ? parsed.data : null; } +export function getNotificationSandboxId(data: NotificationData): string { + return data.type === 'chat' ? data.instanceId : data.sandboxId; +} + const shown = { shouldShowAlert: true, shouldPlaySound: true, @@ -63,7 +74,7 @@ export function setupNotificationHandler() { const data = parseNotificationData(notification.request.content.data); // Suppress only if the user is already viewing this exact chat - if (data && data.instanceId === activeChatInstanceId) { + if (data && getNotificationSandboxId(data) === activeChatInstanceId) { return suppressed; } @@ -87,7 +98,7 @@ export function setupNotificationResponseHandler() { const data = parseNotificationData(response.notification.request.content.data); if (data) { - const path = `/(app)/chat/${data.instanceId}`; + const path = `/(app)/chat/${getNotificationSandboxId(data)}`; // If the router is ready (has segments), navigate immediately. // Otherwise store as pending for consumption after auth completes. try { @@ -109,7 +120,7 @@ export function checkInitialNotification(): void { } const data = parseNotificationData(response.notification.request.content.data); if (data) { - pendingNotificationLink = `/(app)/chat/${data.instanceId}`; + pendingNotificationLink = `/(app)/chat/${getNotificationSandboxId(data)}`; } } diff --git a/packages/event-service/src/index.ts b/packages/event-service/src/index.ts index 3a40272cbe..c2e238f1dd 100644 --- a/packages/event-service/src/index.ts +++ b/packages/event-service/src/index.ts @@ -1,4 +1,5 @@ export { EventServiceClient, WebSocketAuthError, HandshakeTimeoutError } from './client'; export * from './presence'; +export * from './kiloclaw-contexts'; export * from './schemas'; export type * from './types'; diff --git a/packages/event-service/src/kiloclaw-contexts.ts b/packages/event-service/src/kiloclaw-contexts.ts new file mode 100644 index 0000000000..afbb6bc665 --- /dev/null +++ b/packages/event-service/src/kiloclaw-contexts.ts @@ -0,0 +1,13 @@ +/** + * Event-context path builders for kiloclaw event subscriptions. + * + * These are the contexts on which kilo-chat publishes events (message + * created, typing, etc.) and to which clients subscribe to receive + * those events. Distinct from `/presence/*` contexts, which signal + * whether the user is actively on a surface. + */ + +export const kiloclawInstanceContext = (sandboxId: string) => `/kiloclaw/${sandboxId}` as const; + +export const kiloclawConversationContext = (sandboxId: string, conversationId: string) => + `/kiloclaw/${sandboxId}/${conversationId}` as const; diff --git a/packages/event-service/src/presence.ts b/packages/event-service/src/presence.ts index 8459b886c3..a267aa667c 100644 --- a/packages/event-service/src/presence.ts +++ b/packages/event-service/src/presence.ts @@ -3,14 +3,19 @@ * and are subscribed by clients only when the user is *actively* on the * matching surface. The notifications pipeline queries them via * event-service.isUserInContext to skip pushes when the user is in-context. + * + * The kiloclaw-scoped variants compose `/presence` with the corresponding + * event-context paths so the segment shape is defined in exactly one place. */ +import { kiloclawConversationContext, kiloclawInstanceContext } from './kiloclaw-contexts'; + export type Platform = 'app' | 'web'; export const presenceContextForPlatform = (platform: Platform) => `/presence/${platform}` as const; export const presenceContextForInstance = (sandboxId: string) => - `/presence/kiloclaw/${sandboxId}` as const; + `/presence${kiloclawInstanceContext(sandboxId)}` as const; export const presenceContextForConversation = (sandboxId: string, conversationId: string) => - `/presence/kiloclaw/${sandboxId}/${conversationId}` as const; + `/presence${kiloclawConversationContext(sandboxId, conversationId)}` as const; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 00bae11778..3ea3a33947 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1561,7 +1561,7 @@ importers: version: 7.0.0-dev.20260319.1 jest: specifier: ^30.3.0 - version: 30.3.0(@types/node@24.12.0)(esbuild-register@3.6.0(esbuild@0.27.4)) + version: 30.3.0(@types/node@25.5.0)(esbuild-register@3.6.0(esbuild@0.27.4)) typescript: specifier: 'catalog:' version: 5.9.3 @@ -1808,9 +1808,15 @@ importers: '@kilocode/encryption': specifier: workspace:* version: link:../../packages/encryption + '@kilocode/event-service': + specifier: workspace:* + version: link:../../packages/event-service '@kilocode/kilo-chat': specifier: workspace:* version: link:../../packages/kilo-chat + '@kilocode/notifications': + specifier: workspace:* + version: link:../../packages/notifications '@kilocode/worker-utils': specifier: workspace:* version: link:../../packages/worker-utils @@ -2008,6 +2014,9 @@ importers: '@kilocode/db': specifier: workspace:* version: link:../../packages/db + '@kilocode/event-service': + specifier: workspace:* + version: link:../../packages/event-service '@kilocode/notifications': specifier: workspace:* version: link:../../packages/notifications @@ -2023,9 +2032,6 @@ importers: hono: specifier: ^4.12.7 version: 4.12.8 - stream-chat: - specifier: 'catalog:' - version: 9.38.0(bufferutil@4.1.0)(utf-8-validate@6.0.6) zod: specifier: 'catalog:' version: 4.3.6 @@ -16744,7 +16750,7 @@ snapshots: cjs-module-lexer: 1.4.3 esbuild: 0.27.4 miniflare: 4.20260310.0(bufferutil@4.1.0)(utf-8-validate@6.0.6) - vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.12.0)(@vitest/ui@3.2.4)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) + vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.5.0)(@vitest/ui@3.2.4)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) wrangler: 4.72.0(@cloudflare/workers-types@4.20260313.1)(bufferutil@4.1.0)(utf-8-validate@6.0.6) transitivePeerDependencies: - '@cloudflare/workers-types' @@ -22252,7 +22258,7 @@ snapshots: sirv: 3.0.2 tinyglobby: 0.2.15 tinyrainbow: 2.0.0 - vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.12.0)(@vitest/ui@3.2.4)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) + vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.5.0)(@vitest/ui@3.2.4)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) '@vitest/utils@3.2.4': dependencies: @@ -25683,6 +25689,25 @@ snapshots: - supports-color - ts-node + jest-cli@30.3.0(@types/node@25.5.0)(esbuild-register@3.6.0(esbuild@0.27.4)): + dependencies: + '@jest/core': 30.3.0(esbuild-register@3.6.0(esbuild@0.27.4)) + '@jest/test-result': 30.3.0 + '@jest/types': 30.3.0 + chalk: 4.1.2 + exit-x: 0.2.2 + import-local: 3.2.0 + jest-config: 30.3.0(@types/node@25.5.0)(esbuild-register@3.6.0(esbuild@0.27.4)) + jest-util: 30.3.0 + jest-validate: 30.3.0 + yargs: 17.7.2 + transitivePeerDependencies: + - '@types/node' + - babel-plugin-macros + - esbuild-register + - supports-color + - ts-node + jest-config@29.7.0(@types/node@24.12.0): dependencies: '@babel/core': 7.29.0 @@ -26334,6 +26359,19 @@ snapshots: - supports-color - ts-node + jest@30.3.0(@types/node@25.5.0)(esbuild-register@3.6.0(esbuild@0.27.4)): + dependencies: + '@jest/core': 30.3.0(esbuild-register@3.6.0(esbuild@0.27.4)) + '@jest/types': 30.3.0 + import-local: 3.2.0 + jest-cli: 30.3.0(@types/node@25.5.0)(esbuild-register@3.6.0(esbuild@0.27.4)) + transitivePeerDependencies: + - '@types/node' + - babel-plugin-macros + - esbuild-register + - supports-color + - ts-node + jimp-compact@0.16.1: {} jiti@2.6.1: {} diff --git a/services/kilo-chat/package.json b/services/kilo-chat/package.json index 9eb1492829..283de5871b 100644 --- a/services/kilo-chat/package.json +++ b/services/kilo-chat/package.json @@ -27,7 +27,9 @@ "dependencies": { "@kilocode/db": "workspace:*", "@kilocode/encryption": "workspace:*", + "@kilocode/event-service": "workspace:*", "@kilocode/kilo-chat": "workspace:*", + "@kilocode/notifications": "workspace:*", "@kilocode/worker-utils": "workspace:*", "drizzle-orm": "catalog:", "hono": "catalog:", diff --git a/services/kilo-chat/src/__tests__/conversation-status-routes.test.ts b/services/kilo-chat/src/__tests__/conversation-status-routes.test.ts index 4b6330c0ac..022f803484 100644 --- a/services/kilo-chat/src/__tests__/conversation-status-routes.test.ts +++ b/services/kilo-chat/src/__tests__/conversation-status-routes.test.ts @@ -1,6 +1,7 @@ import { env } from 'cloudflare:test'; import { describe, it, expect, vi } from 'vitest'; import { Hono } from 'hono'; +import { kiloclawConversationContext } from '@kilocode/event-service'; import type { AuthContext } from '../auth'; import { botAuthMiddleware } from '../auth-bot'; import { registerBotRoutes } from '../routes/bot-messages'; @@ -246,7 +247,7 @@ describe('POST /bot/v1/sandboxes/:sandboxId/conversations/:cid/conversation-stat expect(pushEvent).toHaveBeenCalledTimes(1); expect(pushEvent).toHaveBeenCalledWith( userId, - `/kiloclaw/${sandboxId}/${conversationId}`, + kiloclawConversationContext(sandboxId, conversationId), 'conversation.status', { conversationId, diff --git a/services/kilo-chat/src/__tests__/push-notifications.test.ts b/services/kilo-chat/src/__tests__/push-notifications.test.ts new file mode 100644 index 0000000000..920bae946c --- /dev/null +++ b/services/kilo-chat/src/__tests__/push-notifications.test.ts @@ -0,0 +1,173 @@ +import { env } from 'cloudflare:test'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { ConversationDO } from '../do/conversation-do'; +import { makeApp } from './helpers'; + +// fetchSandboxLabel hits Hyperdrive/pg. Mock it so the push call site doesn't +// need a real DB. Individual tests can override per-test as needed. +vi.mock('../services/sandbox-lookup', () => ({ + fetchSandboxLabel: vi.fn(async () => 'My Sandbox'), +})); + +const sampleContent = [{ type: 'text', text: 'hello there' }]; + +async function waitForCalls(spy: { mock: { calls: unknown[][] } }, timeoutMs = 2000) { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (spy.mock.calls.length > 0) return; + await new Promise(r => setTimeout(r, 10)); + } +} + +describe('kilo-chat publishes push on message.created', () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it('does NOT call sendPushForConversation when only the sender is a human member', async () => { + // Single-human conversation: sender + bot. After excluding the sender, + // recipientUserIds is empty, so the push fanout must be skipped. + const sendSpy = vi + .spyOn(env.NOTIFICATIONS, 'sendPushForConversation') + .mockResolvedValue({ perRecipient: [] }); + + const userId = 'user-push-skip-1'; + const sandboxId = 'sandbox-push-skip-1'; + const userApp = makeApp(userId, 'user'); + + const createRes = await userApp.request( + '/v1/conversations', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ sandboxId, title: 'Push skip' }), + }, + env + ); + expect(createRes.status).toBe(201); + const { conversationId } = await createRes.json<{ conversationId: string }>(); + + const sendRes = await userApp.request( + '/v1/messages', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ conversationId, content: sampleContent }), + }, + env + ); + expect(sendRes.status).toBe(201); + + // Give any waitUntil tasks a chance to fire then assert the push wasn't + // called — there are no human recipients other than the sender. + await new Promise(r => setTimeout(r, 50)); + expect(sendSpy).not.toHaveBeenCalled(); + }); + + it('calls sendPushForConversation with non-sender humans when conversation has multiple humans', async () => { + const sendSpy = vi + .spyOn(env.NOTIFICATIONS, 'sendPushForConversation') + .mockResolvedValue({ perRecipient: [] }); + + const senderId = 'user-push-multi-sender'; + const otherId = 'user-push-multi-other'; + const sandboxId = 'sandbox-push-multi'; + const conversationId = '01KQD0T86VR3M1RPQCF4WBFX1W'; + const botId = `bot:kiloclaw:${sandboxId}`; + + // Seed a multi-human conversation directly via the ConversationDO so we + // can exercise the push fanout's non-sender recipient path. + const convStub: DurableObjectStub = env.CONVERSATION_DO.get( + env.CONVERSATION_DO.idFromName(conversationId) + ); + const initRes = await convStub.initialize({ + id: conversationId, + title: 'Multi-human', + createdBy: senderId, + createdAt: Date.now(), + members: [ + { id: senderId, kind: 'user' }, + { id: otherId, kind: 'user' }, + { id: botId, kind: 'bot' }, + ], + }); + expect(initRes.ok).toBe(true); + + const senderApp = makeApp(senderId, 'user'); + const sendRes = await senderApp.request( + '/v1/messages', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ conversationId, content: sampleContent }), + }, + env + ); + expect(sendRes.status).toBe(201); + const { messageId } = await sendRes.json<{ messageId: string }>(); + + await waitForCalls(sendSpy); + expect(sendSpy).toHaveBeenCalledTimes(1); + const call = sendSpy.mock.calls[0][0] as { + conversationId: string; + sandboxId: string; + senderUserId: string | null; + recipientUserIds: string[]; + title: string; + bodyPreview: string; + messageId: string; + }; + expect(call.conversationId).toBe(conversationId); + expect(call.sandboxId).toBe(sandboxId); + expect(call.senderUserId).toBe(senderId); + expect(call.recipientUserIds).toContain(otherId); + expect(call.recipientUserIds).not.toContain(senderId); + expect(call.bodyPreview).toContain('hello there'); + expect(call.title).toContain('My Sandbox'); + expect(call.messageId).toBe(messageId); + }); + + it('does not block the send when sendPushForConversation rejects', async () => { + vi.spyOn(env.NOTIFICATIONS, 'sendPushForConversation').mockRejectedValue( + new Error('downstream blew up') + ); + + const senderId = 'user-push-throw-sender'; + const otherId = 'user-push-throw-other'; + const sandboxId = 'sandbox-push-throw'; + const conversationId = '01KQD0T86WRTBR2NXX0VX3MY1M'; + const botId = `bot:kiloclaw:${sandboxId}`; + + const convStub: DurableObjectStub = env.CONVERSATION_DO.get( + env.CONVERSATION_DO.idFromName(conversationId) + ); + const initRes = await convStub.initialize({ + id: conversationId, + title: 'Throw', + createdBy: senderId, + createdAt: Date.now(), + members: [ + { id: senderId, kind: 'user' }, + { id: otherId, kind: 'user' }, + { id: botId, kind: 'bot' }, + ], + }); + expect(initRes.ok).toBe(true); + + const senderApp = makeApp(senderId, 'user'); + // Even with the push throwing inside the post-commit fan-out, the send + // must still succeed because the failure is swallowed by try/catch. + const sendRes = await senderApp.request( + '/v1/messages', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ conversationId, content: sampleContent }), + }, + env + ); + expect(sendRes.status).toBe(201); + const body = await sendRes.json<{ messageId: string }>(); + expect(body.messageId).toBeTruthy(); + }); +}); diff --git a/services/kilo-chat/src/__tests__/setup.ts b/services/kilo-chat/src/__tests__/setup.ts index 897c3abf77..5807193965 100644 --- a/services/kilo-chat/src/__tests__/setup.ts +++ b/services/kilo-chat/src/__tests__/setup.ts @@ -15,3 +15,9 @@ vi.mock('../services/user-lookup', () => ({ invalid: [], })), })); + +// sandbox-lookup imports @kilocode/db/client → pg which doesn't work in the +// Workers runtime. Mock globally so module resolution succeeds. +vi.mock('../services/sandbox-lookup', () => ({ + fetchSandboxLabel: vi.fn(async () => 'Sandbox'), +})); diff --git a/services/kilo-chat/src/bindings.d.ts b/services/kilo-chat/src/bindings.d.ts index 842333f2a1..d6580663b6 100644 --- a/services/kilo-chat/src/bindings.d.ts +++ b/services/kilo-chat/src/bindings.d.ts @@ -1,5 +1,9 @@ import type { z } from 'zod'; import type { chatWebhookRpcSchema, KiloChatEventName } from '@kilocode/kilo-chat'; +import type { + SendPushForConversationInput, + SendPushForConversationOutput, +} from '@kilocode/notifications'; // Augment the wrangler-generated Env with RPC method signatures for service // bindings. `worker-configuration.d.ts` types these as plain Fetcher; this @@ -23,6 +27,11 @@ declare global { payload: unknown ): Promise; }; + NOTIFICATIONS: Fetcher & { + sendPushForConversation( + input: SendPushForConversationInput + ): Promise; + }; } } diff --git a/services/kilo-chat/src/services/event-push.ts b/services/kilo-chat/src/services/event-push.ts index 423552ed2a..a1491c25f8 100644 --- a/services/kilo-chat/src/services/event-push.ts +++ b/services/kilo-chat/src/services/event-push.ts @@ -4,6 +4,7 @@ import type { BotStatusRequest, ConversationStatusRequest, } from '@kilocode/kilo-chat'; +import { kiloclawConversationContext, kiloclawInstanceContext } from '@kilocode/event-service'; import { formatError, withDORetry } from '@kilocode/worker-utils'; import { logger } from '../util/logger'; import { lookupSandboxOwnerUserId } from './sandbox-ownership'; @@ -26,7 +27,7 @@ export async function pushEventToHumanMembers( ): Promise> { const es = getEventService(env); if (!es) return new Map(); - const context = `/kiloclaw/${sandboxId}/${conversationId}`; + const context = kiloclawConversationContext(sandboxId, conversationId); const results = await Promise.allSettled( humanMemberIds.map(async userId => { @@ -65,7 +66,7 @@ export async function pushInstanceEvent( ): Promise { const es = getEventService(env); if (!es) return; - const context = `/kiloclaw/${sandboxId}`; + const context = kiloclawInstanceContext(sandboxId); const results = await Promise.allSettled( humanMemberIds.map(userId => es.pushEvent(userId, context, event, payload)) diff --git a/services/kilo-chat/src/services/messages.ts b/services/kilo-chat/src/services/messages.ts index db5373ac8e..884b31de1e 100644 --- a/services/kilo-chat/src/services/messages.ts +++ b/services/kilo-chat/src/services/messages.ts @@ -10,11 +10,13 @@ import type { ContentBlock, ExecApprovalDecision } from '@kilocode/kilo-chat'; import { formatError, withDORetry } from '@kilocode/worker-utils'; import { logger } from '../util/logger'; +import { contentBlocksToText } from '../util/content'; import { extractConversationContext, pushEventToHumanMembers, pushInstanceEvent, } from './event-push'; +import { fetchSandboxLabel } from './sandbox-lookup'; import type { ConversationInfo } from '../do/conversation-do'; export type DeferCtx = { waitUntil: (p: Promise) => void }; @@ -304,6 +306,32 @@ async function postCommitFanOut( } await Promise.allSettled(instanceEvents); } + + // ── Block E: Push notification fanout ───────────────────────────────── + // Runs after realtime/event-service delivery has been attempted. Sender is + // excluded; bot members are not push recipients (kind=bot, never in + // humanMemberIds). Failures are logged but never propagate — the send has + // already succeeded and any other post-commit work must complete. + const pushRecipients = humanMemberIds.filter(id => id !== callerId); + if (sandboxId !== null && pushRecipients.length > 0) { + try { + const senderUserId = isSenderHuman ? callerId : null; + const bodyPreview = contentBlocksToText(content).slice(0, 200); + const sandboxLabel = await fetchSandboxLabel(env.HYPERDRIVE.connectionString, sandboxId); + const conversationTitle = info.title ?? autoTitle ?? 'Untitled'; + await env.NOTIFICATIONS.sendPushForConversation({ + conversationId, + sandboxId, + senderUserId, + recipientUserIds: pushRecipients, + title: `${sandboxLabel} · ${conversationTitle}`, + bodyPreview, + messageId, + }); + } catch (err) { + logger.error('sendPushForConversation failed', formatError(err)); + } + } } // ─── editMessage ──────────────────────────────────────────────────────────── diff --git a/services/kilo-chat/src/services/sandbox-lookup.ts b/services/kilo-chat/src/services/sandbox-lookup.ts new file mode 100644 index 0000000000..7dfad7055b --- /dev/null +++ b/services/kilo-chat/src/services/sandbox-lookup.ts @@ -0,0 +1,18 @@ +import { getWorkerDb } from '@kilocode/db/client'; +import { kiloclaw_instances } from '@kilocode/db/schema'; +import { and, eq, isNull } from 'drizzle-orm'; + +export async function fetchSandboxLabel( + hyperdriveConnectionString: string, + sandboxId: string +): Promise { + const db = getWorkerDb(hyperdriveConnectionString); + const [row] = await db + .select({ name: kiloclaw_instances.name }) + .from(kiloclaw_instances) + .where( + and(eq(kiloclaw_instances.sandbox_id, sandboxId), isNull(kiloclaw_instances.destroyed_at)) + ) + .limit(1); + return row?.name ?? 'KiloClaw'; +} diff --git a/services/kilo-chat/src/util/content.ts b/services/kilo-chat/src/util/content.ts new file mode 100644 index 0000000000..060e75726a --- /dev/null +++ b/services/kilo-chat/src/util/content.ts @@ -0,0 +1,12 @@ +import type { ContentBlock } from '@kilocode/kilo-chat'; + +/** Concatenates text content blocks into a single string. Skips non-text blocks. */ +export function contentBlocksToText(content: ContentBlock[]): string { + return content + .filter( + (b): b is { type: 'text'; text: string } => + b.type === 'text' && typeof (b as { text?: unknown }).text === 'string' + ) + .map(b => b.text) + .join(''); +} diff --git a/services/kilo-chat/vitest.config.mts b/services/kilo-chat/vitest.config.mts index 7f657f3298..f814c84e0e 100644 --- a/services/kilo-chat/vitest.config.mts +++ b/services/kilo-chat/vitest.config.mts @@ -20,6 +20,7 @@ export default defineWorkersConfig({ serviceBindings: { KILOCLAW: 'kiloclaw-stub', EVENT_SERVICE: 'event-service-stub', + NOTIFICATIONS: 'notifications-stub', KILO_CHAT_SELF: kCurrentWorker as unknown as string, }, workers: [ @@ -59,6 +60,21 @@ export default defineWorkersConfig({ } `, }, + { + name: 'notifications-stub', + modules: true, + script: ` + import { WorkerEntrypoint } from 'cloudflare:workers'; + export default class NotificationsStub extends WorkerEntrypoint { + async fetch(request) { + return new Response('ok'); + } + async sendPushForConversation(input) { + return { perRecipient: [] }; + } + } + `, + }, ], }, }, diff --git a/services/kilo-chat/worker-configuration.d.ts b/services/kilo-chat/worker-configuration.d.ts index 37bd73ea62..425c447524 100644 --- a/services/kilo-chat/worker-configuration.d.ts +++ b/services/kilo-chat/worker-configuration.d.ts @@ -1,5 +1,5 @@ /* eslint-disable */ -// Generated by Wrangler by running `wrangler types` (hash: 9175d354bbd6fb2004e44ec97e77c151) +// Generated by Wrangler by running `wrangler types` (hash: 3a963525d82eadcc7359b89d1d30e039) // Runtime types generated with workerd@1.20260312.1 2026-04-25 nodejs_compat declare namespace Cloudflare { interface GlobalProps { @@ -15,6 +15,7 @@ declare namespace Cloudflare { SANDBOX_STATUS_DO: DurableObjectNamespace; KILOCLAW: Fetcher /* kiloclaw */; EVENT_SERVICE: Fetcher /* event-service */; + NOTIFICATIONS: Service /* entrypoint NotificationsService from notifications */; } } interface Env extends Cloudflare.Env {} diff --git a/services/kilo-chat/wrangler.jsonc b/services/kilo-chat/wrangler.jsonc index 167e5dce38..54e5fee17f 100644 --- a/services/kilo-chat/wrangler.jsonc +++ b/services/kilo-chat/wrangler.jsonc @@ -42,6 +42,11 @@ "services": [ { "binding": "KILOCLAW", "service": "kiloclaw" }, { "binding": "EVENT_SERVICE", "service": "event-service" }, + { + "binding": "NOTIFICATIONS", + "service": "notifications", + "entrypoint": "NotificationsService", + }, ], "secrets_store_secrets": [ diff --git a/services/notifications/package.json b/services/notifications/package.json index a7897326d8..a304e241c6 100644 --- a/services/notifications/package.json +++ b/services/notifications/package.json @@ -6,7 +6,7 @@ "deploy": "wrangler deploy", "dev": "wrangler dev", "start": "wrangler dev", - "test": "vitest", + "test": "vitest run", "cf-typegen": "wrangler types", "typecheck": "tsgo --noEmit", "lint": "pnpm -w exec oxlint --config .oxlintrc.json services/notifications/src" @@ -22,12 +22,12 @@ }, "dependencies": { "@kilocode/db": "workspace:*", + "@kilocode/event-service": "workspace:*", "@kilocode/notifications": "workspace:*", "@kilocode/worker-utils": "workspace:*", "drizzle-orm": "catalog:", "expo-server-sdk": "^6.1.0", "hono": "catalog:", - "stream-chat": "catalog:", "zod": "catalog:" } } diff --git a/services/notifications/src/__tests__/dispatch-push.test.ts b/services/notifications/src/__tests__/dispatch-push.test.ts new file mode 100644 index 0000000000..1e9dca7fd3 --- /dev/null +++ b/services/notifications/src/__tests__/dispatch-push.test.ts @@ -0,0 +1,228 @@ +import { env, runInDurableObject } from 'cloudflare:test'; +import { describe, expect, it, vi, beforeEach } from 'vitest'; +import { getTableName } from 'drizzle-orm'; +import type { DispatchPushInput } from '@kilocode/notifications'; + +import { sendPushNotifications } from '../lib/expo-push'; +import * as dbClient from '@kilocode/db/client'; + +vi.mock('../lib/expo-push', () => ({ + sendPushNotifications: vi.fn(async () => ({ + ticketTokenPairs: [{ ticket: { status: 'ok', id: 't1' }, token: 'tok1' }], + staleTokens: [], + })), +})); + +type DbState = { + tokens: { user_id: string; token: string }[]; + badgeTotal: number; +}; + +function installDbMock(state: DbState) { + const fakeDb = { + select: () => ({ + from: (table: Parameters[0]) => ({ + where: async () => { + if (getTableName(table) === 'user_push_tokens') { + return state.tokens.map(t => ({ token: t.token })); + } + // sum(badge_count) — return single row with `total` + return [{ total: state.badgeTotal }]; + }, + }), + }), + insert: () => ({ + values: () => ({ onConflictDoUpdate: async () => undefined }), + }), + delete: () => ({ where: async () => undefined }), + }; + vi.spyOn(dbClient, 'getWorkerDb').mockReturnValue( + fakeDb as unknown as ReturnType + ); +} + +const baseInput = (over: Partial = {}): DispatchPushInput => ({ + userId: 'user-1', + presenceContext: '/presence/kiloclaw/sb1/conv1', + idempotencyKey: 'k1', + badge: { badgeBucket: 'conv1', delta: 1 }, + push: { + title: 'T', + body: 'B', + data: { type: 'chat.message', sandboxId: 'sb1', conversationId: 'conv1', messageId: 'm1' }, + sound: 'default', + priority: 'high', + }, + ...over, +}); + +function getDO(name = 'conv1') { + const id = env.NOTIFICATION_CHANNEL_DO.idFromName(name); + return env.NOTIFICATION_CHANNEL_DO.get(id); +} + +describe('NotificationChannelDO.dispatchPush', () => { + beforeEach(() => { + vi.mocked(sendPushNotifications).mockClear(); + vi.spyOn(env.EXPO_ACCESS_TOKEN, 'get').mockResolvedValue('test-token'); + }); + + it('returns suppressed_presence when EVENT_SERVICE.isUserInContext is true', async () => { + installDbMock({ tokens: [{ user_id: 'user-1', token: 'tok1' }], badgeTotal: 0 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValueOnce(true); + const result = await getDO().dispatchPush(baseInput()); + expect(result.kind).toBe('suppressed_presence'); + expect(sendPushNotifications).not.toHaveBeenCalled(); + }); + + it('returns no_tokens when the user has no push tokens', async () => { + installDbMock({ tokens: [], badgeTotal: 0 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValueOnce(false); + const result = await getDO().dispatchPush(baseInput({ userId: 'user-no-tokens' })); + expect(result.kind).toBe('no_tokens'); + expect(sendPushNotifications).not.toHaveBeenCalled(); + }); + + it('delivers, increments badge, writes idempotency key', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 1 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValueOnce(false); + const result = await getDO('conv-deliver').dispatchPush( + baseInput({ idempotencyKey: 'k-deliver' }) + ); + expect(result.kind).toBe('delivered'); + expect(sendPushNotifications).toHaveBeenCalledOnce(); + const [[messages]] = vi.mocked(sendPushNotifications).mock.calls; + expect(messages[0].badge).toBe(1); + }); + + it('returns duplicate when the idempotency key has been seen', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 1 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValue(false); + const stub = getDO('conv-dup'); + const input = baseInput({ idempotencyKey: 'k-dup' }); + await stub.dispatchPush(input); + const second = await stub.dispatchPush(input); + expect(second.kind).toBe('duplicate'); + expect(sendPushNotifications).toHaveBeenCalledOnce(); + }); + + it('skips badge mutation when badge is null', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 0 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValueOnce(false); + const result = await getDO('conv-no-badge').dispatchPush( + baseInput({ badge: null, idempotencyKey: 'k-no-badge' }) + ); + expect(result.kind).toBe('delivered'); + const [[messages]] = vi.mocked(sendPushNotifications).mock.calls; + expect(messages[0].badge).toBeUndefined(); + }); + + it('does not write idempotency key on Expo failure', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 0 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValue(false); + vi.mocked(sendPushNotifications).mockRejectedValueOnce(new Error('boom')); + const stub = getDO('conv-fail'); + const input = baseInput({ idempotencyKey: 'k-fail', badge: null }); + const first = await stub.dispatchPush(input); + expect(first.kind).toBe('failed'); + const second = await stub.dispatchPush(input); + expect(second.kind).not.toBe('duplicate'); + }); + + it('does not re-increment the badge when retrying after Expo failure', async () => { + const insertSpy = vi.fn().mockReturnValue({ + values: () => ({ onConflictDoUpdate: async () => undefined }), + }); + vi.spyOn(dbClient, 'getWorkerDb').mockReturnValue({ + select: () => ({ + from: (table: Parameters[0]) => ({ + where: async () => { + if (getTableName(table) === 'user_push_tokens') { + return [{ token: 'tok1' }]; + } + return [{ total: 1 }]; + }, + }), + }), + insert: insertSpy, + delete: () => ({ where: async () => undefined }), + } as unknown as ReturnType); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValue(false); + vi.mocked(sendPushNotifications).mockRejectedValueOnce(new Error('boom')); + + const stub = getDO('conv-no-double'); + const input = baseInput({ idempotencyKey: 'k-no-double' }); + + const first = await stub.dispatchPush(input); + expect(first.kind).toBe('failed'); + expect(insertSpy).toHaveBeenCalledTimes(1); + + const second = await stub.dispatchPush(input); + expect(second.kind).toBe('delivered'); + // Badge must not be incremented twice across the retry — the first + // attempt's `pending` marker gates the second insert out. + expect(insertSpy).toHaveBeenCalledTimes(1); + }); + + it('schedules cleanup when writing the pending marker (failed send)', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 0 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValue(false); + vi.mocked(sendPushNotifications).mockRejectedValueOnce(new Error('boom')); + const stub = getDO('conv-pending-alarm'); + + const result = await stub.dispatchPush(baseInput({ idempotencyKey: 'k-pending-alarm' })); + expect(result.kind).toBe('failed'); + // Even though delivery failed, an alarm must be set so the orphan + // `pending` record gets pruned after IDEM_TTL_MS. + const alarm = await runInDurableObject(stub, (_inst, state) => state.storage.getAlarm()); + expect(alarm).not.toBeNull(); + }); + + it('reschedules cleanup for younger records when alarm fires', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 1 }); + const stub = getDO('conv-reschedule'); + + const now = Date.now(); + await runInDurableObject(stub, async (_inst, state) => { + await state.storage.put('idem:old', { stage: 'delivered', ts: now - 2 * 60 * 60 * 1000 }); + await state.storage.put('idem:new', { stage: 'delivered', ts: now - 30 * 60 * 1000 }); + }); + + await runInDurableObject(stub, async inst => { + await (inst as unknown as { alarm: () => Promise }).alarm(); + }); + + const remaining = await runInDurableObject(stub, async (_inst, state) => { + const entries = await state.storage.list({ prefix: 'idem:' }); + return Array.from(entries.keys()); + }); + expect(remaining).toEqual(['idem:new']); + + const alarm = await runInDurableObject(stub, (_inst, state) => state.storage.getAlarm()); + expect(alarm).not.toBeNull(); + // Should be rescheduled for the younger record's expiry, not "1h from now". + const expectedExpiry = now - 30 * 60 * 1000 + 60 * 60 * 1000; + expect(alarm).toBe(expectedExpiry); + }); + + it('does not reset the alarm on every successful send', async () => { + installDbMock({ tokens: [{ user_id: 'u', token: 'tok1' }], badgeTotal: 1 }); + vi.spyOn(env.EVENT_SERVICE, 'isUserInContext').mockResolvedValue(false); + const stub = getDO('conv-alarm'); + + await stub.dispatchPush(baseInput({ idempotencyKey: 'k-alarm-1' })); + const firstAlarm = await runInDurableObject(stub, (_inst, state) => state.storage.getAlarm()); + expect(firstAlarm).not.toBeNull(); + + // Advance Date.now so a naive setAlarm would push the alarm forward. + const realNow = Date.now; + try { + vi.spyOn(Date, 'now').mockImplementation(() => realNow.call(Date) + 60_000); + await stub.dispatchPush(baseInput({ idempotencyKey: 'k-alarm-2' })); + } finally { + vi.mocked(Date.now).mockRestore(); + } + const secondAlarm = await runInDurableObject(stub, (_inst, state) => state.storage.getAlarm()); + expect(secondAlarm).toBe(firstAlarm); + }); +}); diff --git a/services/notifications/src/__tests__/env.d.ts b/services/notifications/src/__tests__/env.d.ts new file mode 100644 index 0000000000..3257351c91 --- /dev/null +++ b/services/notifications/src/__tests__/env.d.ts @@ -0,0 +1,7 @@ +import type NotificationsService from '../index'; + +declare module 'cloudflare:test' { + interface ProvidedEnv extends Env { + SELF: Service; + } +} diff --git a/services/notifications/src/__tests__/send-push-for-conversation.test.ts b/services/notifications/src/__tests__/send-push-for-conversation.test.ts new file mode 100644 index 0000000000..2eb5365773 --- /dev/null +++ b/services/notifications/src/__tests__/send-push-for-conversation.test.ts @@ -0,0 +1,70 @@ +import { env } from 'cloudflare:test'; +import { describe, expect, it, vi } from 'vitest'; +import type { + DispatchPushInput, + PerRecipientResult, + SendPushForConversationInput, +} from '@kilocode/notifications'; + +import type * as do_module from '../dos/NotificationChannelDO'; + +const baseInput = ( + over: Partial = {} +): SendPushForConversationInput => ({ + conversationId: 'conv1', + sandboxId: 'sb1', + senderUserId: 'sender', + recipientUserIds: ['r1', 'r2', 'r2', 'sender'], + title: 'Conv Title', + bodyPreview: 'hello', + messageId: 'm1', + ...over, +}); + +describe('NotificationsService.sendPushForConversation', () => { + it('excludes sender, dedupes, fans out to remaining recipients', async () => { + const stubSpy = vi.fn(async (_input: DispatchPushInput) => ({ + kind: 'delivered' as const, + tokenCount: 1, + })); + vi.spyOn(env.NOTIFICATION_CHANNEL_DO, 'get').mockReturnValue({ + dispatchPush: stubSpy, + } as unknown as DurableObjectStub); + + const result = await env.SELF.sendPushForConversation(baseInput()); + + expect(stubSpy).toHaveBeenCalledTimes(2); // r1, r2 + expect(result.perRecipient.map((r: PerRecipientResult) => r.userId).sort()).toEqual([ + 'r1', + 'r2', + ]); + expect(result.perRecipient.every((r: PerRecipientResult) => r.outcome === 'delivered')).toBe( + true + ); + }); + + it('passes the right presence context and badge bucket', async () => { + const stubSpy = vi.fn(async (_input: DispatchPushInput) => ({ + kind: 'delivered' as const, + tokenCount: 1, + })); + vi.spyOn(env.NOTIFICATION_CHANNEL_DO, 'get').mockReturnValue({ + dispatchPush: stubSpy, + } as unknown as DurableObjectStub); + + await env.SELF.sendPushForConversation( + baseInput({ recipientUserIds: ['r1'], senderUserId: null }) + ); + const firstCall = stubSpy.mock.calls[0]; + if (!firstCall) throw new Error('expected dispatchPush to be called'); + const call: DispatchPushInput = firstCall[0]; + expect(call.presenceContext).toBe('/presence/kiloclaw/sb1/conv1'); + expect(call.badge).toEqual({ badgeBucket: 'kiloclaw:sb1:conv1', delta: 1 }); + expect(call.push.data).toEqual({ + type: 'chat.message', + sandboxId: 'sb1', + conversationId: 'conv1', + messageId: 'm1', + }); + }); +}); diff --git a/services/notifications/src/__tests__/setup.ts b/services/notifications/src/__tests__/setup.ts new file mode 100644 index 0000000000..d54904d430 --- /dev/null +++ b/services/notifications/src/__tests__/setup.ts @@ -0,0 +1,17 @@ +import { vi } from 'vitest'; + +vi.mock('@kilocode/db/client', () => ({ + getWorkerDb: () => ({ + select: () => ({ + from: (table: { _: { name: string } }) => ({ + where: () => { + if (table._.name === 'user_push_tokens') return []; + if (table._.name === 'badge_counts') return [{ total: 0 }]; + return []; + }, + }), + }), + insert: () => ({ values: () => ({ onConflictDoUpdate: async () => undefined }) }), + delete: () => ({ where: async () => undefined }), + }), +})); diff --git a/services/notifications/src/bindings.d.ts b/services/notifications/src/bindings.d.ts new file mode 100644 index 0000000000..5797e9eaf4 --- /dev/null +++ b/services/notifications/src/bindings.d.ts @@ -0,0 +1,14 @@ +import type {} from './worker-configuration.d.ts'; + +// Augment the wrangler-generated Env with RPC method signatures for service +// bindings. `worker-configuration.d.ts` types these as plain Fetcher; this +// file layers on the RPC shape so call sites don't need runtime casts. +declare global { + interface Env { + EVENT_SERVICE: Fetcher & { + isUserInContext(userId: string, context: string): Promise; + }; + } +} + +export type NotificationsEnv = Env; diff --git a/services/notifications/src/dos/NotificationChannelDO.ts b/services/notifications/src/dos/NotificationChannelDO.ts index 3568f9a8e7..195bdd82e9 100644 --- a/services/notifications/src/dos/NotificationChannelDO.ts +++ b/services/notifications/src/dos/NotificationChannelDO.ts @@ -1,210 +1,151 @@ import { DurableObject } from 'cloudflare:workers'; import { getWorkerDb } from '@kilocode/db/client'; -import { badge_counts, kiloclaw_instances, user_push_tokens } from '@kilocode/db/schema'; -import { badgeBucketForInstance } from '@kilocode/notifications'; -import { and, eq, inArray, isNull, sql, sum } from 'drizzle-orm'; -import type { Event } from 'stream-chat'; +import { badge_counts, user_push_tokens } from '@kilocode/db/schema'; +import { type DispatchPushInput, type DispatchPushOutcome } from '@kilocode/notifications'; +import { eq, inArray, sql, sum } from 'drizzle-orm'; import type { ExpoPushMessage, TicketTokenPair } from '../lib/expo-push'; import { sendPushNotifications } from '../lib/expo-push'; -type ReceiptCheckMessage = { - ticketTokenPairs: TicketTokenPair[]; -}; +type ReceiptCheckMessage = { ticketTokenPairs: TicketTokenPair[] }; -type PendingMessage = { - messageId: string; - senderId: string; - text: string; - notified: boolean; - createdAt: number; - updatedAt: string; // ISO timestamp from Stream Chat payload -}; +// Two-stage idempotency record. `pending` means the badge was incremented +// for this idempotency key but the Expo send did not (yet) succeed; on +// retry we must skip the increment to avoid double-counting. `delivered` +// means the send succeeded; further attempts are duplicates. +type IdemRecord = { stage: 'pending' | 'delivered'; ts: number }; -const DEDUP_PREFIX = 'dedup:'; -const MSG_PREFIX = 'msg:'; -const DEDUP_TTL_MS = 60 * 60 * 1000; // 1 hour -const DEBOUNCE_MS = 10_000; // 10 seconds +const IDEM_PREFIX = 'idem:'; +const IDEM_TTL_MS = 60 * 60 * 1000; // 1 hour export class NotificationChannelDO extends DurableObject { - async processWebhook(payload: Event, webhookId: string): Promise { - // Webhook-level dedup (prevents reprocessing the same delivery) - const existing = await this.ctx.storage.get(`${DEDUP_PREFIX}${webhookId}`); - if (existing) { - return Response.json({ ok: true, deduplicated: true }); - } - await this.markWebhookSeen(webhookId); - - const messageId = payload.message?.id; - const senderId = payload.message?.user?.id; - const messageText = payload.message?.text ?? ''; - const messageUpdatedAt = payload.message?.updated_at ?? payload.created_at ?? ''; - - if (!messageId || !senderId?.startsWith('bot-')) { - return Response.json({ ok: true }); - } - - const msgKey = `${MSG_PREFIX}${messageId}`; - const pendingMessage = await this.ctx.storage.get(msgKey); - - if (pendingMessage?.notified) { - return Response.json({ ok: true }); - } + async dispatchPush(input: DispatchPushInput): Promise { + // 1. Idempotency. DO is single-threaded — requests for a given + // conversation serialize on this instance. A `failed` outcome + // leaves the record at `pending` so upstream can retry the send + // without re-incrementing the badge. + const idemKey = `${IDEM_PREFIX}${input.idempotencyKey}`; + const existing = await this.ctx.storage.get(idemKey); + if (existing?.stage === 'delivered') return { kind: 'duplicate' }; + const isRetry = existing?.stage === 'pending'; + + // 2. Presence + const inContext = await this.env.EVENT_SERVICE.isUserInContext( + input.userId, + input.presenceContext + ); + if (inContext) return { kind: 'suppressed_presence' }; - if (pendingMessage) { - // Only accept if this event is newer than what we have - if (messageUpdatedAt <= pendingMessage.updatedAt) { - return Response.json({ ok: true }); - } - if (messageText) { - pendingMessage.text = messageText; - } - pendingMessage.updatedAt = messageUpdatedAt; - await this.ctx.storage.put(msgKey, pendingMessage); - await this.scheduleAlarm(DEBOUNCE_MS); - } else { - // First event for this message (could be message.new or a late message.updated) - const pending: PendingMessage = { - messageId, - senderId, - text: messageText, - notified: false, - createdAt: Date.now(), - updatedAt: messageUpdatedAt, - }; - await this.ctx.storage.put(msgKey, pending); - await this.scheduleAlarm(DEBOUNCE_MS); - } - - return Response.json({ ok: true }); - } - - override async alarm(): Promise { - // Prune expired dedup entries - const dedupEntries = await this.ctx.storage.list({ prefix: DEDUP_PREFIX }); - const now = Date.now(); - const expired: string[] = []; - for (const [key, timestamp] of dedupEntries) { - if (now - timestamp > DEDUP_TTL_MS) { - expired.push(key); - } - } - if (expired.length > 0) { - await this.ctx.storage.delete(expired); - } - - // Process pending messages that have debounced - const pendingEntries = await this.ctx.storage.list({ prefix: MSG_PREFIX }); - for (const [key, msg] of pendingEntries) { - if (msg.notified) { - // Clean up old notified messages - if (now - msg.createdAt > DEDUP_TTL_MS) { - await this.ctx.storage.delete(key); - } - continue; - } - - if (!msg.text) { - // No text — nothing to notify about, discard - await this.ctx.storage.delete(key); - continue; - } - - await this.sendNotification(msg); - msg.notified = true; - await this.ctx.storage.put(key, msg); - } - } - - private async sendNotification(msg: PendingMessage): Promise { - const sandboxId = msg.senderId.slice(4); const db = getWorkerDb(this.env.HYPERDRIVE.connectionString); - const [instance] = await db - .select({ - id: kiloclaw_instances.id, - user_id: kiloclaw_instances.user_id, - name: kiloclaw_instances.name, - }) - .from(kiloclaw_instances) - .where( - and(eq(kiloclaw_instances.sandbox_id, sandboxId), isNull(kiloclaw_instances.destroyed_at)) - ) - .limit(1); - - if (!instance) { - return; - } - - // Increment the badge count for this bucket and return the new total across all buckets. - // Done before the token guard so unread state is always persisted even if the user - // temporarily has no registered push tokens (e.g. between reinstalls). - // Uses UPSERT so the row is created on first notification for this bucket. - const badgeBucket = badgeBucketForInstance(sandboxId); - await db - .insert(badge_counts) - .values({ user_id: instance.user_id, badge_bucket: badgeBucket, badge_count: 1 }) - .onConflictDoUpdate({ - target: [badge_counts.user_id, badge_counts.badge_bucket], - set: { badge_count: sql`${badge_counts.badge_count} + 1` }, - }); - - const [totals] = await db - .select({ total: sum(badge_counts.badge_count) }) - .from(badge_counts) - .where(eq(badge_counts.user_id, instance.user_id)); - - const badgeCount = Number(totals?.total ?? 0); - + // 3. Tokens const tokens = await db .select({ token: user_push_tokens.token }) .from(user_push_tokens) - .where(eq(user_push_tokens.user_id, instance.user_id)); - - if (tokens.length === 0) { - return; + .where(eq(user_push_tokens.user_id, input.userId)); + + if (tokens.length === 0) return { kind: 'no_tokens' }; + + // 4. Badge math. On a retry the badge was already incremented during + // the prior attempt; re-applying the delta would double-count. + // The total is recomputed in either case (other writers may have + // advanced it). + let badgeTotal: number | undefined; + if (input.badge) { + if (!isRetry) { + // Mark `pending` BEFORE the increment so any later failure path + // is gated on the marker and a retry skips the increment. + const ts = Date.now(); + await this.ctx.storage.put(idemKey, { stage: 'pending', ts }); + // Also schedule cleanup at this point — if Expo keeps failing and + // no future push ever lands, `pending` would otherwise leak. + await this.ensureCleanupAlarm(ts); + await db + .insert(badge_counts) + .values({ + user_id: input.userId, + badge_bucket: input.badge.badgeBucket, + badge_count: input.badge.delta, + }) + .onConflictDoUpdate({ + target: [badge_counts.user_id, badge_counts.badge_bucket], + set: { badge_count: sql`${badge_counts.badge_count} + ${input.badge.delta}` }, + }); + } + const [totals] = await db + .select({ total: sum(badge_counts.badge_count) }) + .from(badge_counts) + .where(eq(badge_counts.user_id, input.userId)); + badgeTotal = Number(totals?.total ?? 0); } - const truncatedMessage = msg.text.length > 100 ? msg.text.slice(0, 97) + '...' : msg.text; - + // 5. Send via Expo const messages: ExpoPushMessage[] = tokens.map(({ token }) => ({ to: token, - title: instance.name ?? 'KiloClaw', - body: truncatedMessage, - // Keep in sync with NotificationData in apps/mobile/src/lib/notifications.ts - data: { type: 'chat', instanceId: sandboxId }, - badge: badgeCount, - sound: 'default' as const, - priority: 'high' as const, + title: input.push.title, + body: input.push.body, + data: input.push.data, + ...(badgeTotal !== undefined && { badge: badgeTotal }), + sound: input.push.sound ?? undefined, + priority: input.push.priority ?? 'default', })); const accessToken = await this.env.EXPO_ACCESS_TOKEN.get(); - const { ticketTokenPairs, staleTokens } = await sendPushNotifications(messages, accessToken); + let result: { ticketTokenPairs: TicketTokenPair[]; staleTokens: string[] }; + try { + result = await sendPushNotifications(messages, accessToken); + } catch (err) { + // Leave any `pending` marker in place — retries will re-attempt the + // send while skipping the badge increment. + return { + kind: 'failed', + error: err instanceof Error ? err.message : String(err), + }; + } - if (staleTokens.length > 0) { - await db.delete(user_push_tokens).where(inArray(user_push_tokens.token, staleTokens)); + if (result.staleTokens.length > 0) { + await db.delete(user_push_tokens).where(inArray(user_push_tokens.token, result.staleTokens)); } - if (ticketTokenPairs.length > 0) { - const receiptMsg: ReceiptCheckMessage = { ticketTokenPairs }; + if (result.ticketTokenPairs.length > 0) { + const receiptMsg: ReceiptCheckMessage = { ticketTokenPairs: result.ticketTokenPairs }; await this.env.RECEIPTS_QUEUE.send(receiptMsg, { delaySeconds: 900 }); } - } - private async markWebhookSeen(webhookId: string): Promise { - await this.ctx.storage.put(`${DEDUP_PREFIX}${webhookId}`, Date.now()); + // 6. Mark `delivered` so future retries short-circuit as duplicate. + const ts = Date.now(); + await this.ctx.storage.put(idemKey, { stage: 'delivered', ts }); + await this.ensureCleanupAlarm(ts); + + return { kind: 'delivered', tokenCount: tokens.length }; } - private async scheduleAlarm(delayMs: number): Promise { - // Always reset the alarm to the new debounce window - await this.ctx.storage.setAlarm(Date.now() + delayMs); + override async alarm(): Promise { + const now = Date.now(); + const entries = await this.ctx.storage.list({ prefix: IDEM_PREFIX }); + const expired: string[] = []; + let earliestRemaining: number | undefined; + for (const [key, rec] of entries) { + if (now - rec.ts > IDEM_TTL_MS) { + expired.push(key); + } else if (earliestRemaining === undefined || rec.ts < earliestRemaining) { + earliestRemaining = rec.ts; + } + } + if (expired.length > 0) await this.ctx.storage.delete(expired); + // Reschedule for the earliest remaining record so a quiet conversation + // still gets its leftover entries pruned exactly once their TTL elapses. + if (earliestRemaining !== undefined) { + await this.ctx.storage.setAlarm(earliestRemaining + IDEM_TTL_MS); + } } -} -export function getNotificationChannelDO( - env: Env, - channelId: string -): DurableObjectStub { - const id = env.NOTIFICATION_CHANNEL_DO.idFromName(channelId); - return env.NOTIFICATION_CHANNEL_DO.get(id) as DurableObjectStub; + // Schedule cleanup `IDEM_TTL_MS` from `refTs` only if no alarm is pending. + // `setAlarm` replaces any existing alarm; calling it unconditionally would + // push cleanup forward indefinitely on a busy conversation. + private async ensureCleanupAlarm(refTs: number): Promise { + if ((await this.ctx.storage.getAlarm()) === null) { + await this.ctx.storage.setAlarm(refTs + IDEM_TTL_MS); + } + } } diff --git a/services/notifications/src/index.ts b/services/notifications/src/index.ts index 231df770e7..bb4f673346 100644 --- a/services/notifications/src/index.ts +++ b/services/notifications/src/index.ts @@ -1,14 +1,91 @@ +import { WorkerEntrypoint } from 'cloudflare:workers'; import { Hono } from 'hono'; +import { presenceContextForConversation } from '@kilocode/event-service'; +import { + badgeBucketForConversation, + type DispatchPushInput, + type DispatchPushOutcome, + type PerRecipientResult, + type SendPushForConversationInput, + type SendPushForConversationOutput, +} from '@kilocode/notifications'; + import { queue } from './queue-consumer'; -import { webhooks } from './routes/webhooks'; export { NotificationChannelDO } from './dos/NotificationChannelDO'; const app = new Hono<{ Bindings: Env }>(); +app.get('/', c => c.json({ ok: true })); -app.route('/webhooks', webhooks); +type ConversationDOStub = { + dispatchPush: (input: DispatchPushInput) => Promise; +}; -app.get('/', c => c.json({ ok: true })); +/** Pure core for unit testability. */ +export async function sendPushForConversationCore( + input: SendPushForConversationInput, + deps: { + getConversationDOStub: (conversationId: string) => ConversationDOStub; + } +): Promise { + const recipients: string[] = []; + const seen = new Set(); + for (const id of input.recipientUserIds) { + if (id === input.senderUserId) continue; + if (seen.has(id)) continue; + seen.add(id); + recipients.push(id); + } + + const perRecipient: PerRecipientResult[] = []; + for (const userId of recipients) { + const stub = deps.getConversationDOStub(input.conversationId); + const outcome = await stub.dispatchPush({ + userId, + presenceContext: presenceContextForConversation(input.sandboxId, input.conversationId), + idempotencyKey: `chat:${input.messageId}:${userId}`, + badge: { + badgeBucket: badgeBucketForConversation(input.sandboxId, input.conversationId), + delta: 1, + }, + push: { + title: input.title, + body: input.bodyPreview, + data: { + type: 'chat.message', + sandboxId: input.sandboxId, + conversationId: input.conversationId, + messageId: input.messageId, + }, + sound: 'default', + priority: 'high', + }, + }); + perRecipient.push({ userId, outcome: outcome.kind }); + } + return { perRecipient }; +} + +export class NotificationsService extends WorkerEntrypoint { + override async fetch(request: Request): Promise { + return app.fetch(request, this.env, this.ctx); + } + + override async queue(batch: MessageBatch): Promise { + return queue(batch as Parameters[0], this.env); + } + + async sendPushForConversation( + input: SendPushForConversationInput + ): Promise { + return sendPushForConversationCore(input, { + getConversationDOStub: (conversationId: string) => + this.env.NOTIFICATION_CHANNEL_DO.get( + this.env.NOTIFICATION_CHANNEL_DO.idFromName(conversationId) + ) as unknown as ConversationDOStub, + }); + } +} -export default { fetch: app.fetch, queue }; +export default NotificationsService; diff --git a/services/notifications/src/routes/webhooks.ts b/services/notifications/src/routes/webhooks.ts deleted file mode 100644 index dc66a30d2d..0000000000 --- a/services/notifications/src/routes/webhooks.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { createHmac, timingSafeEqual } from 'node:crypto'; -import { Hono } from 'hono'; -import type { Event } from 'stream-chat'; - -import { getNotificationChannelDO } from '../dos/NotificationChannelDO'; - -const webhooks = new Hono<{ Bindings: Env }>(); - -function verifyWebhookSignature(body: string, signature: string | null, secret: string): boolean { - if (!signature) return false; - - const expectedSignature = createHmac('sha256', secret).update(body).digest('hex'); - - if (signature.length !== expectedSignature.length) return false; - return timingSafeEqual(Buffer.from(signature), Buffer.from(expectedSignature)); -} - -webhooks.post('/stream-chat', async c => { - const rawBody = await c.req.text(); - const signature = c.req.header('x-signature') ?? null; - const webhookId = c.req.header('x-webhook-id'); - - const secret = await c.env.STREAM_CHAT_API_SECRET.get(); - if (!verifyWebhookSignature(rawBody, signature, secret)) { - return c.json({ error: 'Invalid signature' }, 401); - } - - const payload = JSON.parse(rawBody) as Event; - - // Only handle new and updated messages - if (payload.type !== 'message.new' && payload.type !== 'message.updated') { - return c.json({ ok: true }); - } - - const channelId = payload.channel_id; - if (!channelId || !webhookId) { - return c.json({ ok: true }); - } - - // Forward to the channel's Durable Object for dedup + delivery - const stub = getNotificationChannelDO(c.env, channelId); - return stub.processWebhook(payload, webhookId); -}); - -export { webhooks }; diff --git a/services/notifications/tsconfig.json b/services/notifications/tsconfig.json index 635e98f321..71b42aebcf 100644 --- a/services/notifications/tsconfig.json +++ b/services/notifications/tsconfig.json @@ -14,7 +14,7 @@ "forceConsistentCasingInFileNames": true, "strict": true, "skipLibCheck": true, - "types": ["./worker-configuration.d.ts", "node"] + "types": ["./worker-configuration.d.ts", "node", "@cloudflare/vitest-pool-workers"] }, "exclude": ["test"], "include": ["worker-configuration.d.ts", "src/**/*.ts"] diff --git a/services/notifications/vitest.config.mts b/services/notifications/vitest.config.mts index d9430c7554..d386792f55 100644 --- a/services/notifications/vitest.config.mts +++ b/services/notifications/vitest.config.mts @@ -1,10 +1,33 @@ import { defineWorkersConfig } from '@cloudflare/vitest-pool-workers/config'; +const kCurrentWorker = Symbol.for('miniflare.kCurrentWorker'); + export default defineWorkersConfig({ test: { + passWithNoTests: true, + setupFiles: ['./src/__tests__/setup.ts'], poolOptions: { workers: { wrangler: { configPath: './wrangler.jsonc' }, + miniflare: { + serviceBindings: { + EVENT_SERVICE: 'event-service-stub', + SELF: kCurrentWorker as unknown as string, + }, + workers: [ + { + name: 'event-service-stub', + modules: true, + script: ` + import { WorkerEntrypoint } from 'cloudflare:workers'; + export default class EventServiceStub extends WorkerEntrypoint { + async fetch() { return new Response('ok'); } + async isUserInContext() { return false; } + } + `, + }, + ], + }, }, }, }, diff --git a/services/notifications/worker-configuration.d.ts b/services/notifications/worker-configuration.d.ts index cbc9201506..0e8a5f8948 100644 --- a/services/notifications/worker-configuration.d.ts +++ b/services/notifications/worker-configuration.d.ts @@ -1,5 +1,5 @@ /* eslint-disable */ -// Generated by Wrangler by running `wrangler types` (hash: b336c1c1e874405e99f5e26c8c9319df) +// Generated by Wrangler by running `wrangler types` (hash: 35f3a1e5a589a3db24bda461e9af3ff0) // Runtime types generated with workerd@1.20260312.1 2026-02-01 nodejs_compat declare namespace Cloudflare { interface GlobalProps { @@ -9,9 +9,9 @@ declare namespace Cloudflare { interface Env { HYPERDRIVE: Hyperdrive; RECEIPTS_QUEUE: Queue; - STREAM_CHAT_API_SECRET: SecretsStoreSecret; EXPO_ACCESS_TOKEN: SecretsStoreSecret; - NOTIFICATION_CHANNEL_DO: DurableObjectNamespace /* NotificationChannelDO */; + NOTIFICATION_CHANNEL_DO: DurableObjectNamespace; + EVENT_SERVICE: Fetcher /* event-service */; } } interface Env extends Cloudflare.Env {} diff --git a/services/notifications/wrangler.jsonc b/services/notifications/wrangler.jsonc index 943bd8176a..ab9c249bc5 100644 --- a/services/notifications/wrangler.jsonc +++ b/services/notifications/wrangler.jsonc @@ -50,12 +50,14 @@ ], }, - "secrets_store_secrets": [ + "services": [ { - "binding": "STREAM_CHAT_API_SECRET", - "store_id": "342a86d9e3a94da698e82d0c6e2a36f0", - "secret_name": "STREAM_CHAT_API_SECRET", + "binding": "EVENT_SERVICE", + "service": "event-service", }, + ], + + "secrets_store_secrets": [ { "binding": "EXPO_ACCESS_TOKEN", "store_id": "342a86d9e3a94da698e82d0c6e2a36f0",