Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 149 additions & 31 deletions packages/flowerbase/src/features/functions/controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { ServerResponse } from 'http'
import { EJSON, ObjectId } from 'bson'
import type { FastifyRequest } from 'fastify'
import { ChangeStream, Document } from 'mongodb';
import type { Document } from 'mongodb'
import { services } from '../../services'
import { StateManager } from '../../state'
import { GenerateContext } from '../../utils/context'
import { Base64Function, FunctionCallBase64Dto, FunctionCallDto } from './dtos'
import { FunctionController } from './interface'
Expand Down Expand Up @@ -51,6 +51,44 @@ const isReturnedError = (value: unknown): value is { message: string; name: stri
const serializeEjson = (value: unknown) =>
JSON.stringify(EJSON.serialize(value, { relaxed: false }))

const isRecord = (value: unknown): value is Record<string, unknown> =>
!!value && typeof value === 'object' && !Array.isArray(value)

type WatchSubscriber = {
id: string
user: Record<string, any>
response: ServerResponse
extraFilter?: Document
}

type SharedWatchStream = {
database: string
collection: string
stream: {
on: (event: 'change' | 'error', listener: (payload: any) => void) => void
off: (event: 'change' | 'error', listener: (payload: any) => void) => void
close: () => Promise<void> | void
}
subscribers: Map<string, WatchSubscriber>
}

const sharedWatchStreams = new Map<string, SharedWatchStream>()
let watchSubscriberCounter = 0

const parseWatchFilter = (args: unknown): Document | undefined => {
if (!isRecord(args)) return undefined
const candidate =
(isRecord(args.filter) ? args.filter : undefined) ??
(isRecord(args.query) ? args.query : undefined)
return candidate ? (candidate as Document) : undefined
}

const isReadableDocumentResult = (value: unknown) =>
!!value &&
typeof value === 'object' &&
!Array.isArray(value) &&
Object.keys(value as Record<string, unknown>).length > 0

/**
* > Creates a pre handler for every query
* @param app -> the fastify instance
Expand All @@ -63,8 +101,6 @@ export const functionsController: FunctionController = async (
) => {
app.addHook('preHandler', app.jwtAuthentication)

const streams = {} as Record<string, ChangeStream<Document, Document>>

app.post<{ Body: FunctionCallDto }>('/call', {
schema: {
tags: ['Functions']
Expand Down Expand Up @@ -170,13 +206,12 @@ export const functionsController: FunctionController = async (
}
const { baas_request, stitch_request } = query

const config: Base64Function = JSON.parse(
const decodedConfig = JSON.parse(
Buffer.from(baas_request || stitch_request || '', 'base64').toString('utf8')
)
const config = EJSON.deserialize(decodedConfig) as Base64Function

const [{ database, collection }] = config.arguments
const app = StateManager.select('app')
const services = StateManager.select('services')
const [{ database, collection, ...watchArgs }] = config.arguments

const headers = {
'Content-Type': 'text/event-stream',
Expand All @@ -190,36 +225,119 @@ export const functionsController: FunctionController = async (
res.raw.writeHead(200, headers)
res.raw.flushHeaders();

const requestKey = baas_request || stitch_request
const streamKey = `${database}::${collection}`
const subscriberId = `${Date.now()}-${watchSubscriberCounter++}`
const extraFilter = parseWatchFilter(watchArgs)
const mongoClient = app.mongo.client as unknown as {
db: (name: string) => { collection: (name: string) => { watch: (...args: any[]) => any } }
}

if (!requestKey) return
let hub = sharedWatchStreams.get(streamKey)
if (!hub) {
const stream = mongoClient.db(database).collection(collection).watch([], {
fullDocument: 'whenAvailable'
})
hub = {
database,
collection,
stream,
subscribers: new Map<string, WatchSubscriber>()
}
sharedWatchStreams.set(streamKey, hub)
}

const changeStream = streams[requestKey]
const ensureHubListeners = (currentHub: SharedWatchStream) => {
if ((currentHub as SharedWatchStream & { listenersBound?: boolean }).listenersBound) {
return
}

if (changeStream) {
changeStream.on('change', (change) => {
res.raw.write(`data: ${serializeEjson(change)}\n\n`);
});
const closeHub = async () => {
currentHub.stream.off('change', onHubChange)
currentHub.stream.off('error', onHubError)
sharedWatchStreams.delete(streamKey)
try {
await currentHub.stream.close()
} catch {
// Ignore stream close errors.
}
}

const onHubChange = async (change: Document) => {
const subscribers = Array.from(currentHub.subscribers.values())
await Promise.all(subscribers.map(async (subscriber) => {
const subscriberRes = subscriber.response
if (subscriberRes.writableEnded || subscriberRes.destroyed) {
currentHub.subscribers.delete(subscriber.id)
return
}

const docId =
(change as { documentKey?: { _id?: unknown } })?.documentKey?._id ??
(change as { fullDocument?: { _id?: unknown } })?.fullDocument?._id
if (typeof docId === 'undefined') return

const readQuery = subscriber.extraFilter
? ({ $and: [subscriber.extraFilter, { _id: docId }] } as Document)
: ({ _id: docId } as Document)

try {
const readableDoc = await services['mongodb-atlas'](app, {
user: subscriber.user,
rules
})
.db(currentHub.database)
.collection(currentHub.collection)
.findOne(readQuery)

if (!isReadableDocumentResult(readableDoc)) return
subscriberRes.write(`data: ${serializeEjson(change)}\n\n`)
} catch (error) {
subscriberRes.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`)
subscriberRes.end()
currentHub.subscribers.delete(subscriber.id)
}
}))

if (!currentHub.subscribers.size) {
await closeHub()
}
}

req.raw.on('close', () => {
console.log("change stream closed");
changeStream?.close?.();
delete streams[requestKey]
});
return
const onHubError = async (error: unknown) => {
for (const subscriber of currentHub.subscribers.values()) {
const subscriberRes = subscriber.response
if (!subscriberRes.writableEnded && !subscriberRes.destroyed) {
subscriberRes.write(`event: error\ndata: ${formatFunctionExecutionError(error)}\n\n`)
subscriberRes.end()
}
}
currentHub.subscribers.clear()
await closeHub()
}

currentHub.stream.on('change', onHubChange)
currentHub.stream.on('error', onHubError)
; (currentHub as SharedWatchStream & { listenersBound?: boolean }).listenersBound = true
}

streams[requestKey] = await services['mongodb-atlas'](app, {
user,
rules
})
.db(database)
.collection(collection)
.watch([], { fullDocument: 'whenAvailable' });
ensureHubListeners(hub)

const subscriber: WatchSubscriber = {
id: subscriberId,
user,
response: res.raw,
extraFilter
}
hub.subscribers.set(subscriberId, subscriber)

streams[requestKey].on('change', (change) => {
res.raw.write(`data: ${serializeEjson(change)}\n\n`);
});
req.raw.on('close', () => {
const currentHub = sharedWatchStreams.get(streamKey)
if (!currentHub) return
currentHub.subscribers.delete(subscriberId)
if (!currentHub.subscribers.size) {
void currentHub.stream.close()
sharedWatchStreams.delete(streamKey)
}
})
})
}
Loading
Loading