diff --git a/packages/flowerbase/src/features/functions/controller.ts b/packages/flowerbase/src/features/functions/controller.ts index 995f618..f8a7532 100644 --- a/packages/flowerbase/src/features/functions/controller.ts +++ b/packages/flowerbase/src/features/functions/controller.ts @@ -1,8 +1,8 @@ +import type { ServerResponse } from 'http' import { EJSON, ObjectId } from 'bson' import type { FastifyRequest } from 'fastify' -import { ChangeStream, Document } from 'mongodb'; +import type { Document } from 'mongodb' import { services } from '../../services' -import { StateManager } from '../../state' import { GenerateContext } from '../../utils/context' import { Base64Function, FunctionCallBase64Dto, FunctionCallDto } from './dtos' import { FunctionController } from './interface' @@ -51,6 +51,44 @@ const isReturnedError = (value: unknown): value is { message: string; name: stri const serializeEjson = (value: unknown) => JSON.stringify(EJSON.serialize(value, { relaxed: false })) +const isRecord = (value: unknown): value is Record => + !!value && typeof value === 'object' && !Array.isArray(value) + +type WatchSubscriber = { + id: string + user: Record + response: ServerResponse + extraFilter?: Document +} + +type SharedWatchStream = { + database: string + collection: string + stream: { + on: (event: 'change' | 'error', listener: (payload: any) => void) => void + off: (event: 'change' | 'error', listener: (payload: any) => void) => void + close: () => Promise | void + } + subscribers: Map +} + +const sharedWatchStreams = new Map() +let watchSubscriberCounter = 0 + +const parseWatchFilter = (args: unknown): Document | undefined => { + if (!isRecord(args)) return undefined + const candidate = + (isRecord(args.filter) ? args.filter : undefined) ?? + (isRecord(args.query) ? args.query : undefined) + return candidate ? (candidate as Document) : undefined +} + +const isReadableDocumentResult = (value: unknown) => + !!value && + typeof value === 'object' && + !Array.isArray(value) && + Object.keys(value as Record).length > 0 + /** * > Creates a pre handler for every query * @param app -> the fastify instance @@ -63,8 +101,6 @@ export const functionsController: FunctionController = async ( ) => { app.addHook('preHandler', app.jwtAuthentication) - const streams = {} as Record> - app.post<{ Body: FunctionCallDto }>('/call', { schema: { tags: ['Functions'] @@ -170,13 +206,12 @@ export const functionsController: FunctionController = async ( } const { baas_request, stitch_request } = query - const config: Base64Function = JSON.parse( + const decodedConfig = JSON.parse( Buffer.from(baas_request || stitch_request || '', 'base64').toString('utf8') ) + const config = EJSON.deserialize(decodedConfig) as Base64Function - const [{ database, collection }] = config.arguments - const app = StateManager.select('app') - const services = StateManager.select('services') + const [{ database, collection, ...watchArgs }] = config.arguments const headers = { 'Content-Type': 'text/event-stream', @@ -190,36 +225,119 @@ export const functionsController: FunctionController = async ( res.raw.writeHead(200, headers) res.raw.flushHeaders(); - const requestKey = baas_request || stitch_request + const streamKey = `${database}::${collection}` + const subscriberId = `${Date.now()}-${watchSubscriberCounter++}` + const extraFilter = parseWatchFilter(watchArgs) + const mongoClient = app.mongo.client as unknown as { + db: (name: string) => { collection: (name: string) => { watch: (...args: any[]) => any } } + } - if (!requestKey) return + let hub = sharedWatchStreams.get(streamKey) + if (!hub) { + const stream = mongoClient.db(database).collection(collection).watch([], { + fullDocument: 'whenAvailable' + }) + hub = { + database, + collection, + stream, + subscribers: new Map() + } + sharedWatchStreams.set(streamKey, hub) + } - const changeStream = streams[requestKey] + const ensureHubListeners = (currentHub: SharedWatchStream) => { + if ((currentHub as SharedWatchStream & { listenersBound?: boolean }).listenersBound) { + return + } - if (changeStream) { - changeStream.on('change', (change) => { - res.raw.write(`data: ${serializeEjson(change)}\n\n`); - }); + const closeHub = async () => { + currentHub.stream.off('change', onHubChange) + currentHub.stream.off('error', onHubError) + sharedWatchStreams.delete(streamKey) + try { + await currentHub.stream.close() + } catch { + // Ignore stream close errors. + } + } + + const onHubChange = async (change: Document) => { + const subscribers = Array.from(currentHub.subscribers.values()) + await Promise.all(subscribers.map(async (subscriber) => { + const subscriberRes = subscriber.response + if (subscriberRes.writableEnded || subscriberRes.destroyed) { + currentHub.subscribers.delete(subscriber.id) + return + } + + const docId = + (change as { documentKey?: { _id?: unknown } })?.documentKey?._id ?? + (change as { fullDocument?: { _id?: unknown } })?.fullDocument?._id + if (typeof docId === 'undefined') return + + const readQuery = subscriber.extraFilter + ? ({ $and: [subscriber.extraFilter, { _id: docId }] } as Document) + : ({ _id: docId } as Document) + + try { + const readableDoc = await services['mongodb-atlas'](app, { + user: subscriber.user, + rules + }) + .db(currentHub.database) + .collection(currentHub.collection) + .findOne(readQuery) + + if (!isReadableDocumentResult(readableDoc)) return + subscriberRes.write(`data: ${serializeEjson(change)}\n\n`) + } catch (error) { + subscriberRes.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) + subscriberRes.end() + currentHub.subscribers.delete(subscriber.id) + } + })) + + if (!currentHub.subscribers.size) { + await closeHub() + } + } - req.raw.on('close', () => { - console.log("change stream closed"); - changeStream?.close?.(); - delete streams[requestKey] - }); - return + const onHubError = async (error: unknown) => { + for (const subscriber of currentHub.subscribers.values()) { + const subscriberRes = subscriber.response + if (!subscriberRes.writableEnded && !subscriberRes.destroyed) { + subscriberRes.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) + subscriberRes.end() + } + } + currentHub.subscribers.clear() + await closeHub() + } + + currentHub.stream.on('change', onHubChange) + currentHub.stream.on('error', onHubError) + ; (currentHub as SharedWatchStream & { listenersBound?: boolean }).listenersBound = true } - streams[requestKey] = await services['mongodb-atlas'](app, { - user, - rules - }) - .db(database) - .collection(collection) - .watch([], { fullDocument: 'whenAvailable' }); + ensureHubListeners(hub) + const subscriber: WatchSubscriber = { + id: subscriberId, + user, + response: res.raw, + extraFilter + } + hub.subscribers.set(subscriberId, subscriber) - streams[requestKey].on('change', (change) => { - res.raw.write(`data: ${serializeEjson(change)}\n\n`); - }); + req.raw.on('close', () => { + const currentHub = sharedWatchStreams.get(streamKey) + if (!currentHub) return + currentHub.subscribers.delete(subscriberId) + if (!currentHub.subscribers.size) { + void currentHub.stream.close() + sharedWatchStreams.delete(streamKey) + } + }) }) } diff --git a/packages/flowerbase/src/services/mongodb-atlas/index.ts b/packages/flowerbase/src/services/mongodb-atlas/index.ts index 16effa1..e68a42c 100644 --- a/packages/flowerbase/src/services/mongodb-atlas/index.ts +++ b/packages/flowerbase/src/services/mongodb-atlas/index.ts @@ -4,6 +4,7 @@ import isEqual from 'lodash/isEqual' import set from 'lodash/set' import unset from 'lodash/unset' import { + ChangeStreamOptions, ClientSession, ClientSessionOptions, Collection, @@ -139,7 +140,73 @@ const normalizeFindOneAndUpdateOptions = ( const buildAndQuery = (clauses: MongoFilter[]): MongoFilter => clauses.length ? { $and: clauses } : {} -const hasAtomicOperators = (data: Document) => Object.keys(data).some((key) => key.startsWith('$')) +const toWatchMatchFilter = (value: unknown): unknown => { + if (Array.isArray(value)) { + return value.map((item) => toWatchMatchFilter(item)) + } + + if (!isPlainObject(value)) return value + + return Object.entries(value).reduce>((acc, [key, current]) => { + if (key.startsWith('$')) { + acc[key] = toWatchMatchFilter(current) + return acc + } + acc[`fullDocument.${key}`] = toWatchMatchFilter(current) + return acc + }, {}) +} + +type RealmCompatibleWatchOptions = Document & { + filter?: MongoFilter + ids?: unknown[] +} + +const resolveWatchArgs = ( + pipelineOrOptions?: Document[] | RealmCompatibleWatchOptions, + options?: RealmCompatibleWatchOptions +) => { + const inputPipeline = Array.isArray(pipelineOrOptions) ? pipelineOrOptions : [] + const rawOptions = (Array.isArray(pipelineOrOptions) ? options : pipelineOrOptions) ?? {} + + if (!isPlainObject(rawOptions)) { + return { + pipeline: inputPipeline, + options: options as ChangeStreamOptions | undefined, + extraMatches: [] as Document[] + } + } + + const { + filter: watchFilter, + ids, + ...watchOptions + } = rawOptions as RealmCompatibleWatchOptions + + const extraMatches: Document[] = [] + if (typeof watchFilter !== 'undefined') { + extraMatches.push({ $match: toWatchMatchFilter(watchFilter) as Document }) + } + if (Array.isArray(ids)) { + extraMatches.push({ + $match: { + $or: [ + { 'documentKey._id': { $in: ids } }, + { 'fullDocument._id': { $in: ids } } + ] + } + }) + } + + return { + pipeline: inputPipeline, + options: watchOptions as ChangeStreamOptions, + extraMatches + } +} + +const hasAtomicOperators = (data: Document) => + Object.keys(data).some((key) => key.startsWith('$')) const normalizeUpdatePayload = (data: Document) => hasAtomicOperators(data) ? data : { $set: data } @@ -364,22 +431,22 @@ const getOperators: GetOperatorsFunction = ( return { /** - * Finds a single document in a MongoDB collection with optional role-based filtering and validation. - * - * @param {Filter} query - The MongoDB query used to match the document. - * @param {Document} [projection] - Optional projection to select returned fields. - * @param {FindOneOptions} [options] - Optional settings for the findOne operation. - * @returns {Promise} A promise resolving to the document if found and permitted, an empty object if access is denied, or `null` if not found. - * - * @description - * If `run_as_system` is enabled, the function behaves like a standard `collection.findOne(query)` with no access checks. - * Otherwise: - * - Merges the provided query with any access control filters using `getFormattedQuery`. - * - Attempts to find the document using the formatted query. - * - Determines the user's role via `getWinningRole`. - * - Validates the result using `checkValidation` to ensure read permission. - * - If validation fails, returns an empty object; otherwise returns the validated document. - */ + * Finds a single document in a MongoDB collection with optional role-based filtering and validation. + * + * @param {Filter} query - The MongoDB query used to match the document. + * @param {Document} [projection] - Optional projection to select returned fields. + * @param {FindOneOptions} [options] - Optional settings for the findOne operation. + * @returns {Promise} A promise resolving to the document if found and permitted, an empty object if access is denied, or `null` if not found. + * + * @description + * If `run_as_system` is enabled, the function behaves like a standard `collection.findOne(query)` with no access checks. + * Otherwise: + * - Merges the provided query with any access control filters using `getFormattedQuery`. + * - Attempts to find the document using the formatted query. + * - Determines the user's role via `getWinningRole`. + * - Validates the result using `checkValidation` to ensure read permission. + * - If validation fails, returns an empty object; otherwise returns the validated document. + */ findOne: async (query = {}, projectionOrOptions, options) => { try { const { projection, options: normalizedOptions } = resolveFindArgs( @@ -389,9 +456,9 @@ const getOperators: GetOperatorsFunction = ( const resolvedOptions = projection || normalizedOptions ? { - ...(normalizedOptions ?? {}), - ...(projection ? { projection } : {}) - } + ...(normalizedOptions ?? {}), + ...(projection ? { projection } : {}) + } : undefined const resolvedQuery = query ?? {} if (!run_as_system) { @@ -429,15 +496,15 @@ const getOperators: GetOperatorsFunction = ( }) const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: result, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: result, + expansions: {} + }, + user + ) : fallbackAccess(result) // Return validated document or empty object if not permitted @@ -490,15 +557,15 @@ const getOperators: GetOperatorsFunction = ( }) const { status } = winningRole ? await checkValidation( - winningRole, - { - type: 'delete', - roles, - cursor: result, - expansions: {} - }, - user - ) + winningRole, + { + type: 'delete', + roles, + cursor: result, + expansions: {} + }, + user + ) : fallbackAccess(result) if (!status) { @@ -545,15 +612,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'insert', - roles, - cursor: data, - expansions: {} - }, - user - ) + winningRole, + { + type: 'insert', + roles, + cursor: data, + expansions: {} + }, + user + ) : fallbackAccess(data) if (!status || !isEqual(data, document)) { @@ -636,15 +703,15 @@ const getOperators: GetOperatorsFunction = ( // Validate update permissions const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'write', - roles, - cursor: docToCheck, - expansions: {} - }, - user - ) + winningRole, + { + type: 'write', + roles, + cursor: docToCheck, + expansions: {} + }, + user + ) : fallbackAccess(docToCheck) // Ensure no unauthorized changes are made const areDocumentsEqual = areUpdatedFieldsAllowed(document, docToCheck, updatedPaths) @@ -751,15 +818,15 @@ const getOperators: GetOperatorsFunction = ( const readRole = getWinningRole(updateResult, user, roles) const readResult = readRole ? await checkValidation( - readRole, - { - type: 'read', - roles, - cursor: updateResult, - expansions: {} - }, - user - ) + readRole, + { + type: 'read', + roles, + cursor: updateResult, + expansions: {} + }, + user + ) : fallbackAccess(updateResult) const sanitizedDoc = readResult.status ? (readResult.document ?? updateResult) : {} @@ -806,9 +873,9 @@ const getOperators: GetOperatorsFunction = ( const resolvedOptions = projection || normalizedOptions ? { - ...(normalizedOptions ?? {}), - ...(projection ? { projection } : {}) - } + ...(normalizedOptions ?? {}), + ...(projection ? { projection } : {}) + } : undefined if (!run_as_system) { checkDenyOperation(normalizedRules, collection.collectionName, CRUD_OPERATIONS.READ) @@ -839,15 +906,15 @@ const getOperators: GetOperatorsFunction = ( }) const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined @@ -928,63 +995,85 @@ const getOperators: GetOperatorsFunction = ( * * This allows fine-grained control over what change events a user can observe, based on roles and filters. */ - watch: (pipeline = [], options) => { + watch: (pipelineOrOptions = [], options) => { try { + const { + pipeline, + options: watchOptions, + extraMatches + } = resolveWatchArgs(pipelineOrOptions as Document[] | RealmCompatibleWatchOptions, options as RealmCompatibleWatchOptions) + if (!run_as_system) { - checkDenyOperation(normalizedRules, collection.collectionName, CRUD_OPERATIONS.READ) + checkDenyOperation( + normalizedRules, + collection.collectionName, + CRUD_OPERATIONS.READ + ) // Apply access filters to initial change stream pipeline const formattedQuery = getFormattedQuery(filters, {}, user) + const watchFormattedQuery = formattedQuery.map( + (condition) => toWatchMatchFilter(condition) as MongoFilter + ) - const firstStep = formattedQuery.length ? { - $match: { - $and: formattedQuery - } - } : undefined + const firstStep = watchFormattedQuery.length + ? { + $match: { + $and: watchFormattedQuery + } + } + : undefined - const formattedPipeline = [ - firstStep, - ...pipeline - ].filter(Boolean) as Document[] + const formattedPipeline = [firstStep, ...extraMatches, ...pipeline].filter(Boolean) as Document[] - const result = collection.watch(formattedPipeline, options) + const result = collection.watch(formattedPipeline, watchOptions) const originalOn = result.on.bind(result) /** * Validates a change event against the user's roles. * * @param {Document} change - A change event from the ChangeStream. - * @returns {Promise<{ status: boolean, document: Document, updatedFieldsStatus: boolean, updatedFields: Document }>} + * @returns {Promise<{ status: boolean, document: Document, updatedFieldsStatus: boolean, updatedFields: Document, hasFullDocument: boolean, hasWinningRole: boolean }>} */ - const isValidChange = async ({ fullDocument, updateDescription }: Document) => { + const isValidChange = async (change: Document) => { + const { fullDocument, updateDescription } = change + const hasFullDocument = !!fullDocument const winningRole = getWinningRole(fullDocument, user, roles) - const { status, document } = winningRole + const fullDocumentValidation = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: fullDocument, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: fullDocument, + expansions: {} + }, + user + ) : fallbackAccess(fullDocument) + const { status, document } = fullDocumentValidation const { status: updatedFieldsStatus, document: updatedFields } = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: updateDescription?.updatedFields, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: updateDescription?.updatedFields, + expansions: {} + }, + user + ) : fallbackAccess(updateDescription?.updatedFields) - return { status, document, updatedFieldsStatus, updatedFields } + return { + status, + document, + updatedFieldsStatus, + updatedFields, + hasFullDocument, + hasWinningRole: !!winningRole + } } // Override the .on() method to apply validation before emitting events @@ -993,9 +1082,13 @@ const getOperators: GetOperatorsFunction = ( listener: EventsDescription[EventKey] ) => { return originalOn(eventType, async (change: Document) => { - const { status, document, updatedFieldsStatus, updatedFields } = - await isValidChange(change) - if (!status) return + const { + document, + updatedFieldsStatus, + updatedFields, + hasFullDocument, + hasWinningRole + } = await isValidChange(change) const filteredChange = { ...change, @@ -1006,6 +1099,18 @@ const getOperators: GetOperatorsFunction = ( } } + console.log('[flowerbase watch] delivered change', { + collection: collName, + operationType: change?.operationType, + eventType, + hasFullDocument, + hasWinningRole, + updatedFieldsStatus, + documentKey: + change?.documentKey?._id?.toString?.() || + change?.documentKey?._id || + null + }) listener(filteredChange) }) } @@ -1014,7 +1119,7 @@ const getOperators: GetOperatorsFunction = ( } // System mode: no filtering applied - const result = collection.watch(pipeline, options) + const result = collection.watch([...extraMatches, ...pipeline], watchOptions) emitMongoEvent('watch') return result } catch (error) { @@ -1105,15 +1210,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'insert', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'insert', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined @@ -1166,15 +1271,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'write', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'write', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined @@ -1236,15 +1341,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'delete', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'delete', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined diff --git a/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts b/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts index 5a5a504..9d2ec89 100644 --- a/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts +++ b/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts @@ -1,3 +1,5 @@ +import http from 'node:http' +import { AddressInfo } from 'node:net' import path from 'node:path' import { EJSON } from 'bson' import { FastifyInstance } from 'fastify' @@ -555,8 +557,8 @@ const ensureFilteredTriggerCollections = async () => { } await recreateCollection(FILTERED_TRIGGER_EVENTS_COLLECTION) - await recreateCollection(FILTERED_UPDATE_TRIGGER_EVENTS_COLLECTION) - await recreateCollection(FILTERED_TRIGGER_ITEMS_COLLECTION, { + await recreateCollection(FILTERED_UPDATE_TRIGGER_EVENTS_COLLECTION) + await recreateCollection(FILTERED_TRIGGER_ITEMS_COLLECTION, { changeStreamPreAndPostImages: { enabled: true } }) } @@ -620,6 +622,188 @@ const waitForFilteredUpdateTriggerEvent = async (documentId: string) => { return null } +type WatchEvent = Document & { + operationType?: string + fullDocument?: Document + documentKey?: { + _id?: unknown + } +} + +type WatchConnection = { + nextEvent: (timeoutMs?: number) => Promise + expectNoEvent: (timeoutMs?: number) => Promise + close: () => Promise +} + +const getServerPort = () => { + if (!appInstance) { + throw new Error('App instance not initialized') + } + const address = appInstance.server.address() + if (!address || typeof address === 'string') { + throw new Error('Unable to resolve app server address') + } + return (address as AddressInfo).port +} + +const openWatchConnection = async ({ + user, + collection, + filter +}: { + user: TestUser + collection: string + filter?: Document +}): Promise => { + const token = getTokenFor(user) + if (!token) { + throw new Error(`Missing token for ${user.id}`) + } + + const payload = { + name: 'watch', + service: 'mongodb-atlas', + arguments: [ + { + database: DB_NAME, + collection, + ...(filter ? { filter } : {}) + } + ] + } + const encoded = Buffer.from(EJSON.stringify(payload)).toString('base64') + const port = getServerPort() + const path = `${FUNCTION_CALL_URL}?baas_request=${encodeURIComponent(encoded)}` + + return new Promise((resolve, reject) => { + const queue: WatchEvent[] = [] + const waiters: Array<(event: WatchEvent) => void> = [] + let streamEnded = false + let settled = false + let buffer = '' + + const emitEvent = (event: WatchEvent) => { + const waiter = waiters.shift() + if (waiter) { + waiter(event) + return + } + queue.push(event) + } + + const parseSseChunk = (chunk: string) => { + buffer += chunk + while (true) { + const boundary = buffer.indexOf('\n\n') + if (boundary === -1) break + const rawEvent = buffer.slice(0, boundary) + buffer = buffer.slice(boundary + 2) + const lines = rawEvent.split('\n') + const dataLines = lines + .filter((line) => line.startsWith('data:')) + .map((line) => line.slice(5).trim()) + .filter(Boolean) + + if (!dataLines.length) continue + const dataPayload = dataLines.join('\n') + try { + const parsed = EJSON.deserialize(JSON.parse(dataPayload)) as WatchEvent + emitEvent(parsed) + } catch { + // Ignore malformed SSE payloads in tests. + } + } + } + + const req = http.request( + { + host: '127.0.0.1', + port, + method: 'GET', + path, + headers: { + Authorization: `Bearer ${token}`, + Accept: 'text/event-stream' + } + }, + (response) => { + if (response.statusCode !== 200) { + if (!settled) { + settled = true + reject(new Error(`watch failed with status ${response.statusCode}`)) + } + return + } + + response.setEncoding('utf8') + response.on('data', parseSseChunk) + response.on('end', () => { + streamEnded = true + }) + response.on('error', (error) => { + if (!settled) { + settled = true + reject(error) + } + }) + + if (!settled) { + settled = true + resolve({ + nextEvent: (timeoutMs = 5000) => + new Promise((resolveEvent, rejectEvent) => { + const immediate = queue.shift() + if (immediate) { + resolveEvent(immediate) + return + } + if (streamEnded) { + rejectEvent(new Error('watch stream ended before receiving an event')) + return + } + const timer = setTimeout(() => { + const index = waiters.indexOf(waitForEvent) + if (index >= 0) waiters.splice(index, 1) + rejectEvent(new Error(`Timed out waiting for watch event (${timeoutMs}ms)`)) + }, timeoutMs) + const waitForEvent = (event: WatchEvent) => { + clearTimeout(timer) + resolveEvent(event) + } + waiters.push(waitForEvent) + }), + expectNoEvent: async (timeoutMs = 800) => { + const immediate = queue.shift() + if (immediate) { + throw new Error(`Unexpected watch event: ${JSON.stringify(immediate)}`) + } + await new Promise((resolveDelay) => setTimeout(resolveDelay, timeoutMs)) + const late = queue.shift() + if (late) { + throw new Error(`Unexpected watch event: ${JSON.stringify(late)}`) + } + }, + close: async () => { + streamEnded = true + req.destroy() + } + }) + } + } + ) + + req.on('error', (error) => { + if (!settled) { + settled = true + reject(error) + } + }) + + req.end() + }) +} + const isReplicaSetNotInitializedError = (error: unknown) => { if (!(error instanceof Error)) { return false @@ -1373,6 +1557,94 @@ describe('MongoDB Atlas rule enforcement (e2e)', () => { const event = await waitForFilteredUpdateTriggerEvent(documentId.toString()) expect(event).toBeNull() }) + + it('keeps remaining same-user watchers alive when one tab closes', async () => { + const first = await openWatchConnection({ user: ownerUser, collection: COUNTERS_COLLECTION }) + const second = await openWatchConnection({ user: ownerUser, collection: COUNTERS_COLLECTION }) + + await first.close() + await new Promise((resolve) => setTimeout(resolve, 200)) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: ownerUser.id, + workspace: 'workspace-1', + value: 501, + visibility: { type: 'all' } + }) + + const event = await second.nextEvent() + expect(event.operationType).toBe('insert') + expect(String(event.documentKey?._id)).toBe(String(insertedId)) + + await second.close() + }) + + it('keeps other users watchers alive when one user disconnects', async () => { + const ownerWatch = await openWatchConnection({ user: ownerUser, collection: COUNTERS_COLLECTION }) + const adminWatch = await openWatchConnection({ user: adminUser, collection: COUNTERS_COLLECTION }) + + await ownerWatch.close() + await new Promise((resolve) => setTimeout(resolve, 200)) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: adminUser.id, + workspace: 'workspace-1', + value: 777, + visibility: { type: 'all' } + }) + + const event = await adminWatch.nextEvent() + expect(event.operationType).toBe('insert') + expect(String(event.documentKey?._id)).toBe(String(insertedId)) + + await adminWatch.close() + }) + + it('does not under-deliver when a less-privileged user subscribes first', async () => { + const guestWatch = await openWatchConnection({ user: guestUser, collection: COUNTERS_COLLECTION }) + const adminWatch = await openWatchConnection({ user: adminUser, collection: COUNTERS_COLLECTION }) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: adminUser.id, + workspace: 'workspace-1', + value: 888, + visibility: { type: 'private' } + }) + + const adminEvent = await adminWatch.nextEvent() + expect(String(adminEvent.documentKey?._id)).toBe(String(insertedId)) + await guestWatch.expectNoEvent() + + await guestWatch.close() + await adminWatch.close() + }) + + it('does not leak events when a privileged user subscribes first', async () => { + const adminWatch = await openWatchConnection({ user: adminUser, collection: COUNTERS_COLLECTION }) + const guestWatch = await openWatchConnection({ user: guestUser, collection: COUNTERS_COLLECTION }) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: adminUser.id, + workspace: 'workspace-1', + value: 999, + visibility: { type: 'private' } + }) + + const adminEvent = await adminWatch.nextEvent() + expect(String(adminEvent.documentKey?._id)).toBe(String(insertedId)) + await guestWatch.expectNoEvent() + + await adminWatch.close() + await guestWatch.close() + }) afterAll(async () => { await appInstance?.close() await client.close()