Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 1 addition & 61 deletions src/plugins/replication-supabase/helper.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,3 @@
import { SupabaseClient } from '@supabase/supabase-js';
import { RxDocumentData, RxJsonSchema, WithDeleted } from '../../types';

export const POSTGRES_INSERT_CONFLICT_CODE = "23505";
export const DEFAULT_MODIFIED_FIELD = '_modified';
export const DEFAULT_DELETED_FIELD = '_deleted';


export function addDocEqualityToQuery<RxDocType>(
jsonSchema: RxJsonSchema<RxDocumentData<RxDocType>>,
deletedField: string,
modifiedField: string,
doc: WithDeleted<RxDocType>,
query: any
) {
const ignoreKeys = new Set([
modifiedField,
deletedField,
'_meta',
'_attachments',
'_rev'
]);

for (const key of Object.keys(doc)) {
if (
ignoreKeys.has(key)
) {
continue;
}

const v = (doc as any)[key];
const type = typeof v;

if (type === "string" || type === "number") {
query = query.eq(key, v);
} else if (type === "boolean" || v === null) {
query = query.is(key, v);
} else if (type === 'undefined') {
query = query.is(key, null);
} else {
throw new Error(`unknown how to handle type: ${type}`)
}
}

const schemaProps: Record<string, any> = jsonSchema.properties;
for (const key of Object.keys(schemaProps)) {
if (
ignoreKeys.has(key) ||
Object.hasOwn(doc, key)
) {
continue;
}
query = query.is(key, null);
}

query = query.eq(deletedField, doc._deleted);
if (schemaProps[modifiedField]) {
query = query.eq(modifiedField, (doc as any)[modifiedField]);
}


return query;
}
export const DEFAULT_DELETED_FIELD = '_deleted';
43 changes: 22 additions & 21 deletions src/plugins/replication-supabase/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import { Subject } from 'rxjs';
import {
DEFAULT_DELETED_FIELD,
DEFAULT_MODIFIED_FIELD,
POSTGRES_INSERT_CONFLICT_CODE,
addDocEqualityToQuery
POSTGRES_INSERT_CONFLICT_CODE
} from './helper.ts';
import { ensureNotFalsy, flatClone, lastOfArray } from '../utils/index.ts';

Expand Down Expand Up @@ -185,7 +184,8 @@ export function replicateSupabase<RxDocType>(
assumedMasterState: WithDeleted<RxDocType>
): Promise<WithDeleted<RxDocType> | undefined> {
ensureNotFalsy(assumedMasterState);
const id = (doc as any)[primaryPath];
const primaryKey: string = primaryPath;
const id = (doc as any)[primaryKey];
const toRow: Record<string, any> = flatClone(doc);
if (doc._deleted) {
toRow[deletedField] = !!doc._deleted;
Expand All @@ -197,29 +197,30 @@ export function replicateSupabase<RxDocType>(
// modified field will be set server-side
delete toRow[modifiedField];

let query = options.client
.from(options.tableName)
.update(toRow);

query = addDocEqualityToQuery(
collection.schema.jsonSchema,
deletedField,
modifiedField,
assumedMasterState,
query
);
// fetch the current document state from the server
const docOnServer: WithDeleted<RxDocType> = await fetchById(id);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By fetching here first it is not inside of one transaction anymore. What happens if someone else changes the document on the server after we fetch docOnServer and before sending the new one to the server?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, this implementation seems problematic. If someone modifies the data in the database while another user is fetching it, the last user to write will overwrite the previous changes without any conflict handling.

Another way to address the issue of overly long URLs would be to add a hash property to the object. This "hash" could be included in the URL alongside the identifier, allowing the update to be performed in a single transaction while also helping detect potential conflicts.

What do you think about this approach?


const { data, error } = await query.select();
if (error) {
throw error;
if (!docOnServer) {
// the document does not exist on the server -> treat as conflict
return docOnServer;
}

const isSame = (Object.keys(assumedMasterState) as (keyof WithDeleted<RxDocType>)[])
.every((prop) => docOnServer[prop] === assumedMasterState[prop])

// check whether the server state matches the assumed master state
if (isSame) {
// no conflict -> proceed with the update
await options.client
.from(options.tableName)
.update(toRow)
.eq(primaryKey, id);

if (data && data.length > 0) {
return;
} else {
// no match -> conflict
return await fetchById(id);
}

// conflict detected -> return the current server state
return docOnServer;
}

const conflicts: WithDeleted<RxDocType>[] = [];
Expand Down
58 changes: 56 additions & 2 deletions test/replication-supabase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {
ensureNotFalsy,
addRxPlugin,
RxCollection,
WithDeleted
WithDeleted,
RxJsonSchema
} from '../plugins/core/index.mjs';
import {
lastOfArray
Expand All @@ -17,7 +18,8 @@ import {
ensureReplicationHasNoErrors,
SimpleHumanDocumentType,
PrimaryHumanDocType,
runReplicationBaseTestSuite
runReplicationBaseTestSuite,
HumanDocumentType
} from '../plugins/test-utils/index.mjs';
import { RxDBDevModePlugin } from '../plugins/dev-mode/index.mjs';
import config from './unit/config.ts';
Expand Down Expand Up @@ -480,6 +482,58 @@ describe('replication-supabase.test.ts', function () {

await collection.database.close();
});

it('#7986 does not add all document fields as equality conditions in the PATCH request URL', async () => {
await cleanUpServer();

const customHumanSchemaWithoutPropertySizeLimits: RxJsonSchema<HumanDocumentType> = {
title: 'human schema without property size limits',
version: 0,
keyCompression: false,
type: 'object',
primaryKey: 'passportId',
properties: {
passportId: {
type: 'string',
maxLength: 100
},
firstName: {
type: 'string'
},
lastName: {
type: 'string'
},
age: {
type: 'integer',
}
},
required: ['passportId']
};

const collection = await humansCollection.createBySchema(customHumanSchemaWithoutPropertySizeLimits);

const replicationState = replicateSupabase<PrimaryHumanDocType>({
tableName,
client: supabase,
replicationIdentifier: randomToken(10),
collection,
pull: { batchSize },
push: { batchSize }
});
ensureReplicationHasNoErrors(replicationState);

const commonId = randomToken(10);
const doc = await collection.insert(schemaObjects.humanData(commonId, undefined, randomToken(40000)));
await replicationState.awaitInSync();

await doc.patch({ firstName: '0' });

const serverState = await getServerState();
assert.strictEqual(serverState.length, 1);

await collection.database.close();
await cleanUpServer();
});
});

describe('last', () => {
Expand Down
Loading