diff --git a/docs-src/docs/releases/17.0.0.md b/docs-src/docs/releases/17.0.0.md index 591e9dac2f0..19b9ab8e283 100644 --- a/docs-src/docs/releases/17.0.0.md +++ b/docs-src/docs/releases/17.0.0.md @@ -68,6 +68,7 @@ To improve vibe-coding when working with RxDB directly we: - **ADD** [Google Drive Replication](../replication-google-drive.md) plugin to replicate client data to a clients Google Drive folder without any server. - **ADD** [Microsoft OneDrive Replication](../replication-microsoft-onedrive.md) plugin to replicate client data to a clients OneDrive folder without any server. - **ADD** [react-hooks plugin](../react.md). +- **ADD** [Flutter Plugin](../articles/flutter-database.md) improvements: `findOne()`, `bulkInsert()`, `bulkRemove()`, `upsert()`, `count()`, `patch()`, `incrementalPatch()`, `incrementalRemove()`, `RxDatabase.close()`, and multi-database support. ### 🔁 Reactivity & APIs diff --git a/src/plugins/flutter/dart/lib/src/rxdb_base.dart b/src/plugins/flutter/dart/lib/src/rxdb_base.dart index 83d0bcca39a..6e8be1f2d63 100644 --- a/src/plugins/flutter/dart/lib/src/rxdb_base.dart +++ b/src/plugins/flutter/dart/lib/src/rxdb_base.dart @@ -125,12 +125,14 @@ Future getRxDatabase(String jsFilePath, String databaseName) async { class RxChangeEvent { String operation; + dynamic documentData; dynamic previousDocumentData; String documentId; String? collectionName; bool isLocal; RxChangeEvent( this.operation, + this.documentData, this.previousDocumentData, this.documentId, this.collectionName, @@ -140,6 +142,7 @@ class RxChangeEvent { static RxChangeEvent fromJSON(dynamic json) { RxChangeEvent ret = RxChangeEvent( json['operation'], + json['documentData'], json['previousDocumentData'], json['documentId'], json['collectionName'], @@ -159,6 +162,8 @@ class RxChangeEventBulk { List> events; dynamic checkpoint; String context; + int startTime; + int endTime; RxChangeEventBulk( this.collectionName, @@ -168,7 +173,9 @@ class RxChangeEventBulk { this.internal, this.events, this.checkpoint, - this.context); + this.context, + this.startTime, + this.endTime); static RxChangeEventBulk fromJSON(dynamic json) { List eventsJson = json['events']; @@ -185,7 +192,9 @@ class RxChangeEventBulk { json['internal'], events, json['checkpoint'], - json['context']); + json['context'], + json['startTime'] ?? 0, + json['endTime'] ?? 0); return ret; } } @@ -196,8 +205,12 @@ class RxDatabase { List collectionMeta; Map> collections = {}; ReplaySubject> eventBulks$; + bool closed = false; RxDatabase(this.name, this.engine, this.eventBulks$, this.collectionMeta); + String get _jsDbRef => + "process.databases[" + jsonEncode(name) + "].db"; + RxCollection getCollection(String name) { var meta = collectionMeta.firstWhere((meta) => meta['name'] == name); String collectionName = meta['name']; @@ -214,6 +227,14 @@ class RxDatabase { return collections[name] as RxCollection; } } + + Future close() async { + if (closed) return; + await engine.evaluate('process.close(' + jsonEncode(name) + ');'); + closed = true; + await eventBulks$.close(); + engine.close(); + } } class RxCollection { @@ -229,20 +250,70 @@ class RxCollection { docCache = DocCache(this); } + String get _jsCollRef => + database._jsDbRef + "[" + jsonEncode(name) + "]"; + RxQuery find(dynamic query) { var rxQuery = RxQuery(query, this); return rxQuery; } + RxQuerySingle findOne([dynamic queryOrPrimaryKey]) { + var rxQuery = RxQuerySingle(queryOrPrimaryKey, this); + return rxQuery; + } + Future> insert(data) async { - dynamic result = await database.engine.evaluate("process.db['" + - name + - "'].insert(" + + dynamic result = await database.engine.evaluate(_jsCollRef + + ".insert(" + + jsonEncode(data) + + ").then(d => d.toJSON(true));"); + var document = docCache.getByDocData(result); + return document; + } + + Future>> bulkInsert(List docs) async { + List result = await database.engine.evaluate(_jsCollRef + + ".bulkInsert(" + + jsonEncode(docs) + + ").then(r => r.success.map(d => d.toJSON(true)));"); + return result.map((docData) { + return docCache.getByDocData(docData); + }).toList(); + } + + Future>> bulkRemove(List ids) async { + List result = await database.engine.evaluate(_jsCollRef + + ".bulkRemove(" + + jsonEncode(ids) + + ").then(r => r.success.map(d => d.toJSON(true)));"); + return result.map((docData) { + return docCache.getByDocData(docData); + }).toList(); + } + + Future> upsert(dynamic data) async { + dynamic result = await database.engine.evaluate(_jsCollRef + + ".upsert(" + jsonEncode(data) + ").then(d => d.toJSON(true));"); var document = docCache.getByDocData(result); return document; } + + Future count([dynamic query]) async { + String queryStr = query != null ? jsonEncode(query) : '{}'; + dynamic result = await database.engine.evaluate(_jsCollRef + + ".count(" + + queryStr + + ").exec();"); + return (result as num).toInt(); + } + + Future remove() async { + await database.engine.evaluate(_jsCollRef + + ".remove();"); + } } class RxDocument { @@ -250,13 +321,71 @@ class RxDocument { dynamic data; RxDocument(this.collection, this.data); + String get primary => data[collection.primaryKey].toString(); + + bool get deleted => data['_deleted'] == true; + + Map toJSON() { + return Map.from(data); + } + + dynamic get(String fieldName) { + return data[fieldName]; + } + + /// Sets the value of a field in the local document data. + /// This does not persist the change to the database. + /// Use [patch] or [incrementalPatch] to persist changes. + void set(String fieldName, dynamic value) { + data[fieldName] = value; + } + + Future> patch(Map patchData) async { + String id = primary; + dynamic result = await collection.database.engine.evaluate( + collection._jsCollRef + + ".findOne(" + + jsonEncode(id) + + ").exec().then(d => d.patch(" + + jsonEncode(patchData) + + ")).then(d => d.toJSON(true));"); + data = result; + collection.docCache.updateDocData(id, data); + return this; + } + + Future> incrementalPatch( + Map patchData) async { + String id = primary; + dynamic result = await collection.database.engine.evaluate( + collection._jsCollRef + + ".findOne(" + + jsonEncode(id) + + ").exec().then(d => d.incrementalPatch(" + + jsonEncode(patchData) + + ")).then(d => d.toJSON(true));"); + data = result; + collection.docCache.updateDocData(id, data); + return this; + } + Future> remove() async { - String id = data[collection.primaryKey]; - await collection.database.engine.evaluate("process.db['" + - collection.name + - "'].findOne('" + - id + - "').exec().then(d => d.remove());"); + String id = primary; + await collection.database.engine.evaluate( + collection._jsCollRef + + ".findOne(" + + jsonEncode(id) + + ").exec().then(d => d.remove());"); + return this; + } + + Future> incrementalRemove() async { + String id = primary; + await collection.database.engine.evaluate( + collection._jsCollRef + + ".findOne(" + + jsonEncode(id) + + ").exec().then(d => d.incrementalRemove());"); return this; } } @@ -271,9 +400,8 @@ class RxQuery { RxQuery(this.query, this.collection); Future>> exec() async { List result = await collection.database.engine.evaluate( - "process.db['" + - collection.name + - "'].find(" + + collection._jsCollRef + + ".find(" + jsonEncode(query) + ").exec().then(docs => docs.map(d => d.toJSON(true)));"); @@ -300,6 +428,48 @@ class RxQuery { } } +class RxQuerySingle { + dynamic queryOrPrimaryKey; + RxCollection collection; + + Stream?> results$ = ReplaySubject(); + bool subscribed = false; + + RxQuerySingle(this.queryOrPrimaryKey, this.collection); + + Future?> exec() async { + String queryArg; + if (queryOrPrimaryKey == null) { + queryArg = ''; + } else { + queryArg = jsonEncode(queryOrPrimaryKey); + } + dynamic result = await collection.database.engine.evaluate( + collection._jsCollRef + + ".findOne(" + + queryArg + + ").exec().then(d => d ? d.toJSON(true) : null);"); + if (result == null) { + return null; + } + return collection.docCache.getByDocData(result); + } + + Stream?> $() { + if (subscribed == false) { + subscribed = true; + results$ = MergeStream([ + collection.eventBulks$, + Stream.fromIterable([1]) + ]).asyncMap((event) async { + var newResult = await exec(); + return newResult; + }); + } + return results$; + } +} + class DocCache { RxCollection collection; Map> map = {}; @@ -310,6 +480,7 @@ class DocCache { String id = data[collection.primaryKey]; var docInCache = map[id]; if (docInCache != null) { + docInCache.data = data; return docInCache; } else { var doc = RxDocument(collection, data); @@ -317,6 +488,13 @@ class DocCache { return doc; } } + + void updateDocData(String id, dynamic data) { + var docInCache = map[id]; + if (docInCache != null) { + docInCache.data = data; + } + } } abstract class RxDocTypeParent { diff --git a/src/plugins/flutter/index.ts b/src/plugins/flutter/index.ts index 0ac5427b3a6..9b30fd8028d 100644 --- a/src/plugins/flutter/index.ts +++ b/src/plugins/flutter/index.ts @@ -8,14 +8,17 @@ export type CreateRxDatabaseFunctionType = (databaseName: string) => Promise { const db = await createDB(databaseName); - db.eventBulks$.subscribe((eventBulk: RxChangeEventBulk) => { + const eventSub = db.eventBulks$.subscribe((eventBulk: RxChangeEventBulk) => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore sendRxDBEvent(JSON.stringify(eventBulk)); }); - (process as any).db = db; + (process as any).databases[databaseName] = { db, eventSub }; const collections: { name: string; primaryKey: string; }[] = []; Object.entries(db.collections).forEach(([collectionName, collection]) => { collections.push({ @@ -28,6 +31,14 @@ export function setFlutterRxDatabaseConnector( collections }; }; + (process as any).close = async (databaseName: string) => { + const entry = (process as any).databases[databaseName]; + if (entry) { + entry.eventSub.unsubscribe(); + await entry.db.close(); + delete (process as any).databases[databaseName]; + } + }; } /** diff --git a/test/unit.test.ts b/test/unit.test.ts index d5ab0bc7f4d..7d6ad6ed24a 100644 --- a/test/unit.test.ts +++ b/test/unit.test.ts @@ -69,5 +69,6 @@ import './unit/leader-election.test.ts'; import './unit/backup.test.ts'; import './unit/import-export.test.ts'; import './unit/database-lifecycle.ts'; +import './unit/flutter.test.ts'; import './unit/plugin.test.ts'; import './unit/last.test.ts'; diff --git a/test/unit/flutter.test.ts b/test/unit/flutter.test.ts new file mode 100644 index 00000000000..15de61d7802 --- /dev/null +++ b/test/unit/flutter.test.ts @@ -0,0 +1,229 @@ +import assert from 'assert'; +import { + createRxDatabase, + randomToken, +} from '../../plugins/core/index.mjs'; +import { + setFlutterRxDatabaseConnector, +} from '../../plugins/flutter/index.mjs'; +import config from './config.ts'; +import { schemas } from '../../plugins/test-utils/index.mjs'; +import { isNode } from '../../plugins/test-utils/index.mjs'; + +describe('flutter.test.ts', () => { + if (!isNode) { + return; + } + + function cleanupProcessFields() { + delete (process as any).databases; + delete (process as any).init; + delete (process as any).close; + } + + async function createTestDb(databaseName: string) { + const rxdb = await createRxDatabase({ + name: databaseName, + storage: config.storage.getStorage(), + }); + await rxdb.addCollections({ + humans: { + schema: schemas.human, + }, + }); + return rxdb; + } + + describe('setFlutterRxDatabaseConnector()', () => { + it('should set process.init and process.close and process.databases', () => { + cleanupProcessFields(); + setFlutterRxDatabaseConnector((_name: string) => { + return Promise.resolve({} as any); + }); + assert.ok(typeof (process as any).init === 'function'); + assert.ok(typeof (process as any).close === 'function'); + assert.ok(typeof (process as any).databases === 'object'); + cleanupProcessFields(); + }); + + it('should not reset existing databases when called again', () => { + cleanupProcessFields(); + setFlutterRxDatabaseConnector((_name: string) => { + return Promise.resolve({} as any); + }); + (process as any).databases['existing'] = { db: 'test', eventSub: { unsubscribe() { } } }; + + setFlutterRxDatabaseConnector((_name: string) => { + return Promise.resolve({} as any); + }); + assert.ok((process as any).databases['existing']); + cleanupProcessFields(); + }); + }); + + describe('process.init()', () => { + it('should initialize a database and return its config', async () => { + cleanupProcessFields(); + const sentEvents: string[] = []; + (global as any).sendRxDBEvent = (json: string) => sentEvents.push(json); + + setFlutterRxDatabaseConnector(createTestDb); + + const dbName = randomToken(10); + const result = await (process as any).init(dbName); + + assert.strictEqual(result.databaseName, dbName); + assert.ok(Array.isArray(result.collections)); + assert.strictEqual(result.collections.length, 1); + assert.strictEqual(result.collections[0].name, 'humans'); + assert.strictEqual(result.collections[0].primaryKey, 'passportId'); + + // database should be stored in the map + assert.ok((process as any).databases[dbName]); + assert.ok((process as any).databases[dbName].db); + assert.ok((process as any).databases[dbName].eventSub); + + // cleanup + await (process as any).close(dbName); + delete (global as any).sendRxDBEvent; + cleanupProcessFields(); + }); + + it('should forward eventBulks to sendRxDBEvent', async () => { + cleanupProcessFields(); + const sentEvents: string[] = []; + (global as any).sendRxDBEvent = (json: string) => sentEvents.push(json); + + setFlutterRxDatabaseConnector(createTestDb); + + const dbName = randomToken(10); + await (process as any).init(dbName); + + // insert a document to trigger an event + const storedDb = (process as any).databases[dbName].db; + await storedDb.humans.insert({ + passportId: 'flutter-test-1', + firstName: 'Bob', + lastName: 'Kelso', + age: 56, + }); + + // events should have been forwarded + assert.ok(sentEvents.length > 0); + const parsed = JSON.parse(sentEvents[0]); + assert.ok(parsed.events); + + // cleanup + await (process as any).close(dbName); + delete (global as any).sendRxDBEvent; + cleanupProcessFields(); + }); + + it('should support multiple databases at the same time', async () => { + cleanupProcessFields(); + (global as any).sendRxDBEvent = () => { }; + + setFlutterRxDatabaseConnector(createTestDb); + + const dbName1 = randomToken(10); + const dbName2 = randomToken(10); + + const result1 = await (process as any).init(dbName1); + const result2 = await (process as any).init(dbName2); + + assert.strictEqual(result1.databaseName, dbName1); + assert.strictEqual(result2.databaseName, dbName2); + + // both databases should exist + assert.ok((process as any).databases[dbName1]); + assert.ok((process as any).databases[dbName2]); + + // insert into each database independently + const db1 = (process as any).databases[dbName1].db; + const db2 = (process as any).databases[dbName2].db; + + await db1.humans.insert({ + passportId: 'db1-doc', + firstName: 'Alice', + lastName: 'One', + age: 30, + }); + await db2.humans.insert({ + passportId: 'db2-doc', + firstName: 'Bob', + lastName: 'Two', + age: 40, + }); + + const docs1 = await db1.humans.find().exec(); + const docs2 = await db2.humans.find().exec(); + assert.strictEqual(docs1.length, 1); + assert.strictEqual(docs2.length, 1); + assert.strictEqual(docs1[0].passportId, 'db1-doc'); + assert.strictEqual(docs2[0].passportId, 'db2-doc'); + + // cleanup + await (process as any).close(dbName1); + await (process as any).close(dbName2); + delete (global as any).sendRxDBEvent; + cleanupProcessFields(); + }); + }); + + describe('process.close()', () => { + it('should close a database and remove it from the map', async () => { + cleanupProcessFields(); + (global as any).sendRxDBEvent = () => { }; + + setFlutterRxDatabaseConnector(createTestDb); + + const dbName = randomToken(10); + await (process as any).init(dbName); + assert.ok((process as any).databases[dbName]); + + await (process as any).close(dbName); + assert.strictEqual((process as any).databases[dbName], undefined); + + delete (global as any).sendRxDBEvent; + cleanupProcessFields(); + }); + + it('should not throw when closing a non-existent database', async () => { + cleanupProcessFields(); + setFlutterRxDatabaseConnector((_name: string) => { + return Promise.resolve({} as any); + }); + // should not throw + await (process as any).close('does-not-exist'); + cleanupProcessFields(); + }); + + it('should close one database without affecting the other', async () => { + cleanupProcessFields(); + (global as any).sendRxDBEvent = () => { }; + + setFlutterRxDatabaseConnector(createTestDb); + + const dbName1 = randomToken(10); + const dbName2 = randomToken(10); + + await (process as any).init(dbName1); + await (process as any).init(dbName2); + + // close only dbName1 + await (process as any).close(dbName1); + assert.strictEqual((process as any).databases[dbName1], undefined); + + // dbName2 should still be active + assert.ok((process as any).databases[dbName2]); + const remainingDb = (process as any).databases[dbName2].db; + const docs = await remainingDb.humans.find().exec(); + assert.ok(Array.isArray(docs)); + + // cleanup + await (process as any).close(dbName2); + delete (global as any).sendRxDBEvent; + cleanupProcessFields(); + }); + }); +});