From d0ab6c5ec0e19f37e297b9196795128f9b69aac8 Mon Sep 17 00:00:00 2001 From: jessicalussu Date: Thu, 19 Feb 2026 10:59:29 +0100 Subject: [PATCH 1/6] fix: change stream --- .../src/features/functions/controller.ts | 43 +-- .../src/services/mongodb-atlas/index.ts | 328 ++++++++++-------- 2 files changed, 211 insertions(+), 160 deletions(-) diff --git a/packages/flowerbase/src/features/functions/controller.ts b/packages/flowerbase/src/features/functions/controller.ts index 995f618..a1f22bf 100644 --- a/packages/flowerbase/src/features/functions/controller.ts +++ b/packages/flowerbase/src/features/functions/controller.ts @@ -194,32 +194,33 @@ export const functionsController: FunctionController = async ( if (!requestKey) return - const changeStream = streams[requestKey] + try { + const stream = await services['mongodb-atlas'](app, { + user, + rules + }) + .db(database) + .collection(collection) + .watch([], { fullDocument: 'whenAvailable' }); + + + stream.on('change', (change) => { + res.raw.write(`data: ${JSON.stringify(change)}\n\n`); + }); - if (changeStream) { - changeStream.on('change', (change) => { - res.raw.write(`data: ${serializeEjson(change)}\n\n`); + stream.on('error', (error) => { + console.error('change stream error', { database, collection, requestKey, error }) + stream?.close?.(); + res.raw.end(); }); req.raw.on('close', () => { - console.log("change stream closed"); - changeStream?.close?.(); - delete streams[requestKey] + console.log('change stream closed', { database, collection, requestKey }); + stream?.close?.(); }); - return + } catch (error) { + console.error('watch initialization error', { database, collection, requestKey, error }) + res.raw.end(); } - - streams[requestKey] = await services['mongodb-atlas'](app, { - user, - rules - }) - .db(database) - .collection(collection) - .watch([], { fullDocument: 'whenAvailable' }); - - - streams[requestKey].on('change', (change) => { - res.raw.write(`data: ${serializeEjson(change)}\n\n`); - }); }) } diff --git a/packages/flowerbase/src/services/mongodb-atlas/index.ts b/packages/flowerbase/src/services/mongodb-atlas/index.ts index 16effa1..487efe9 100644 --- a/packages/flowerbase/src/services/mongodb-atlas/index.ts +++ b/packages/flowerbase/src/services/mongodb-atlas/index.ts @@ -139,7 +139,25 @@ const normalizeFindOneAndUpdateOptions = ( const buildAndQuery = (clauses: MongoFilter[]): MongoFilter => clauses.length ? { $and: clauses } : {} -const hasAtomicOperators = (data: Document) => Object.keys(data).some((key) => key.startsWith('$')) +const toWatchMatchFilter = (value: unknown): unknown => { + if (Array.isArray(value)) { + return value.map((item) => toWatchMatchFilter(item)) + } + + if (!isPlainObject(value)) return value + + return Object.entries(value).reduce>((acc, [key, current]) => { + if (key.startsWith('$')) { + acc[key] = toWatchMatchFilter(current) + return acc + } + acc[`fullDocument.${key}`] = toWatchMatchFilter(current) + return acc + }, {}) +} + +const hasAtomicOperators = (data: Document) => + Object.keys(data).some((key) => key.startsWith('$')) const normalizeUpdatePayload = (data: Document) => hasAtomicOperators(data) ? data : { $set: data } @@ -364,22 +382,22 @@ const getOperators: GetOperatorsFunction = ( return { /** - * Finds a single document in a MongoDB collection with optional role-based filtering and validation. - * - * @param {Filter} query - The MongoDB query used to match the document. - * @param {Document} [projection] - Optional projection to select returned fields. - * @param {FindOneOptions} [options] - Optional settings for the findOne operation. - * @returns {Promise} A promise resolving to the document if found and permitted, an empty object if access is denied, or `null` if not found. - * - * @description - * If `run_as_system` is enabled, the function behaves like a standard `collection.findOne(query)` with no access checks. - * Otherwise: - * - Merges the provided query with any access control filters using `getFormattedQuery`. - * - Attempts to find the document using the formatted query. - * - Determines the user's role via `getWinningRole`. - * - Validates the result using `checkValidation` to ensure read permission. - * - If validation fails, returns an empty object; otherwise returns the validated document. - */ + * Finds a single document in a MongoDB collection with optional role-based filtering and validation. + * + * @param {Filter} query - The MongoDB query used to match the document. + * @param {Document} [projection] - Optional projection to select returned fields. + * @param {FindOneOptions} [options] - Optional settings for the findOne operation. + * @returns {Promise} A promise resolving to the document if found and permitted, an empty object if access is denied, or `null` if not found. + * + * @description + * If `run_as_system` is enabled, the function behaves like a standard `collection.findOne(query)` with no access checks. + * Otherwise: + * - Merges the provided query with any access control filters using `getFormattedQuery`. + * - Attempts to find the document using the formatted query. + * - Determines the user's role via `getWinningRole`. + * - Validates the result using `checkValidation` to ensure read permission. + * - If validation fails, returns an empty object; otherwise returns the validated document. + */ findOne: async (query = {}, projectionOrOptions, options) => { try { const { projection, options: normalizedOptions } = resolveFindArgs( @@ -389,9 +407,9 @@ const getOperators: GetOperatorsFunction = ( const resolvedOptions = projection || normalizedOptions ? { - ...(normalizedOptions ?? {}), - ...(projection ? { projection } : {}) - } + ...(normalizedOptions ?? {}), + ...(projection ? { projection } : {}) + } : undefined const resolvedQuery = query ?? {} if (!run_as_system) { @@ -429,15 +447,15 @@ const getOperators: GetOperatorsFunction = ( }) const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: result, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: result, + expansions: {} + }, + user + ) : fallbackAccess(result) // Return validated document or empty object if not permitted @@ -490,15 +508,15 @@ const getOperators: GetOperatorsFunction = ( }) const { status } = winningRole ? await checkValidation( - winningRole, - { - type: 'delete', - roles, - cursor: result, - expansions: {} - }, - user - ) + winningRole, + { + type: 'delete', + roles, + cursor: result, + expansions: {} + }, + user + ) : fallbackAccess(result) if (!status) { @@ -545,15 +563,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'insert', - roles, - cursor: data, - expansions: {} - }, - user - ) + winningRole, + { + type: 'insert', + roles, + cursor: data, + expansions: {} + }, + user + ) : fallbackAccess(data) if (!status || !isEqual(data, document)) { @@ -636,15 +654,15 @@ const getOperators: GetOperatorsFunction = ( // Validate update permissions const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'write', - roles, - cursor: docToCheck, - expansions: {} - }, - user - ) + winningRole, + { + type: 'write', + roles, + cursor: docToCheck, + expansions: {} + }, + user + ) : fallbackAccess(docToCheck) // Ensure no unauthorized changes are made const areDocumentsEqual = areUpdatedFieldsAllowed(document, docToCheck, updatedPaths) @@ -751,15 +769,15 @@ const getOperators: GetOperatorsFunction = ( const readRole = getWinningRole(updateResult, user, roles) const readResult = readRole ? await checkValidation( - readRole, - { - type: 'read', - roles, - cursor: updateResult, - expansions: {} - }, - user - ) + readRole, + { + type: 'read', + roles, + cursor: updateResult, + expansions: {} + }, + user + ) : fallbackAccess(updateResult) const sanitizedDoc = readResult.status ? (readResult.document ?? updateResult) : {} @@ -806,9 +824,9 @@ const getOperators: GetOperatorsFunction = ( const resolvedOptions = projection || normalizedOptions ? { - ...(normalizedOptions ?? {}), - ...(projection ? { projection } : {}) - } + ...(normalizedOptions ?? {}), + ...(projection ? { projection } : {}) + } : undefined if (!run_as_system) { checkDenyOperation(normalizedRules, collection.collectionName, CRUD_OPERATIONS.READ) @@ -839,15 +857,15 @@ const getOperators: GetOperatorsFunction = ( }) const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined @@ -931,20 +949,26 @@ const getOperators: GetOperatorsFunction = ( watch: (pipeline = [], options) => { try { if (!run_as_system) { - checkDenyOperation(normalizedRules, collection.collectionName, CRUD_OPERATIONS.READ) + checkDenyOperation( + normalizedRules, + collection.collectionName, + CRUD_OPERATIONS.READ + ) // Apply access filters to initial change stream pipeline const formattedQuery = getFormattedQuery(filters, {}, user) + const watchFormattedQuery = formattedQuery.map( + (condition) => toWatchMatchFilter(condition) as MongoFilter + ) - const firstStep = formattedQuery.length ? { - $match: { - $and: formattedQuery - } - } : undefined + const firstStep = watchFormattedQuery.length + ? { + $match: { + $and: watchFormattedQuery + } + } + : undefined - const formattedPipeline = [ - firstStep, - ...pipeline - ].filter(Boolean) as Document[] + const formattedPipeline = [firstStep, ...pipeline].filter(Boolean) as Document[] const result = collection.watch(formattedPipeline, options) const originalOn = result.on.bind(result) @@ -953,38 +977,48 @@ const getOperators: GetOperatorsFunction = ( * Validates a change event against the user's roles. * * @param {Document} change - A change event from the ChangeStream. - * @returns {Promise<{ status: boolean, document: Document, updatedFieldsStatus: boolean, updatedFields: Document }>} + * @returns {Promise<{ status: boolean, document: Document, updatedFieldsStatus: boolean, updatedFields: Document, hasFullDocument: boolean, hasWinningRole: boolean }>} */ - const isValidChange = async ({ fullDocument, updateDescription }: Document) => { + const isValidChange = async (change: Document) => { + const { fullDocument, updateDescription } = change + const hasFullDocument = !!fullDocument const winningRole = getWinningRole(fullDocument, user, roles) - const { status, document } = winningRole + const fullDocumentValidation = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: fullDocument, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: fullDocument, + expansions: {} + }, + user + ) : fallbackAccess(fullDocument) + const { status, document } = fullDocumentValidation const { status: updatedFieldsStatus, document: updatedFields } = winningRole ? await checkValidation( - winningRole, - { - type: 'read', - roles, - cursor: updateDescription?.updatedFields, - expansions: {} - }, - user - ) + winningRole, + { + type: 'read', + roles, + cursor: updateDescription?.updatedFields, + expansions: {} + }, + user + ) : fallbackAccess(updateDescription?.updatedFields) - return { status, document, updatedFieldsStatus, updatedFields } + return { + status, + document, + updatedFieldsStatus, + updatedFields, + hasFullDocument, + hasWinningRole: !!winningRole + } } // Override the .on() method to apply validation before emitting events @@ -993,9 +1027,13 @@ const getOperators: GetOperatorsFunction = ( listener: EventsDescription[EventKey] ) => { return originalOn(eventType, async (change: Document) => { - const { status, document, updatedFieldsStatus, updatedFields } = - await isValidChange(change) - if (!status) return + const { + document, + updatedFieldsStatus, + updatedFields, + hasFullDocument, + hasWinningRole + } = await isValidChange(change) const filteredChange = { ...change, @@ -1006,6 +1044,18 @@ const getOperators: GetOperatorsFunction = ( } } + console.log('[flowerbase watch] delivered change', { + collection: collName, + operationType: change?.operationType, + eventType, + hasFullDocument, + hasWinningRole, + updatedFieldsStatus, + documentKey: + change?.documentKey?._id?.toString?.() || + change?.documentKey?._id || + null + }) listener(filteredChange) }) } @@ -1105,15 +1155,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'insert', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'insert', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined @@ -1166,15 +1216,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'write', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'write', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined @@ -1236,15 +1286,15 @@ const getOperators: GetOperatorsFunction = ( const { status, document } = winningRole ? await checkValidation( - winningRole, - { - type: 'delete', - roles, - cursor: currentDoc, - expansions: {} - }, - user - ) + winningRole, + { + type: 'delete', + roles, + cursor: currentDoc, + expansions: {} + }, + user + ) : fallbackAccess(currentDoc) return status ? document : undefined From d0d4b9188be516c177b591d13535afa191b89333 Mon Sep 17 00:00:00 2001 From: jessicalussu Date: Thu, 19 Feb 2026 12:20:06 +0100 Subject: [PATCH 2/6] fix: restore stream watch controller main --- .../src/features/functions/controller.ts | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/packages/flowerbase/src/features/functions/controller.ts b/packages/flowerbase/src/features/functions/controller.ts index a1f22bf..995f618 100644 --- a/packages/flowerbase/src/features/functions/controller.ts +++ b/packages/flowerbase/src/features/functions/controller.ts @@ -194,33 +194,32 @@ export const functionsController: FunctionController = async ( if (!requestKey) return - try { - const stream = await services['mongodb-atlas'](app, { - user, - rules - }) - .db(database) - .collection(collection) - .watch([], { fullDocument: 'whenAvailable' }); - - - stream.on('change', (change) => { - res.raw.write(`data: ${JSON.stringify(change)}\n\n`); - }); + const changeStream = streams[requestKey] - stream.on('error', (error) => { - console.error('change stream error', { database, collection, requestKey, error }) - stream?.close?.(); - res.raw.end(); + if (changeStream) { + changeStream.on('change', (change) => { + res.raw.write(`data: ${serializeEjson(change)}\n\n`); }); req.raw.on('close', () => { - console.log('change stream closed', { database, collection, requestKey }); - stream?.close?.(); + console.log("change stream closed"); + changeStream?.close?.(); + delete streams[requestKey] }); - } catch (error) { - console.error('watch initialization error', { database, collection, requestKey, error }) - res.raw.end(); + return } + + streams[requestKey] = await services['mongodb-atlas'](app, { + user, + rules + }) + .db(database) + .collection(collection) + .watch([], { fullDocument: 'whenAvailable' }); + + + streams[requestKey].on('change', (change) => { + res.raw.write(`data: ${serializeEjson(change)}\n\n`); + }); }) } From 0de14d94e5cca841e17501b2002098de59700ecb Mon Sep 17 00:00:00 2001 From: jessicalussu Date: Thu, 19 Feb 2026 12:22:03 +0100 Subject: [PATCH 3/6] fix: add stream error --- packages/flowerbase/src/features/functions/controller.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/flowerbase/src/features/functions/controller.ts b/packages/flowerbase/src/features/functions/controller.ts index 995f618..8fac5da 100644 --- a/packages/flowerbase/src/features/functions/controller.ts +++ b/packages/flowerbase/src/features/functions/controller.ts @@ -200,6 +200,12 @@ export const functionsController: FunctionController = async ( changeStream.on('change', (change) => { res.raw.write(`data: ${serializeEjson(change)}\n\n`); }); + changeStream.on('error', (error) => { + res.raw.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) + changeStream?.close?.() + delete streams[requestKey] + res.raw.end() + }) req.raw.on('close', () => { console.log("change stream closed"); From 88faf1bd5ddb8f481d4de12f59eca0597a7d7be0 Mon Sep 17 00:00:00 2001 From: jessicalussu Date: Thu, 19 Feb 2026 14:34:53 +0100 Subject: [PATCH 4/6] feat: add extraMatches watch --- .../src/services/mongodb-atlas/index.ts | 63 +++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/packages/flowerbase/src/services/mongodb-atlas/index.ts b/packages/flowerbase/src/services/mongodb-atlas/index.ts index 487efe9..e68a42c 100644 --- a/packages/flowerbase/src/services/mongodb-atlas/index.ts +++ b/packages/flowerbase/src/services/mongodb-atlas/index.ts @@ -4,6 +4,7 @@ import isEqual from 'lodash/isEqual' import set from 'lodash/set' import unset from 'lodash/unset' import { + ChangeStreamOptions, ClientSession, ClientSessionOptions, Collection, @@ -156,6 +157,54 @@ const toWatchMatchFilter = (value: unknown): unknown => { }, {}) } +type RealmCompatibleWatchOptions = Document & { + filter?: MongoFilter + ids?: unknown[] +} + +const resolveWatchArgs = ( + pipelineOrOptions?: Document[] | RealmCompatibleWatchOptions, + options?: RealmCompatibleWatchOptions +) => { + const inputPipeline = Array.isArray(pipelineOrOptions) ? pipelineOrOptions : [] + const rawOptions = (Array.isArray(pipelineOrOptions) ? options : pipelineOrOptions) ?? {} + + if (!isPlainObject(rawOptions)) { + return { + pipeline: inputPipeline, + options: options as ChangeStreamOptions | undefined, + extraMatches: [] as Document[] + } + } + + const { + filter: watchFilter, + ids, + ...watchOptions + } = rawOptions as RealmCompatibleWatchOptions + + const extraMatches: Document[] = [] + if (typeof watchFilter !== 'undefined') { + extraMatches.push({ $match: toWatchMatchFilter(watchFilter) as Document }) + } + if (Array.isArray(ids)) { + extraMatches.push({ + $match: { + $or: [ + { 'documentKey._id': { $in: ids } }, + { 'fullDocument._id': { $in: ids } } + ] + } + }) + } + + return { + pipeline: inputPipeline, + options: watchOptions as ChangeStreamOptions, + extraMatches + } +} + const hasAtomicOperators = (data: Document) => Object.keys(data).some((key) => key.startsWith('$')) @@ -946,8 +995,14 @@ const getOperators: GetOperatorsFunction = ( * * This allows fine-grained control over what change events a user can observe, based on roles and filters. */ - watch: (pipeline = [], options) => { + watch: (pipelineOrOptions = [], options) => { try { + const { + pipeline, + options: watchOptions, + extraMatches + } = resolveWatchArgs(pipelineOrOptions as Document[] | RealmCompatibleWatchOptions, options as RealmCompatibleWatchOptions) + if (!run_as_system) { checkDenyOperation( normalizedRules, @@ -968,9 +1023,9 @@ const getOperators: GetOperatorsFunction = ( } : undefined - const formattedPipeline = [firstStep, ...pipeline].filter(Boolean) as Document[] + const formattedPipeline = [firstStep, ...extraMatches, ...pipeline].filter(Boolean) as Document[] - const result = collection.watch(formattedPipeline, options) + const result = collection.watch(formattedPipeline, watchOptions) const originalOn = result.on.bind(result) /** @@ -1064,7 +1119,7 @@ const getOperators: GetOperatorsFunction = ( } // System mode: no filtering applied - const result = collection.watch(pipeline, options) + const result = collection.watch([...extraMatches, ...pipeline], watchOptions) emitMongoEvent('watch') return result } catch (error) { From c8a132a73332aa2355a8780bf2dac0a0bfff5456 Mon Sep 17 00:00:00 2001 From: Andrea Zucca Date: Thu, 19 Feb 2026 14:35:52 +0100 Subject: [PATCH 5/6] fix: close stream --- .../src/features/functions/controller.ts | 73 ++++++++++--------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/packages/flowerbase/src/features/functions/controller.ts b/packages/flowerbase/src/features/functions/controller.ts index 8fac5da..d41fb16 100644 --- a/packages/flowerbase/src/features/functions/controller.ts +++ b/packages/flowerbase/src/features/functions/controller.ts @@ -1,6 +1,6 @@ import { EJSON, ObjectId } from 'bson' import type { FastifyRequest } from 'fastify' -import { ChangeStream, Document } from 'mongodb'; +import { Document } from 'mongodb' import { services } from '../../services' import { StateManager } from '../../state' import { GenerateContext } from '../../utils/context' @@ -63,8 +63,6 @@ export const functionsController: FunctionController = async ( ) => { app.addHook('preHandler', app.jwtAuthentication) - const streams = {} as Record> - app.post<{ Body: FunctionCallDto }>('/call', { schema: { tags: ['Functions'] @@ -190,42 +188,51 @@ export const functionsController: FunctionController = async ( res.raw.writeHead(200, headers) res.raw.flushHeaders(); - const requestKey = baas_request || stitch_request - - if (!requestKey) return - - const changeStream = streams[requestKey] - - if (changeStream) { - changeStream.on('change', (change) => { - res.raw.write(`data: ${serializeEjson(change)}\n\n`); - }); - changeStream.on('error', (error) => { - res.raw.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) - changeStream?.close?.() - delete streams[requestKey] - res.raw.end() - }) - - req.raw.on('close', () => { - console.log("change stream closed"); - changeStream?.close?.(); - delete streams[requestKey] - }); - return - } - - streams[requestKey] = await services['mongodb-atlas'](app, { + const changeStream = await services['mongodb-atlas'](app, { user, rules }) .db(database) .collection(collection) - .watch([], { fullDocument: 'whenAvailable' }); + .watch([], { fullDocument: 'whenAvailable' }) + + let cleanedUp = false + const safeWrite = (chunk: string) => { + if (res.raw.writableEnded || res.raw.destroyed) return + res.raw.write(chunk) + } + const cleanup = async () => { + if (cleanedUp) return + cleanedUp = true + changeStream.off('change', onChange) + changeStream.off('error', onError) + req.raw.off('close', onRequestClose) + try { + await changeStream.close() + } catch { + // Ignore close errors on already-closed streams. + } + } + + const onChange = (change: Document) => { + safeWrite(`data: ${serializeEjson(change)}\n\n`) + } + + const onError = (error: unknown) => { + safeWrite(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) + if (!res.raw.writableEnded && !res.raw.destroyed) { + res.raw.end() + } + void cleanup() + } + + const onRequestClose = () => { + void cleanup() + } - streams[requestKey].on('change', (change) => { - res.raw.write(`data: ${serializeEjson(change)}\n\n`); - }); + changeStream.on('change', onChange) + changeStream.on('error', onError) + req.raw.on('close', onRequestClose) }) } From d2da92b345b911e2dab0b81781f287a4c2a0f949 Mon Sep 17 00:00:00 2001 From: Andrea Zucca Date: Thu, 19 Feb 2026 14:56:58 +0100 Subject: [PATCH 6/6] fix: add multiplex stream watch --- .../src/features/functions/controller.ts | 187 +++++++++--- ...atlas.rules.e2e.rules-and-triggers.test.ts | 276 +++++++++++++++++- 2 files changed, 420 insertions(+), 43 deletions(-) diff --git a/packages/flowerbase/src/features/functions/controller.ts b/packages/flowerbase/src/features/functions/controller.ts index d41fb16..f8a7532 100644 --- a/packages/flowerbase/src/features/functions/controller.ts +++ b/packages/flowerbase/src/features/functions/controller.ts @@ -1,8 +1,8 @@ +import type { ServerResponse } from 'http' import { EJSON, ObjectId } from 'bson' import type { FastifyRequest } from 'fastify' -import { Document } from 'mongodb' +import type { Document } from 'mongodb' import { services } from '../../services' -import { StateManager } from '../../state' import { GenerateContext } from '../../utils/context' import { Base64Function, FunctionCallBase64Dto, FunctionCallDto } from './dtos' import { FunctionController } from './interface' @@ -51,6 +51,44 @@ const isReturnedError = (value: unknown): value is { message: string; name: stri const serializeEjson = (value: unknown) => JSON.stringify(EJSON.serialize(value, { relaxed: false })) +const isRecord = (value: unknown): value is Record => + !!value && typeof value === 'object' && !Array.isArray(value) + +type WatchSubscriber = { + id: string + user: Record + response: ServerResponse + extraFilter?: Document +} + +type SharedWatchStream = { + database: string + collection: string + stream: { + on: (event: 'change' | 'error', listener: (payload: any) => void) => void + off: (event: 'change' | 'error', listener: (payload: any) => void) => void + close: () => Promise | void + } + subscribers: Map +} + +const sharedWatchStreams = new Map() +let watchSubscriberCounter = 0 + +const parseWatchFilter = (args: unknown): Document | undefined => { + if (!isRecord(args)) return undefined + const candidate = + (isRecord(args.filter) ? args.filter : undefined) ?? + (isRecord(args.query) ? args.query : undefined) + return candidate ? (candidate as Document) : undefined +} + +const isReadableDocumentResult = (value: unknown) => + !!value && + typeof value === 'object' && + !Array.isArray(value) && + Object.keys(value as Record).length > 0 + /** * > Creates a pre handler for every query * @param app -> the fastify instance @@ -168,13 +206,12 @@ export const functionsController: FunctionController = async ( } const { baas_request, stitch_request } = query - const config: Base64Function = JSON.parse( + const decodedConfig = JSON.parse( Buffer.from(baas_request || stitch_request || '', 'base64').toString('utf8') ) + const config = EJSON.deserialize(decodedConfig) as Base64Function - const [{ database, collection }] = config.arguments - const app = StateManager.select('app') - const services = StateManager.select('services') + const [{ database, collection, ...watchArgs }] = config.arguments const headers = { 'Content-Type': 'text/event-stream', @@ -188,51 +225,119 @@ export const functionsController: FunctionController = async ( res.raw.writeHead(200, headers) res.raw.flushHeaders(); - const changeStream = await services['mongodb-atlas'](app, { - user, - rules - }) - .db(database) - .collection(collection) - .watch([], { fullDocument: 'whenAvailable' }) - - let cleanedUp = false - const safeWrite = (chunk: string) => { - if (res.raw.writableEnded || res.raw.destroyed) return - res.raw.write(chunk) + const streamKey = `${database}::${collection}` + const subscriberId = `${Date.now()}-${watchSubscriberCounter++}` + const extraFilter = parseWatchFilter(watchArgs) + const mongoClient = app.mongo.client as unknown as { + db: (name: string) => { collection: (name: string) => { watch: (...args: any[]) => any } } } - const cleanup = async () => { - if (cleanedUp) return - cleanedUp = true - changeStream.off('change', onChange) - changeStream.off('error', onError) - req.raw.off('close', onRequestClose) - try { - await changeStream.close() - } catch { - // Ignore close errors on already-closed streams. + let hub = sharedWatchStreams.get(streamKey) + if (!hub) { + const stream = mongoClient.db(database).collection(collection).watch([], { + fullDocument: 'whenAvailable' + }) + hub = { + database, + collection, + stream, + subscribers: new Map() } + sharedWatchStreams.set(streamKey, hub) } - const onChange = (change: Document) => { - safeWrite(`data: ${serializeEjson(change)}\n\n`) - } + const ensureHubListeners = (currentHub: SharedWatchStream) => { + if ((currentHub as SharedWatchStream & { listenersBound?: boolean }).listenersBound) { + return + } - const onError = (error: unknown) => { - safeWrite(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) - if (!res.raw.writableEnded && !res.raw.destroyed) { - res.raw.end() + const closeHub = async () => { + currentHub.stream.off('change', onHubChange) + currentHub.stream.off('error', onHubError) + sharedWatchStreams.delete(streamKey) + try { + await currentHub.stream.close() + } catch { + // Ignore stream close errors. + } + } + + const onHubChange = async (change: Document) => { + const subscribers = Array.from(currentHub.subscribers.values()) + await Promise.all(subscribers.map(async (subscriber) => { + const subscriberRes = subscriber.response + if (subscriberRes.writableEnded || subscriberRes.destroyed) { + currentHub.subscribers.delete(subscriber.id) + return + } + + const docId = + (change as { documentKey?: { _id?: unknown } })?.documentKey?._id ?? + (change as { fullDocument?: { _id?: unknown } })?.fullDocument?._id + if (typeof docId === 'undefined') return + + const readQuery = subscriber.extraFilter + ? ({ $and: [subscriber.extraFilter, { _id: docId }] } as Document) + : ({ _id: docId } as Document) + + try { + const readableDoc = await services['mongodb-atlas'](app, { + user: subscriber.user, + rules + }) + .db(currentHub.database) + .collection(currentHub.collection) + .findOne(readQuery) + + if (!isReadableDocumentResult(readableDoc)) return + subscriberRes.write(`data: ${serializeEjson(change)}\n\n`) + } catch (error) { + subscriberRes.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) + subscriberRes.end() + currentHub.subscribers.delete(subscriber.id) + } + })) + + if (!currentHub.subscribers.size) { + await closeHub() + } } - void cleanup() + + const onHubError = async (error: unknown) => { + for (const subscriber of currentHub.subscribers.values()) { + const subscriberRes = subscriber.response + if (!subscriberRes.writableEnded && !subscriberRes.destroyed) { + subscriberRes.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`) + subscriberRes.end() + } + } + currentHub.subscribers.clear() + await closeHub() + } + + currentHub.stream.on('change', onHubChange) + currentHub.stream.on('error', onHubError) + ; (currentHub as SharedWatchStream & { listenersBound?: boolean }).listenersBound = true } - const onRequestClose = () => { - void cleanup() + ensureHubListeners(hub) + + const subscriber: WatchSubscriber = { + id: subscriberId, + user, + response: res.raw, + extraFilter } + hub.subscribers.set(subscriberId, subscriber) - changeStream.on('change', onChange) - changeStream.on('error', onError) - req.raw.on('close', onRequestClose) + req.raw.on('close', () => { + const currentHub = sharedWatchStreams.get(streamKey) + if (!currentHub) return + currentHub.subscribers.delete(subscriberId) + if (!currentHub.subscribers.size) { + void currentHub.stream.close() + sharedWatchStreams.delete(streamKey) + } + }) }) } diff --git a/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts b/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts index 5a5a504..9d2ec89 100644 --- a/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts +++ b/tests/e2e/mongodb-atlas.rules.e2e.rules-and-triggers.test.ts @@ -1,3 +1,5 @@ +import http from 'node:http' +import { AddressInfo } from 'node:net' import path from 'node:path' import { EJSON } from 'bson' import { FastifyInstance } from 'fastify' @@ -555,8 +557,8 @@ const ensureFilteredTriggerCollections = async () => { } await recreateCollection(FILTERED_TRIGGER_EVENTS_COLLECTION) - await recreateCollection(FILTERED_UPDATE_TRIGGER_EVENTS_COLLECTION) - await recreateCollection(FILTERED_TRIGGER_ITEMS_COLLECTION, { + await recreateCollection(FILTERED_UPDATE_TRIGGER_EVENTS_COLLECTION) + await recreateCollection(FILTERED_TRIGGER_ITEMS_COLLECTION, { changeStreamPreAndPostImages: { enabled: true } }) } @@ -620,6 +622,188 @@ const waitForFilteredUpdateTriggerEvent = async (documentId: string) => { return null } +type WatchEvent = Document & { + operationType?: string + fullDocument?: Document + documentKey?: { + _id?: unknown + } +} + +type WatchConnection = { + nextEvent: (timeoutMs?: number) => Promise + expectNoEvent: (timeoutMs?: number) => Promise + close: () => Promise +} + +const getServerPort = () => { + if (!appInstance) { + throw new Error('App instance not initialized') + } + const address = appInstance.server.address() + if (!address || typeof address === 'string') { + throw new Error('Unable to resolve app server address') + } + return (address as AddressInfo).port +} + +const openWatchConnection = async ({ + user, + collection, + filter +}: { + user: TestUser + collection: string + filter?: Document +}): Promise => { + const token = getTokenFor(user) + if (!token) { + throw new Error(`Missing token for ${user.id}`) + } + + const payload = { + name: 'watch', + service: 'mongodb-atlas', + arguments: [ + { + database: DB_NAME, + collection, + ...(filter ? { filter } : {}) + } + ] + } + const encoded = Buffer.from(EJSON.stringify(payload)).toString('base64') + const port = getServerPort() + const path = `${FUNCTION_CALL_URL}?baas_request=${encodeURIComponent(encoded)}` + + return new Promise((resolve, reject) => { + const queue: WatchEvent[] = [] + const waiters: Array<(event: WatchEvent) => void> = [] + let streamEnded = false + let settled = false + let buffer = '' + + const emitEvent = (event: WatchEvent) => { + const waiter = waiters.shift() + if (waiter) { + waiter(event) + return + } + queue.push(event) + } + + const parseSseChunk = (chunk: string) => { + buffer += chunk + while (true) { + const boundary = buffer.indexOf('\n\n') + if (boundary === -1) break + const rawEvent = buffer.slice(0, boundary) + buffer = buffer.slice(boundary + 2) + const lines = rawEvent.split('\n') + const dataLines = lines + .filter((line) => line.startsWith('data:')) + .map((line) => line.slice(5).trim()) + .filter(Boolean) + + if (!dataLines.length) continue + const dataPayload = dataLines.join('\n') + try { + const parsed = EJSON.deserialize(JSON.parse(dataPayload)) as WatchEvent + emitEvent(parsed) + } catch { + // Ignore malformed SSE payloads in tests. + } + } + } + + const req = http.request( + { + host: '127.0.0.1', + port, + method: 'GET', + path, + headers: { + Authorization: `Bearer ${token}`, + Accept: 'text/event-stream' + } + }, + (response) => { + if (response.statusCode !== 200) { + if (!settled) { + settled = true + reject(new Error(`watch failed with status ${response.statusCode}`)) + } + return + } + + response.setEncoding('utf8') + response.on('data', parseSseChunk) + response.on('end', () => { + streamEnded = true + }) + response.on('error', (error) => { + if (!settled) { + settled = true + reject(error) + } + }) + + if (!settled) { + settled = true + resolve({ + nextEvent: (timeoutMs = 5000) => + new Promise((resolveEvent, rejectEvent) => { + const immediate = queue.shift() + if (immediate) { + resolveEvent(immediate) + return + } + if (streamEnded) { + rejectEvent(new Error('watch stream ended before receiving an event')) + return + } + const timer = setTimeout(() => { + const index = waiters.indexOf(waitForEvent) + if (index >= 0) waiters.splice(index, 1) + rejectEvent(new Error(`Timed out waiting for watch event (${timeoutMs}ms)`)) + }, timeoutMs) + const waitForEvent = (event: WatchEvent) => { + clearTimeout(timer) + resolveEvent(event) + } + waiters.push(waitForEvent) + }), + expectNoEvent: async (timeoutMs = 800) => { + const immediate = queue.shift() + if (immediate) { + throw new Error(`Unexpected watch event: ${JSON.stringify(immediate)}`) + } + await new Promise((resolveDelay) => setTimeout(resolveDelay, timeoutMs)) + const late = queue.shift() + if (late) { + throw new Error(`Unexpected watch event: ${JSON.stringify(late)}`) + } + }, + close: async () => { + streamEnded = true + req.destroy() + } + }) + } + } + ) + + req.on('error', (error) => { + if (!settled) { + settled = true + reject(error) + } + }) + + req.end() + }) +} + const isReplicaSetNotInitializedError = (error: unknown) => { if (!(error instanceof Error)) { return false @@ -1373,6 +1557,94 @@ describe('MongoDB Atlas rule enforcement (e2e)', () => { const event = await waitForFilteredUpdateTriggerEvent(documentId.toString()) expect(event).toBeNull() }) + + it('keeps remaining same-user watchers alive when one tab closes', async () => { + const first = await openWatchConnection({ user: ownerUser, collection: COUNTERS_COLLECTION }) + const second = await openWatchConnection({ user: ownerUser, collection: COUNTERS_COLLECTION }) + + await first.close() + await new Promise((resolve) => setTimeout(resolve, 200)) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: ownerUser.id, + workspace: 'workspace-1', + value: 501, + visibility: { type: 'all' } + }) + + const event = await second.nextEvent() + expect(event.operationType).toBe('insert') + expect(String(event.documentKey?._id)).toBe(String(insertedId)) + + await second.close() + }) + + it('keeps other users watchers alive when one user disconnects', async () => { + const ownerWatch = await openWatchConnection({ user: ownerUser, collection: COUNTERS_COLLECTION }) + const adminWatch = await openWatchConnection({ user: adminUser, collection: COUNTERS_COLLECTION }) + + await ownerWatch.close() + await new Promise((resolve) => setTimeout(resolve, 200)) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: adminUser.id, + workspace: 'workspace-1', + value: 777, + visibility: { type: 'all' } + }) + + const event = await adminWatch.nextEvent() + expect(event.operationType).toBe('insert') + expect(String(event.documentKey?._id)).toBe(String(insertedId)) + + await adminWatch.close() + }) + + it('does not under-deliver when a less-privileged user subscribes first', async () => { + const guestWatch = await openWatchConnection({ user: guestUser, collection: COUNTERS_COLLECTION }) + const adminWatch = await openWatchConnection({ user: adminUser, collection: COUNTERS_COLLECTION }) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: adminUser.id, + workspace: 'workspace-1', + value: 888, + visibility: { type: 'private' } + }) + + const adminEvent = await adminWatch.nextEvent() + expect(String(adminEvent.documentKey?._id)).toBe(String(insertedId)) + await guestWatch.expectNoEvent() + + await guestWatch.close() + await adminWatch.close() + }) + + it('does not leak events when a privileged user subscribes first', async () => { + const adminWatch = await openWatchConnection({ user: adminUser, collection: COUNTERS_COLLECTION }) + const guestWatch = await openWatchConnection({ user: guestUser, collection: COUNTERS_COLLECTION }) + + const insertedId = new ObjectId() + await client.db(DB_NAME).collection(COUNTERS_COLLECTION).insertOne({ + _id: insertedId, + ownerId: adminUser.id, + workspace: 'workspace-1', + value: 999, + visibility: { type: 'private' } + }) + + const adminEvent = await adminWatch.nextEvent() + expect(String(adminEvent.documentKey?._id)).toBe(String(insertedId)) + await guestWatch.expectNoEvent() + + await adminWatch.close() + await guestWatch.close() + }) afterAll(async () => { await appInstance?.close() await client.close()