From 571827dbb276e8dde08b39976f877f67cdb6a2f1 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jun 2026 10:54:51 +0100 Subject: [PATCH 1/6] feat(backend): add gateway integration tests for multi-node and multi-device scenarios (#215) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Spin up two Socket.IO instances sharing a real Redis adapter to verify cross-node delivery: message sent on node-1 reaches a socket on node-2 - Assert multi-device fanout: all active devices of a recipient receive the new_message event when per-device envelopes are provided - Verify persist-before-deliver ordering via an intentional DB latency probe — new_message is never emitted until the DB insert resolves - Test revocation disconnect: publishing device_revoked:{id} to Redis triggers force-disconnect and client notification on any node - Test resume/sync determinism: missed ephemeral events are replayed from the Redis stream on reconnect, idempotent with an advanced cursor Add redis:7-alpine service to the backend-ci GitHub Actions workflow so these tests run in CI. Bump vitest testTimeout to 15 s for network I/O. --- .github/workflows/backend-ci.yml | 12 + apps/backend/package.json | 1 + .../integration/gateway.integration.test.ts | 614 ++++++++++++++++++ apps/backend/vitest.config.ts | 1 + pnpm-lock.yaml | 69 +- 5 files changed, 668 insertions(+), 29 deletions(-) create mode 100644 apps/backend/src/__tests__/integration/gateway.integration.test.ts diff --git a/.github/workflows/backend-ci.yml b/.github/workflows/backend-ci.yml index e3a40c9..8c8453e 100644 --- a/.github/workflows/backend-ci.yml +++ b/.github/workflows/backend-ci.yml @@ -15,6 +15,17 @@ jobs: name: Format · Lint · Test runs-on: ubuntu-latest + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 3s + --health-retries 5 + defaults: run: working-directory: apps/backend @@ -45,3 +56,4 @@ jobs: run: pnpm test env: JWT_SECRET: ${{ secrets.JWT_SECRET || 'ci-test-secret' }} + REDIS_URL: redis://localhost:6379 diff --git a/apps/backend/package.json b/apps/backend/package.json index b7b4c69..96eab8c 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -59,6 +59,7 @@ "tsx": "^4.21.0", "typescript": "^5.9.3", "typescript-eslint": "^8.59.3", + "socket.io-client": "^4.8.3", "vitest": "^4.1.6" } } \ No newline at end of file diff --git a/apps/backend/src/__tests__/integration/gateway.integration.test.ts b/apps/backend/src/__tests__/integration/gateway.integration.test.ts new file mode 100644 index 0000000..7db8aa5 --- /dev/null +++ b/apps/backend/src/__tests__/integration/gateway.integration.test.ts @@ -0,0 +1,614 @@ +/** + * Gateway integration tests — issue #215 + * + * Spins up two Socket.IO gateway instances sharing a real Redis instance to + * assert the following acceptance criteria: + * + * 1. Cross-node delivery — message sent on node-1 arrives on node-2 + * 2. Multi-device fanout — every active device of a user receives the envelope + * 3. Persist-before-deliver — DB write completes before new_message is broadcast + * 4. Revocation disconnect — a device revoked via Redis pub/sub is force-disconnected + * 5. Resume/sync after drop — missed ephemeral events are replayed on reconnect + * + * Requires Redis at REDIS_URL (default redis://localhost:6379). + * Start one locally with: docker run -p 6379:6379 redis:7-alpine + */ + +import { describe, it, expect, vi, beforeAll, afterAll, beforeEach } from 'vitest'; +import { createServer } from 'http'; +import { Server } from 'socket.io'; +import { io as ioc } from 'socket.io-client'; +import type { Socket as ClientSocket } from 'socket.io-client'; +import { createAdapter } from '@socket.io/redis-adapter'; +import { Redis } from 'ioredis'; +import jwt from 'jsonwebtoken'; + +// ── hoisted redis reference ─────────────────────────────────────────────────── +// +// vi.hoisted executes before vi.mock factories and before any import, so we +// can close over this reference in the redis mock factory below. + +const redisRef = vi.hoisted(() => ({ instance: null as Redis | null })); + +// ── module mocks ────────────────────────────────────────────────────────────── + +vi.mock('../../db/index.js', () => ({ + db: { + query: { + devices: { findFirst: vi.fn() }, + users: { findFirst: vi.fn() }, + conversationMembers: { findFirst: vi.fn(), findMany: vi.fn() }, + messages: { findFirst: vi.fn(), findMany: vi.fn() }, + userDevices: { findMany: vi.fn() }, + }, + insert: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + execute: vi.fn(), + }, +})); + +vi.mock('../../db/schema.js', () => ({ + devices: {}, + conversations: {}, + conversationMembers: {}, + messages: {}, + messageEnvelopes: {}, + userDevices: {}, + users: {}, +})); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args), + eq: vi.fn(), + lt: vi.fn(), + desc: vi.fn(), + sql: vi.fn(), + inArray: vi.fn(), +})); + +// Expose our test Redis instance through the module singleton so that +// presence, resume-stream, and rate-limit services all talk to the same +// Redis used by the Socket.IO adapter. +vi.mock('../../lib/redis.js', () => ({ + get redis() { + return redisRef.instance; + }, + CONV_CACHE_TTL: 30, + convCacheKey: (userId: string) => `conversations:${userId}`, +})); + +vi.mock('../../lib/conversationCache.js', () => ({ + invalidateConversationCaches: vi.fn().mockResolvedValue(undefined), +})); + +// Allow every event through — rate limiting is tested independently. +vi.mock('../../services/rateLimit.js', () => ({ + checkRateLimit: vi.fn().mockResolvedValue({ allowed: true }), + checkPayloadSize: vi.fn().mockReturnValue({ valid: true, size: 0 }), + recordViolation: vi.fn().mockReturnValue(0), + clearViolations: vi.fn(), +})); + +vi.mock('../../services/heartbeat.js', () => ({ + startHeartbeatTimer: vi.fn(), + clearHeartbeatTimer: vi.fn(), +})); + +vi.mock('../../services/backpressure.js', () => ({ + registerForBackpressure: vi.fn(), + unregisterForBackpressure: vi.fn(), +})); + +// ── imports (resolved after mocks are registered) ───────────────────────────── + +import { db } from '../../db/index.js'; +import { socketAuthMiddleware } from '../../middleware/socketAuth.js'; +import { registerMessagingHandlers } from '../../socket/messaging.js'; +import { + registerDeviceSocket, + unregisterDeviceSocket, + startDeviceRevocationListener, +} from '../../services/deviceRevocation.js'; +import { setOnline, setOffline } from '../../services/presence.js'; +import { recordEphemeralEvent } from '../../services/resumeStream.js'; +import { setSocketServer } from '../../lib/socket.js'; + +// ── fixtures ────────────────────────────────────────────────────────────────── + +const JWT_SECRET = 'test-secret-for-ci-only'; +const REDIS_URL = process.env['REDIS_URL'] ?? 'redis://localhost:6379'; +const CONV_ID = 'conv-integration-215'; + +// Port range reserved for this suite — avoids clashes with other listeners. +const BASE_PORT = 14400; + +const ALICE = { userId: 'user-alice', deviceId: 'device-alice', walletAddress: '0xaaa' }; +const ALICE2 = { userId: 'user-alice', deviceId: 'device-alice-2', walletAddress: '0xaaa' }; +const BOB = { userId: 'user-bob', deviceId: 'device-bob', walletAddress: '0xbbb' }; +const CAROL = { userId: 'user-carol', deviceId: 'device-carol', walletAddress: '0xccc' }; + +function makeToken(u: { userId: string; deviceId: string; walletAddress: string }): string { + return jwt.sign(u, JWT_SECRET, { expiresIn: '1h' }); +} + +// ── gateway factory ─────────────────────────────────────────────────────────── + +interface GatewayNode { + io: Server; + port: number; + close: () => Promise; +} + +async function createGatewayNode(port: number, redis: Redis): Promise { + const httpServer = createServer(); + const io = new Server(httpServer, { cors: { origin: '*' } }); + + const pub = redis.duplicate(); + const sub = redis.duplicate(); + + io.adapter(createAdapter(pub, sub)); + + io.use(socketAuthMiddleware); + + io.on('connection', async (socket) => { + const { userId, deviceId } = (socket as { auth?: { userId: string; deviceId: string } }).auth!; + + registerDeviceSocket(deviceId, socket.id); + await setOnline(redis, userId, socket.id); + + // Auto-join every conversation the user belongs to (mirrors index.ts). + // Our mock distinguishes connection-time calls (no query arg) from + // send_message calls (passes a where clause) via mockImplementation below. + const memberships = (await vi.mocked(db.query.conversationMembers.findMany)()) as Array<{ + conversationId: string; + }>; + for (const m of memberships) { + await socket.join(m.conversationId); + } + + registerMessagingHandlers(io, socket as never); + + socket.on('disconnect', async () => { + unregisterDeviceSocket(socket.id); + await setOffline(redis, userId, socket.id); + }); + }); + + await new Promise((resolve) => httpServer.listen(port, resolve)); + + return { + io, + port, + close: async () => { + io.close(); + await new Promise((resolve, reject) => + httpServer.close((err) => (err ? reject(err) : resolve())), + ); + pub.disconnect(); + sub.disconnect(); + }, + }; +} + +// ── test helpers ────────────────────────────────────────────────────────────── + +function connect(port: number, user: typeof ALICE): Promise { + return new Promise((resolve, reject) => { + const socket = ioc(`http://localhost:${port}`, { + auth: { token: makeToken(user) }, + forceNew: true, + reconnection: false, + }); + socket.on('connect', () => resolve(socket)); + socket.on('connect_error', (err) => reject(err)); + }); +} + +function waitFor(socket: ClientSocket, event: string, ms = 4000): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error(`Timed out waiting for "${event}" on socket ${socket.id}`)), + ms, + ); + socket.once(event, (data: T) => { + clearTimeout(timer); + resolve(data); + }); + }); +} + +// Propagate a short pause so the Redis adapter can sync room subscriptions +// across nodes before we send events. +const adapterSync = () => new Promise((r) => setTimeout(r, 150)); + +// ── mock configurators ──────────────────────────────────────────────────────── + +function mockDevice(user: typeof ALICE, isRevoked = false) { + vi.mocked(db.query.devices.findFirst).mockResolvedValue({ + id: user.deviceId, + userId: user.userId, + isRevoked, + } as never); +} + +// Connection-time findMany (no args) → returns conversationId entries. +// send_message findMany (with args) → returns userId entries for cache invalidation. +function mockMemberships(convIds: string[], members: string[]) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (db.query.conversationMembers.findMany as any).mockImplementation(async (query?: unknown) => + query ? members.map((userId) => ({ userId })) : convIds.map((c) => ({ conversationId: c })), + ); +} + +function mockInsertMessage(msg: { + id: string; + conversationId: string; + senderId: string; + senderDeviceId: string; + ciphertext: string; + sequenceNumber?: number; +}) { + const row = { + ...msg, + contentType: 'text/plain', + sequenceNumber: msg.sequenceNumber ?? 1, + createdAt: new Date(), + }; + const returning = vi.fn().mockResolvedValue([row]); + vi.mocked(db.insert).mockReturnValue({ values: vi.fn().mockReturnValue({ returning }) } as never); + return { returning, row }; +} + +// ───────────────────────────────────────────────────────────────────────────── + +describe('Gateway integration — issue #215', () => { + let redis: Redis; + + beforeAll(async () => { + redis = new Redis(REDIS_URL, { lazyConnect: true }); + await redis.connect(); + redisRef.instance = redis; + }); + + afterAll(async () => { + redis.disconnect(); + }); + + beforeEach(async () => { + vi.clearAllMocks(); + + // Flush all keys written by this suite so tests are hermetically isolated. + const patterns = [ + `presence:${ALICE.userId}`, + `presence:${ALICE2.userId}`, + `presence:${BOB.userId}`, + `presence:${CAROL.userId}`, + `resume:events:${ALICE.userId}`, + `resume:events:${BOB.userId}`, + `resume:events:${CAROL.userId}`, + ]; + const existing = (await Promise.all(patterns.map((k) => redis.exists(k)))).flatMap((e, i) => + e ? [patterns[i]!] : [], + ); + if (existing.length) await redis.del(...existing); + }); + + // ── 1. Cross-node delivery ────────────────────────────────────────────────── + + describe('cross-node delivery', () => { + it('delivers a message from a socket on node-1 to a socket on node-2', async () => { + const node1 = await createGatewayNode(BASE_PORT, redis); + const node2 = await createGatewayNode(BASE_PORT + 1, redis); + + try { + const MSG_ID = 'msg-cross-node-215'; + + // Alice on node-1, Bob on node-2 — both belong to CONV_ID. + mockDevice(ALICE); + mockMemberships([CONV_ID], [ALICE.userId, BOB.userId]); + const clientAlice = await connect(node1.port, ALICE); + + mockDevice(BOB); + const clientBob = await connect(node2.port, BOB); + + // Allow the Redis adapter to propagate room subscriptions across nodes. + await adapterSync(); + + // Configure DB for send_message. + vi.mocked(db.query.conversationMembers.findFirst).mockResolvedValue({ + id: 'm1', + userId: ALICE.userId, + conversationId: CONV_ID, + } as never); + vi.mocked(db.query.messages.findFirst).mockResolvedValue(undefined); + vi.mocked(db.query.userDevices.findMany).mockResolvedValue([] as never); + mockInsertMessage({ + id: MSG_ID, + conversationId: CONV_ID, + senderId: ALICE.userId, + senderDeviceId: ALICE.deviceId, + ciphertext: 'hello from node-1', + }); + + const bobReceived = waitFor<{ id: string; conversationId: string }>( + clientBob, + 'new_message', + ); + + clientAlice.emit('send_message', { + conversationId: CONV_ID, + messageId: MSG_ID, + ciphertext: 'hello from node-1', + }); + + const msg = await bobReceived; + expect(msg.id).toBe(MSG_ID); + expect(msg.conversationId).toBe(CONV_ID); + + clientAlice.disconnect(); + clientBob.disconnect(); + } finally { + await node1.close(); + await node2.close(); + } + }); + }); + + // ── 2. Multi-device fanout ────────────────────────────────────────────────── + + describe('multi-device fanout', () => { + it('delivers a message to every active device of the recipient user', async () => { + const node1 = await createGatewayNode(BASE_PORT + 2, redis); + const node2 = await createGatewayNode(BASE_PORT + 3, redis); + + try { + const MSG_ID = 'msg-fanout-215'; + + // Alice's device-1 on node-1 and device-2 on node-2. + mockDevice(ALICE); + mockMemberships([CONV_ID], [ALICE.userId, BOB.userId]); + const aliceD1 = await connect(node1.port, ALICE); + + mockDevice(ALICE2); + const aliceD2 = await connect(node2.port, ALICE2); + + // Bob sends from node-1. + mockDevice(BOB); + const clientBob = await connect(node1.port, BOB); + + await adapterSync(); + + vi.mocked(db.query.conversationMembers.findFirst).mockResolvedValue({ + id: 'm1', + userId: BOB.userId, + conversationId: CONV_ID, + } as never); + vi.mocked(db.query.messages.findFirst).mockResolvedValue(undefined); + vi.mocked(db.query.userDevices.findMany).mockResolvedValue([ + { id: ALICE.deviceId, userId: ALICE.userId }, + { id: ALICE2.deviceId, userId: ALICE.userId }, + ] as never); + + // db.insert is called twice: messages then messageEnvelopes. + // Both need to return a chainable object; only messages.returning() matters. + const msgRow = { + id: MSG_ID, + conversationId: CONV_ID, + senderId: BOB.userId, + senderDeviceId: BOB.deviceId, + ciphertext: 'broadcast', + contentType: 'text/plain', + sequenceNumber: 1, + createdAt: new Date(), + }; + vi.mocked(db.insert).mockReturnValue({ + values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([msgRow]) }), + } as never); + + const d1Promise = waitFor<{ id: string }>(aliceD1, 'new_message'); + const d2Promise = waitFor<{ id: string }>(aliceD2, 'new_message'); + + clientBob.emit('send_message', { + conversationId: CONV_ID, + messageId: MSG_ID, + ciphertext: 'broadcast', + envelopes: [ + { recipientDeviceId: ALICE.deviceId, ciphertext: 'for-device-1' }, + { recipientDeviceId: ALICE2.deviceId, ciphertext: 'for-device-2' }, + ], + }); + + const [msg1, msg2] = await Promise.all([d1Promise, d2Promise]); + expect(msg1.id).toBe(MSG_ID); + expect(msg2.id).toBe(MSG_ID); + + aliceD1.disconnect(); + aliceD2.disconnect(); + clientBob.disconnect(); + } finally { + await node1.close(); + await node2.close(); + } + }); + }); + + // ── 3. Persist-before-deliver ────────────────────────────────────────────── + + describe('persist-before-deliver', () => { + it('completes the DB insert before broadcasting new_message to peers', async () => { + const node1 = await createGatewayNode(BASE_PORT + 4, redis); + + try { + const MSG_ID = 'msg-persist-215'; + const order: string[] = []; + + mockDevice(ALICE); + mockMemberships([CONV_ID], [ALICE.userId, BOB.userId]); + const clientAlice = await connect(node1.port, ALICE); + + mockDevice(BOB); + const clientBob = await connect(node1.port, BOB); + + await adapterSync(); + + vi.mocked(db.query.conversationMembers.findFirst).mockResolvedValue({ + id: 'm1', + userId: ALICE.userId, + conversationId: CONV_ID, + } as never); + vi.mocked(db.query.messages.findFirst).mockResolvedValue(undefined); + vi.mocked(db.query.userDevices.findMany).mockResolvedValue([] as never); + + // Introduce latency on the returning() step to prove ordering. + const returning = vi.fn().mockImplementation(async () => { + await new Promise((r) => setTimeout(r, 30)); + order.push('db_insert_done'); + return [ + { + id: MSG_ID, + conversationId: CONV_ID, + senderId: ALICE.userId, + senderDeviceId: ALICE.deviceId, + ciphertext: 'persist-test', + contentType: 'text/plain', + sequenceNumber: 99, + createdAt: new Date(), + }, + ]; + }); + vi.mocked(db.insert).mockReturnValue({ + values: vi.fn().mockReturnValue({ returning }), + } as never); + + const bobMessage = waitFor<{ id: string; sequenceNumber: number }>( + clientBob, + 'new_message', + ).then((m) => { + order.push('new_message_received'); + return m; + }); + + clientAlice.emit('send_message', { + conversationId: CONV_ID, + messageId: MSG_ID, + ciphertext: 'persist-before-deliver', + }); + + const received = await bobMessage; + + expect(returning).toHaveBeenCalledOnce(); + expect(order).toEqual(['db_insert_done', 'new_message_received']); + expect(received.sequenceNumber).toBe(99); + + clientAlice.disconnect(); + clientBob.disconnect(); + } finally { + await node1.close(); + } + }); + }); + + // ── 4. Revocation disconnect ─────────────────────────────────────────────── + + describe('revocation disconnect', () => { + it('disconnects and notifies a socket when its device is revoked cross-node', async () => { + const node = await createGatewayNode(BASE_PORT + 5, redis); + + // Register this node's io as the socket server so the revocation listener + // can look up sockets by ID. + setSocketServer(node.io); + + // Dedicated subscriber Redis client (ioredis becomes subscriber-only + // after psubscribe, so we must not reuse the main redis instance). + const revSub = redis.duplicate(); + await startDeviceRevocationListener(revSub, redis); + + try { + mockDevice(CAROL); + mockMemberships([], []); + const clientCarol = await connect(node.port, CAROL); + + await adapterSync(); + + const revokedEvent = waitFor(clientCarol, 'device_revoked'); + const disconnected = new Promise((resolve) => + clientCarol.on('disconnect', () => resolve()), + ); + + // Any gateway instance can publish this — here we simulate it directly. + await redis.publish(`device_revoked:${CAROL.deviceId}`, '1'); + + await Promise.all([revokedEvent, disconnected]); + + expect(clientCarol.connected).toBe(false); + } finally { + revSub.disconnect(); + await node.close(); + } + }); + }); + + // ── 5. Resume / sync after simulated drop ───────────────────────────────── + + describe('resume/sync after simulated drop', () => { + it('replays all missed ephemeral events and signals syncRequired on reconnect', async () => { + const node = await createGatewayNode(BASE_PORT + 6, redis); + + try { + const { userId } = ALICE; + + // Write two ephemeral events to Alice's resume stream before she connects. + const id1 = await recordEphemeralEvent(redis, userId, { + type: 'read_receipt', + data: { conversationId: CONV_ID, lastReadMessageId: 'msg-old-1' }, + }); + const id2 = await recordEphemeralEvent(redis, userId, { + type: 'presence_update', + data: { userId: BOB.userId, online: true }, + }); + + expect(id1).toBeTruthy(); + expect(id2).toBeTruthy(); + + mockDevice(ALICE); + mockMemberships([], []); + const client = await connect(node.port, ALICE); + + const replays: Array<{ id: string; type: string }> = []; + const firstReplayHandler = (evt: { id: string; type: string }) => replays.push(evt); + client.on('ephemeral_replay', firstReplayHandler); + + const complete = waitFor<{ lastEventId: string; syncRequired: boolean }>( + client, + 'resume_complete', + ); + + // Simulate a reconnect with no prior cursor → full replay. + client.emit('resume', { lastEventId: '' }); + + const result = await complete; + + expect(result.syncRequired).toBe(true); + expect(result.lastEventId).toBe(id2); + expect(replays).toHaveLength(2); + expect(replays[0]!.type).toBe('read_receipt'); + expect(replays[1]!.type).toBe('presence_update'); + + // Replaying with the advanced cursor must produce no new replays. + client.off('ephemeral_replay', firstReplayHandler); + const replays2: unknown[] = []; + client.on('ephemeral_replay', (evt) => replays2.push(evt)); + const complete2 = waitFor(client, 'resume_complete'); + client.emit('resume', { lastEventId: id2 }); + + await complete2; + expect(replays2).toHaveLength(0); + + client.disconnect(); + } finally { + await node.close(); + } + }); + }); +}); diff --git a/apps/backend/vitest.config.ts b/apps/backend/vitest.config.ts index 08b6a6b..3756f13 100644 --- a/apps/backend/vitest.config.ts +++ b/apps/backend/vitest.config.ts @@ -4,5 +4,6 @@ export default defineConfig({ test: { environment: 'node', setupFiles: ['./src/__tests__/setup.ts'], + testTimeout: 15000, }, }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c1ac0a3..7733bf2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -10,10 +10,10 @@ importers: devDependencies: prettier: specifier: latest - version: 3.8.3 + version: 3.9.1 turbo: specifier: latest - version: 2.9.16 + version: 2.10.0 apps/backend: dependencies: @@ -99,6 +99,9 @@ importers: prettier: specifier: ^3.8.3 version: 3.8.3 + socket.io-client: + specifier: ^4.8.3 + version: 4.8.3 supertest: specifier: ^7.2.2 version: 7.2.2 @@ -1342,6 +1345,7 @@ packages: '@stellar/stellar-base@15.0.0': resolution: {integrity: sha512-XQhxUr9BYiEcFcgc4oWcCMR9QJCny/GmmGsuwPKf/ieIcOeb5149KLHYx9mJCA0ea8QbucR2/GzV58QbXOTxQA==} engines: {node: '>=20.0.0'} + deprecated: This package is now rolled into @stellar/stellar-sdk. Please use @stellar/stellar-sdk to continue receiving updates and support. '@stellar/stellar-sdk@15.1.0': resolution: {integrity: sha512-GsJUcWx2yboVzYdhTe/LHS3V1wVLSHkUkglC5bBoYWGJt31vzIhbSGno60NP9CdCTNkLJdnrsLJ63oA58Zvh5A==} @@ -1451,33 +1455,33 @@ packages: '@tsconfig/node16@1.0.4': resolution: {integrity: sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==} - '@turbo/darwin-64@2.9.16': - resolution: {integrity: sha512-jLjApWTSNd7JZ5JaLYfelW1ytnGQOvB7ivl+2RD1xQvJTbi8I9gBjzcga7tDZVPyaxpl10YTfJt3BrYXR18KDw==} + '@turbo/darwin-64@2.10.0': + resolution: {integrity: sha512-EwvHThXzpY0KGd1/NAmuewI5D+aVa3Rl/OlxE36yfjUKb/+ySrfJrSlEFt8aD1OXwnnaHnQnPKHFndor0Zxlsg==} cpu: [x64] os: [darwin] - '@turbo/darwin-arm64@2.9.16': - resolution: {integrity: sha512-YPgrn+5HIGzrx0O2a631SV4MBQUe4W/DafMFUuBVgaU32PW9/OTT0ehviF0QSxTXuRJlHvW2eUTemddF5/spmw==} + '@turbo/darwin-arm64@2.10.0': + resolution: {integrity: sha512-9d2fTyyG0lf5Wq1bwJA9qUaeecViMkLcdctWaMMmCkxZ/JqypmqOwK3W6vmejeKVgkr06gSoiX8bD+xN5Jpxcg==} cpu: [arm64] os: [darwin] - '@turbo/linux-64@2.9.16': - resolution: {integrity: sha512-vAEf1H6l26lTpl9FJ/peQo1NUB8RC0sbEJJz5mPcUhHA2bPDup2x3CZPgo/bH8S4cUcBLm4FN3UHd5iUO2RAew==} + '@turbo/linux-64@2.10.0': + resolution: {integrity: sha512-sZBtjMuufitanjzi6UssoUpJMnnPlLMcdcJj3m3ptNsSq31Xh7MnjhwA5nWvLDTfEFg8GPcbYFXMo8vSdKRfqQ==} cpu: [x64] os: [linux] - '@turbo/linux-arm64@2.9.16': - resolution: {integrity: sha512-xDBLR2PZg4BrQOchfG6svgpv5FCNJ2TOtT2psLdEJcdKo1BH+pnPs9Xj6pvUjgfkHbuvBOfeE4R6tvxMoQKDHQ==} + '@turbo/linux-arm64@2.10.0': + resolution: {integrity: sha512-vkq/Z8R+1DQ+kifWFa810IjRy2NNBVvha3cg9sWA3nFh6nnGrHSMnnJKrzH7c/No9kq4Jb55Ru44YKsCSBgrKg==} cpu: [arm64] os: [linux] - '@turbo/windows-64@2.9.16': - resolution: {integrity: sha512-NBAJnaUiGdgkSzQwUIdOvkCkcpTSu58G/sBGa0mvBtzfvFOOgrQwepKOOQ8cp6sWM6OcKDNFj2p1dsZA1OWjPg==} + '@turbo/windows-64@2.10.0': + resolution: {integrity: sha512-CRUEguLWxFQHptYZS7HjPhNhAFawfea07iR+xAQ5e4klgLrPCMdexBkXwSCwOxqTFknJ7RZFN3gOaADsw+Gttg==} cpu: [x64] os: [win32] - '@turbo/windows-arm64@2.9.16': - resolution: {integrity: sha512-Y7SJppD0Z8wjO3Ec0ZGd9KQ4Yv0BMnA8CIowj5Vp+OEVsosXDG2weK6/t1RRLfJmc2Ozrnd6y4DOgQys+mn3WQ==} + '@turbo/windows-arm64@2.10.0': + resolution: {integrity: sha512-dVHGaf9F8twzgibcBqKoADT/LLqf9++jDb+hq/LPWWaOmRpp4M+/pVOm7vy4z9D++xg8eaxWLT0+wQxFwhYu9A==} cpu: [arm64] os: [win32] @@ -3351,6 +3355,11 @@ packages: engines: {node: '>=14'} hasBin: true + prettier@3.9.1: + resolution: {integrity: sha512-ppiDo2CSwexck1eyZUwJHg/N3nf1+6IRCv7W/VJ5vaLnVCmB7+3CdRfMwoCHBBX6xTrREDTksZ4OZl5SSf4zXA==} + engines: {node: '>=14'} + hasBin: true + prop-types@15.8.1: resolution: {integrity: sha512-oj87CgZICdulUohogVAR7AjlC0327U4el4L6eAvOqCeudMDVU0NThNaV+b9Df4dXgSP1gXMTnPdhfe/2qDH5cg==} @@ -3735,8 +3744,8 @@ packages: engines: {node: '>=18.0.0'} hasBin: true - turbo@2.9.16: - resolution: {integrity: sha512-NqgRQy6j6dPYcdSdv0q1g9QsZg7SWg87RERM8otw/1AtKU2yTFVClOM7cbwKzOonZr/Ek1blTBucw64L9H0Bwg==} + turbo@2.10.0: + resolution: {integrity: sha512-o016H9PPtuH2deb3mh3Vci3Avfi9UYgM/RONQisY7HnloupP0IFSbFS3gFYJgFJP8nwBrByHWFQIDa8T2zIXPw==} hasBin: true tweetnacl@1.0.3: @@ -5118,22 +5127,22 @@ snapshots: '@tsconfig/node16@1.0.4': {} - '@turbo/darwin-64@2.9.16': + '@turbo/darwin-64@2.10.0': optional: true - '@turbo/darwin-arm64@2.9.16': + '@turbo/darwin-arm64@2.10.0': optional: true - '@turbo/linux-64@2.9.16': + '@turbo/linux-64@2.10.0': optional: true - '@turbo/linux-arm64@2.9.16': + '@turbo/linux-arm64@2.10.0': optional: true - '@turbo/windows-64@2.9.16': + '@turbo/windows-64@2.10.0': optional: true - '@turbo/windows-arm64@2.9.16': + '@turbo/windows-arm64@2.10.0': optional: true '@tybys/wasm-util@0.10.1': @@ -7197,6 +7206,8 @@ snapshots: prettier@3.8.3: {} + prettier@3.9.1: {} + prop-types@15.8.1: dependencies: loose-envify: 1.4.0 @@ -7742,14 +7753,14 @@ snapshots: optionalDependencies: fsevents: 2.3.3 - turbo@2.9.16: + turbo@2.10.0: optionalDependencies: - '@turbo/darwin-64': 2.9.16 - '@turbo/darwin-arm64': 2.9.16 - '@turbo/linux-64': 2.9.16 - '@turbo/linux-arm64': 2.9.16 - '@turbo/windows-64': 2.9.16 - '@turbo/windows-arm64': 2.9.16 + '@turbo/darwin-64': 2.10.0 + '@turbo/darwin-arm64': 2.10.0 + '@turbo/linux-64': 2.10.0 + '@turbo/linux-arm64': 2.10.0 + '@turbo/windows-64': 2.10.0 + '@turbo/windows-arm64': 2.10.0 tweetnacl@1.0.3: {} From a0f7dc504d56f83f9125f100c3982509486f34f9 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jun 2026 11:36:53 +0100 Subject: [PATCH 2/6] fix(tests): isolate readReceipts from CI Redis and graceful teardown in integration suite - Add vi.mock('../lib/redis.js', () => ({ redis: null })) to readReceipts.test.ts so the if (redis) branch in message_read never runs during these unit tests. Adding a Redis service to CI (#215) made redis truthy, triggering a findMany call that was absent from the db mock and failing two tests. - Add conversationMembers.findMany and the missing drizzle-orm exports (ne, isNull, inArray, sql) to readReceipts.test.ts for completeness. - Replace pub/sub .disconnect() with await .quit().catch(() => {}) in the gateway integration test to prevent unhandled 'Connection is closed' rejections during suite teardown. --- .../integration/gateway.integration.test.ts | 8 ++++---- apps/backend/src/__tests__/readReceipts.test.ts | 13 ++++++++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/apps/backend/src/__tests__/integration/gateway.integration.test.ts b/apps/backend/src/__tests__/integration/gateway.integration.test.ts index 7db8aa5..cb278f3 100644 --- a/apps/backend/src/__tests__/integration/gateway.integration.test.ts +++ b/apps/backend/src/__tests__/integration/gateway.integration.test.ts @@ -185,8 +185,8 @@ async function createGatewayNode(port: number, redis: Redis): Promise((resolve, reject) => httpServer.close((err) => (err ? reject(err) : resolve())), ); - pub.disconnect(); - sub.disconnect(); + await pub.quit().catch(() => {}); + await sub.quit().catch(() => {}); }, }; } @@ -272,7 +272,7 @@ describe('Gateway integration — issue #215', () => { }); afterAll(async () => { - redis.disconnect(); + await redis.quit().catch(() => {}); }); beforeEach(async () => { @@ -543,7 +543,7 @@ describe('Gateway integration — issue #215', () => { expect(clientCarol.connected).toBe(false); } finally { - revSub.disconnect(); + await revSub.quit().catch(() => {}); await node.close(); } }); diff --git a/apps/backend/src/__tests__/readReceipts.test.ts b/apps/backend/src/__tests__/readReceipts.test.ts index 063b1c0..558c6fe 100644 --- a/apps/backend/src/__tests__/readReceipts.test.ts +++ b/apps/backend/src/__tests__/readReceipts.test.ts @@ -6,10 +6,12 @@ import { EventEmitter } from 'events'; const mockFindFirst = vi.fn(); const mockUpdate = vi.fn(); +const mockFindMany = vi.fn(); + vi.mock('../db/index.js', () => ({ db: { query: { - conversationMembers: { findFirst: mockFindFirst }, + conversationMembers: { findFirst: mockFindFirst, findMany: mockFindMany }, messages: { findFirst: mockFindFirst }, }, update: mockUpdate, @@ -22,11 +24,19 @@ vi.mock('../db/schema.js', () => ({ messages: {}, })); +// Keep these unit tests isolated from the CI Redis service so the +// if (redis) branch in message_read never runs here. +vi.mock('../lib/redis.js', () => ({ redis: null })); + vi.mock('drizzle-orm', () => ({ and: vi.fn((...args: unknown[]) => args), eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), + ne: vi.fn((col: unknown, val: unknown) => ({ col, val, op: 'ne' })), + isNull: vi.fn((col: unknown) => ({ col, op: 'isNull' })), lt: vi.fn(), desc: vi.fn(), + inArray: vi.fn((col: unknown, vals: unknown) => ({ col, vals })), + sql: vi.fn(), })); // ── Mock Socket helpers ──────────────────────────────────────────────────── @@ -67,6 +77,7 @@ function makeIo() { describe('message_read socket event', () => { beforeEach(() => { vi.clearAllMocks(); + mockFindMany.mockResolvedValue([]); }); it('persists last_read_message_id and broadcasts read_receipt', async () => { From ff5b6f88248efde5a6c23780a4a6d7ee60229a0e Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jun 2026 11:52:06 +0100 Subject: [PATCH 3/6] fix(tests): suppress ioredis Connection is closed unhandled rejections on teardown ioredis rejects pending-command Promises in its event_handler.js close() function when a connection is force-closed. Those rejections are internal to ioredis and cannot be caught via .catch() on the quit() call because they are created from event callbacks, not from the quit() promise chain. Register a targeted unhandledRejection handler before the suite starts that silences only Error('Connection is closed.') messages and re-throws everything else, then removes itself in afterAll once the Redis connection is fully torn down. This prevents Vitest from treating the ioredis teardown noise as test errors while preserving visibility of any other genuine unhandled rejections. --- .../integration/gateway.integration.test.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/apps/backend/src/__tests__/integration/gateway.integration.test.ts b/apps/backend/src/__tests__/integration/gateway.integration.test.ts index cb278f3..37f8955 100644 --- a/apps/backend/src/__tests__/integration/gateway.integration.test.ts +++ b/apps/backend/src/__tests__/integration/gateway.integration.test.ts @@ -262,10 +262,21 @@ function mockInsertMessage(msg: { // ───────────────────────────────────────────────────────────────────────────── +// ioredis internally rejects pending-command Promises when a connection closes. +// Those rejections are not catchable on the quit() promise itself — they surface +// as unhandled rejections from ioredis's event_handler.js. Register a handler +// that silences only this specific message so Vitest doesn't report it as an +// error while still letting genuine unhandled rejections propagate. +const suppressConnectionClosed = (err: unknown) => { + if (err instanceof Error && err.message === 'Connection is closed.') return; + throw err; +}; + describe('Gateway integration — issue #215', () => { let redis: Redis; beforeAll(async () => { + process.on('unhandledRejection', suppressConnectionClosed); redis = new Redis(REDIS_URL, { lazyConnect: true }); await redis.connect(); redisRef.instance = redis; @@ -273,6 +284,7 @@ describe('Gateway integration — issue #215', () => { afterAll(async () => { await redis.quit().catch(() => {}); + process.off('unhandledRejection', suppressConnectionClosed); }); beforeEach(async () => { From 95205cf50d998b2050dcae96d49a66bed77239f6 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jun 2026 11:53:45 +0100 Subject: [PATCH 4/6] Revert "fix(tests): suppress ioredis Connection is closed unhandled rejections on teardown" This reverts commit ff5b6f88248efde5a6c23780a4a6d7ee60229a0e. --- .../integration/gateway.integration.test.ts | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/apps/backend/src/__tests__/integration/gateway.integration.test.ts b/apps/backend/src/__tests__/integration/gateway.integration.test.ts index 37f8955..cb278f3 100644 --- a/apps/backend/src/__tests__/integration/gateway.integration.test.ts +++ b/apps/backend/src/__tests__/integration/gateway.integration.test.ts @@ -262,21 +262,10 @@ function mockInsertMessage(msg: { // ───────────────────────────────────────────────────────────────────────────── -// ioredis internally rejects pending-command Promises when a connection closes. -// Those rejections are not catchable on the quit() promise itself — they surface -// as unhandled rejections from ioredis's event_handler.js. Register a handler -// that silences only this specific message so Vitest doesn't report it as an -// error while still letting genuine unhandled rejections propagate. -const suppressConnectionClosed = (err: unknown) => { - if (err instanceof Error && err.message === 'Connection is closed.') return; - throw err; -}; - describe('Gateway integration — issue #215', () => { let redis: Redis; beforeAll(async () => { - process.on('unhandledRejection', suppressConnectionClosed); redis = new Redis(REDIS_URL, { lazyConnect: true }); await redis.connect(); redisRef.instance = redis; @@ -284,7 +273,6 @@ describe('Gateway integration — issue #215', () => { afterAll(async () => { await redis.quit().catch(() => {}); - process.off('unhandledRejection', suppressConnectionClosed); }); beforeEach(async () => { From 62dd70f971900430382424196928e2353a7e0a12 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jun 2026 11:58:16 +0100 Subject: [PATCH 5/6] Reapply "fix(tests): suppress ioredis Connection is closed unhandled rejections on teardown" This reverts commit 95205cf50d998b2050dcae96d49a66bed77239f6. --- .../integration/gateway.integration.test.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/apps/backend/src/__tests__/integration/gateway.integration.test.ts b/apps/backend/src/__tests__/integration/gateway.integration.test.ts index cb278f3..37f8955 100644 --- a/apps/backend/src/__tests__/integration/gateway.integration.test.ts +++ b/apps/backend/src/__tests__/integration/gateway.integration.test.ts @@ -262,10 +262,21 @@ function mockInsertMessage(msg: { // ───────────────────────────────────────────────────────────────────────────── +// ioredis internally rejects pending-command Promises when a connection closes. +// Those rejections are not catchable on the quit() promise itself — they surface +// as unhandled rejections from ioredis's event_handler.js. Register a handler +// that silences only this specific message so Vitest doesn't report it as an +// error while still letting genuine unhandled rejections propagate. +const suppressConnectionClosed = (err: unknown) => { + if (err instanceof Error && err.message === 'Connection is closed.') return; + throw err; +}; + describe('Gateway integration — issue #215', () => { let redis: Redis; beforeAll(async () => { + process.on('unhandledRejection', suppressConnectionClosed); redis = new Redis(REDIS_URL, { lazyConnect: true }); await redis.connect(); redisRef.instance = redis; @@ -273,6 +284,7 @@ describe('Gateway integration — issue #215', () => { afterAll(async () => { await redis.quit().catch(() => {}); + process.off('unhandledRejection', suppressConnectionClosed); }); beforeEach(async () => { From 6e437ff3d2c45db3ab2db75342fcbda038f2e95e Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 30 Jun 2026 06:26:51 +0100 Subject: [PATCH 6/6] fix(tests): add db.select mock and isNull to gateway integration test deliverMessage (deliveryPipeline.ts) uses db.select().from().where() which was absent from the mock. Add it with values that make deliverMessage take the new_message emit path: non-empty members + empty activeDevices. --- .../integration/gateway.integration.test.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/apps/backend/src/__tests__/integration/gateway.integration.test.ts b/apps/backend/src/__tests__/integration/gateway.integration.test.ts index 37f8955..32341e0 100644 --- a/apps/backend/src/__tests__/integration/gateway.integration.test.ts +++ b/apps/backend/src/__tests__/integration/gateway.integration.test.ts @@ -45,6 +45,7 @@ vi.mock('../../db/index.js', () => ({ update: vi.fn(), delete: vi.fn(), execute: vi.fn(), + select: vi.fn(), }, })); @@ -61,6 +62,8 @@ vi.mock('../../db/schema.js', () => ({ vi.mock('drizzle-orm', () => ({ and: vi.fn((...args: unknown[]) => args), eq: vi.fn(), + ne: vi.fn(), + isNull: vi.fn(), lt: vi.fn(), desc: vi.fn(), sql: vi.fn(), @@ -290,6 +293,18 @@ describe('Gateway integration — issue #215', () => { beforeEach(async () => { vi.clearAllMocks(); + // Set up db.select chain for deliverMessage (deliveryPipeline.ts). + // deliverMessage queries members then activeDevices via db.select().from().where(). + // Returning non-empty members + empty activeDevices causes it to call + // io.to(conversationId).emit('new_message', message) — the path tests expect. + const mockWhere = vi + .fn() + .mockResolvedValueOnce([{ userId: ALICE.userId }]) // members query + .mockResolvedValue([]); // activeDevices query → triggers new_message emit + vi.mocked(db.select).mockReturnValue({ + from: vi.fn().mockReturnValue({ where: mockWhere }), + } as never); + // Flush all keys written by this suite so tests are hermetically isolated. const patterns = [ `presence:${ALICE.userId}`,