From 361a1bc391b80714a46fac08420547efc37e654e Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Fri, 3 Mar 2023 14:26:13 +0100 Subject: [PATCH 1/7] Supported experimentalAcceptedIds as safer alternative to rejectedIds When design is set that each resource must be confirmed as synced it is safer to look at it from whitelist perspective than blacklist to prevent possible miss-sync. --- src/sync/impl/markAsSynced.js | 19 +++++++++++++++---- src/sync/impl/synchronize.js | 8 +++++++- src/sync/index.d.ts | 7 ++++++- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/sync/impl/markAsSynced.js b/src/sync/impl/markAsSynced.js index 8a3b6c13e..c555cc6b8 100644 --- a/src/sync/impl/markAsSynced.js +++ b/src/sync/impl/markAsSynced.js @@ -5,11 +5,13 @@ import { logError } from '../../utils/common' import type { Database, Model, TableName } from '../..' import { prepareMarkAsSynced } from './helpers' -import type { SyncLocalChanges, SyncRejectedIds } from '../index' +import type { SyncLocalChanges, SyncRejectedIds, SyncAcceptedIds } from '../index' const recordsToMarkAsSynced = ( { changes, affectedRecords }: SyncLocalChanges, allRejectedIds: SyncRejectedIds, + allAcceptedIds: ?SyncAcceptedIds, + allowOnlyAcceptedIds: boolean, ): Model[] => { const syncedRecords = [] @@ -17,6 +19,7 @@ const recordsToMarkAsSynced = ( const { created, updated } = changes[(table: any)] const raws = created.concat(updated) const rejectedIds = new Set(allRejectedIds[(table: any)]) + const acceptedIds = new Set(allAcceptedIds[(table: any)] || []) raws.forEach((raw) => { const { id } = raw @@ -27,7 +30,8 @@ const recordsToMarkAsSynced = ( ) return } - if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id)) { + const isAccepted = (!allAcceptedIds && allowOnlyAcceptedIds) || acceptedIds.has(id); + if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id) && isAccepted) { syncedRecords.push(record) } }) @@ -39,10 +43,13 @@ const destroyDeletedRecords = ( db: Database, { changes }: SyncLocalChanges, allRejectedIds: SyncRejectedIds, + allAcceptedIds: ?SyncAcceptedIds, + allowOnlyAcceptedIds: boolean, ): Promise[] => Object.keys(changes).map((_tableName) => { const tableName: TableName = (_tableName: any) const rejectedIds = new Set(allRejectedIds[tableName]) + const acceptedIds = new Set(allAcceptedIds[(table: any)] || []) const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id)) return deleted.length ? db.adapter.destroyDeletedRecords(tableName, deleted) : Promise.resolve() }) @@ -51,14 +58,18 @@ export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, rejectedIds?: ?SyncRejectedIds, + allAcceptedIds: ?SyncAcceptedIds, + allowOnlyAcceptedIds: boolean, ): Promise { return db.write(async () => { // update and destroy records concurrently await Promise.all([ db.batch( - recordsToMarkAsSynced(syncedLocalChanges, rejectedIds || {}).map(prepareMarkAsSynced), + recordsToMarkAsSynced(syncedLocalChanges, rejectedIds || {}, allAcceptedIds, + allowOnlyAcceptedIds).map(prepareMarkAsSynced), ), - ...destroyDeletedRecords(db, syncedLocalChanges, rejectedIds || {}), + ...destroyDeletedRecords(db, syncedLocalChanges, rejectedIds || {}, allAcceptedIds, + allowOnlyAcceptedIds), ]) }, 'sync-markLocalChangesAsSynced') } diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index eef1f6e81..cd9499b25 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -26,6 +26,7 @@ export default async function synchronize({ conflictResolver, _unsafeBatchPerCollection, unsafeTurbo, + pushShouldConfirmOnlyAccepted, // TODO add }: SyncArgs): Promise { const resetCount = database._resetCount log && (log.startedAt = new Date()) @@ -134,9 +135,14 @@ export default async function synchronize({ (await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {} log && (log.phase = 'pushed') log && (log.rejectedIds = pushResult.experimentalRejectedIds) + // TODO log.acceptedIds = pushResult.experimentalAcceptedIds + // or log.rejectedIds = localChanges - pushResult.experimentalAcceptedIds but can be more + // expensive + // or log.acceptedIds and just count with localChanges - pushResult.experimentalAcceptedIds ensureSameDatabase(database, resetCount) - await markLocalChangesAsSynced(database, localChanges, pushResult.experimentalRejectedIds) + await markLocalChangesAsSynced(database, localChanges, pushResult.experimentalRejectedIds, + pushResult.experimentalAcceptedIds, pushShouldConfirmOnlyAccepted) log && (log.phase = 'marked local changes as synced') } } else { diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index b4449d31f..16d39d343 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -29,9 +29,14 @@ export type SyncPullResult = export type SyncRejectedIds = { [tableName: TableName]: RecordId[] } +export type SyncAcceptedIds = { [tableName: TableName]: RecordId[] } + export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> -export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }> +export type SyncPushResult = $Exact<{ + experimentalRejectedIds?: SyncRejectedIds, + experimentalAcceptedIds?: SyncAcceptedIds, +}> type SyncConflict = $Exact<{ local: DirtyRaw; remote: DirtyRaw; resolved: DirtyRaw }> export type SyncLog = { From 3599f99baa86b42b15435953234b70750395af3a Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:24:38 +0100 Subject: [PATCH 2/7] rebased, fixed, added tests --- src/sync/impl/__tests__/markAsSynced.test.js | 28 +++++++- .../synchronize-partialRejections.test.js | 67 +++++++++++++++++++ src/sync/impl/helpers.js | 37 +++++++++- src/sync/impl/markAsSynced.d.ts | 6 +- src/sync/impl/markAsSynced.js | 29 ++++---- src/sync/impl/synchronize.d.ts | 1 + src/sync/impl/synchronize.js | 16 ++--- src/sync/index.d.ts | 15 +++-- src/sync/index.js | 9 ++- 9 files changed, 172 insertions(+), 36 deletions(-) diff --git a/src/sync/impl/__tests__/markAsSynced.test.js b/src/sync/impl/__tests__/markAsSynced.test.js index db2787641..08e18ae1f 100644 --- a/src/sync/impl/__tests__/markAsSynced.test.js +++ b/src/sync/impl/__tests__/markAsSynced.test.js @@ -131,7 +131,7 @@ describe('markLocalChangesAsSynced', () => { }) // test that second push will mark all as synced - await markLocalChangesAsSynced(database, localChanges2) + await markLocalChangesAsSynced(database, localChanges2, false) expect(destroyDeletedRecordsSpy).toHaveBeenCalledTimes(2) expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) @@ -146,7 +146,7 @@ describe('markLocalChangesAsSynced', () => { const localChanges = await fetchLocalChanges(database) // mark as synced - await markLocalChangesAsSynced(database, localChanges, { + await markLocalChangesAsSynced(database, localChanges, false, { mock_projects: ['pCreated1', 'pUpdated'], mock_comments: ['cDeleted'], }) @@ -161,6 +161,30 @@ describe('markLocalChangesAsSynced', () => { ) expect(await allDeletedRecords([comments])).toEqual(['cDeleted']) }) + it(`marks only acceptedIds as synced`, async () => { + const { database, comments } = makeDatabase() + + const { pCreated1, pUpdated } = await makeLocalChanges(database) + const localChanges = await fetchLocalChanges(database) + + // mark as synced + await markLocalChangesAsSynced(database, localChanges, true, {}, { + // probably better solution exists (we essentially list all but expected in verify) + mock_projects: ['pCreated2', 'pDeleted'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tCreated', 'tUpdated', 'tDeleted'], + }) + + // verify + const localChanges2 = await fetchLocalChanges(database) + expect(localChanges2.changes).toEqual( + makeChangeSet({ + mock_projects: { created: [pCreated1._raw], updated: [pUpdated._raw] }, + mock_comments: { deleted: ['cDeleted'] }, + }), + ) + expect(await allDeletedRecords([comments])).toEqual(['cDeleted']) + }) it(`can mark records as synced when ids are per-table not globally unique`, async () => { const { database, projects, tasks, comments } = makeDatabase() diff --git a/src/sync/impl/__tests__/synchronize-partialRejections.test.js b/src/sync/impl/__tests__/synchronize-partialRejections.test.js index c45f85a80..74700a78d 100644 --- a/src/sync/impl/__tests__/synchronize-partialRejections.test.js +++ b/src/sync/impl/__tests__/synchronize-partialRejections.test.js @@ -62,4 +62,71 @@ describe('synchronize - partial push rejections', () => { }), ) }) + it(`can partially accept a push`, async () => { + const { database } = makeDatabase() + + const { tCreated, tUpdated } = await makeLocalChanges(database) + + const acceptedIds = Object.freeze({ + // probably better solution exists (we essentially list all but expected in expect below) + mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tDeleted'], + }) + const rejectedIds = Object.freeze({ + mock_tasks: ['tCreated', 'tUpdated'], + mock_comments: ['cDeleted'], + }) + const log = {} + await synchronize({ + database, + pullChanges: jest.fn(emptyPull()), + pushChanges: jest.fn(() => ({ experimentalAcceptedIds: acceptedIds })), + pushShouldConfirmOnlyAccepted: true, + log, + }) + expect((await fetchLocalChanges(database)).changes).toEqual( + makeChangeSet({ + mock_tasks: { created: [tCreated._raw], updated: [tUpdated._raw] }, + mock_comments: { deleted: ['cDeleted'] }, + }), + ) + expect(log.rejectedIds).toStrictEqual(rejectedIds) + }) + it(`can partially accept a push and make changes during push`, async () => { + const { database, comments } = makeDatabase() + + const { pCreated1, tUpdated } = await makeLocalChanges(database) + const pCreated1Raw = { ...pCreated1._raw } + let newComment + await synchronize({ + database, + pullChanges: jest.fn(emptyPull()), + pushChanges: jest.fn(async () => { + await database.write(async () => { + await pCreated1.update((p) => { + p.name = 'updated!' + }) + newComment = await comments.create((c) => { + c.body = 'bazinga' + }) + }) + return { + experimentalAcceptedIds: { + mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tCreated', 'tDeleted'], + }, + } + }), + pushShouldConfirmOnlyAccepted: true, + }) + expect((await fetchLocalChanges(database)).changes).toEqual( + makeChangeSet({ + mock_projects: { created: [{ ...pCreated1Raw, _changed: 'name', name: 'updated!' }] }, + mock_tasks: { updated: [tUpdated._raw] }, + mock_comments: { created: [newComment._raw], deleted: ['cDeleted'] }, + }), + ) + }) }) diff --git a/src/sync/impl/helpers.js b/src/sync/impl/helpers.js index df9d32d19..ecfc06b2b 100644 --- a/src/sync/impl/helpers.js +++ b/src/sync/impl/helpers.js @@ -4,7 +4,7 @@ import { values } from '../../utils/fp' import areRecordsEqual from '../../utils/fp/areRecordsEqual' import { invariant } from '../../utils/common' -import type { Model, Collection, Database } from '../..' +import type { Model, Collection, Database, TableName, SyncIds, RecordId } from '../..' import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../../RawRecord' import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index' @@ -148,3 +148,38 @@ export const changeSetCount: (SyncDatabaseChangeSet) => number = (changeset) => ({ created, updated, deleted }) => created.length + updated.length + deleted.length, ), ) + +const extractChangeSetIds: (SyncDatabaseChangeSet) => { [TableName]: RecordId[] } = (changeset) => + Object.keys(changeset).reduce((acc, key) => { + const { created, updated, deleted } = changeset[key] + acc[key] = [ + ...created.map(it => it.id), + ...updated.map(it => it.id), + ...deleted, + ] + return acc + }, {}) + +// Returns all rejected ids and is used when accepted ids are used +export const findRejectedIds: + (SyncIds, SyncIds, SyncDatabaseChangeSet) => SyncIds = + (experimentalRejectedIds, experimentalAcceptedIds, changeset) => { + const localIds = extractChangeSetIds(changeset) + + const acceptedIdsSets = Object.keys(changeset).reduce((acc, key) => { + acc[key] = new Set(experimentalAcceptedIds[key]) + return acc + }, {}) + + return Object.keys(changeset).reduce((acc, key) => { + const rejectedIds = [ + ...(experimentalRejectedIds ? experimentalRejectedIds[key] || [] : []), + ...(localIds[key] || []), + ].filter(it => !acceptedIdsSets[key].has(it)) + + if (rejectedIds.length > 0) { + acc[key] = rejectedIds + } + return acc + }, {}) + } \ No newline at end of file diff --git a/src/sync/impl/markAsSynced.d.ts b/src/sync/impl/markAsSynced.d.ts index 242ec0356..e41621e0d 100644 --- a/src/sync/impl/markAsSynced.d.ts +++ b/src/sync/impl/markAsSynced.d.ts @@ -1,9 +1,11 @@ import type { Database, Model, TableName } from '../..' -import type { SyncLocalChanges, SyncRejectedIds } from '../index' +import type { SyncLocalChanges, SyncIds } from '../index' export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, - rejectedIds?: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + rejectedIds?: SyncIds, + allAcceptedIds?: SyncIds, ): Promise diff --git a/src/sync/impl/markAsSynced.js b/src/sync/impl/markAsSynced.js index c555cc6b8..6ad5a0fc7 100644 --- a/src/sync/impl/markAsSynced.js +++ b/src/sync/impl/markAsSynced.js @@ -5,13 +5,13 @@ import { logError } from '../../utils/common' import type { Database, Model, TableName } from '../..' import { prepareMarkAsSynced } from './helpers' -import type { SyncLocalChanges, SyncRejectedIds, SyncAcceptedIds } from '../index' +import type { SyncLocalChanges, SyncIds } from '../index' const recordsToMarkAsSynced = ( { changes, affectedRecords }: SyncLocalChanges, - allRejectedIds: SyncRejectedIds, - allAcceptedIds: ?SyncAcceptedIds, allowOnlyAcceptedIds: boolean, + allRejectedIds: SyncIds, + allAcceptedIds: SyncIds, ): Model[] => { const syncedRecords = [] @@ -30,7 +30,7 @@ const recordsToMarkAsSynced = ( ) return } - const isAccepted = (!allAcceptedIds && allowOnlyAcceptedIds) || acceptedIds.has(id); + const isAccepted = !allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id); if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id) && isAccepted) { syncedRecords.push(record) } @@ -42,34 +42,35 @@ const recordsToMarkAsSynced = ( const destroyDeletedRecords = ( db: Database, { changes }: SyncLocalChanges, - allRejectedIds: SyncRejectedIds, - allAcceptedIds: ?SyncAcceptedIds, allowOnlyAcceptedIds: boolean, + allRejectedIds: SyncIds, + allAcceptedIds?: ?SyncIds, ): Promise[] => Object.keys(changes).map((_tableName) => { const tableName: TableName = (_tableName: any) const rejectedIds = new Set(allRejectedIds[tableName]) - const acceptedIds = new Set(allAcceptedIds[(table: any)] || []) - const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id)) + const acceptedIds = new Set(allAcceptedIds[tableName] || []) + const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id) && + (!allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id))) return deleted.length ? db.adapter.destroyDeletedRecords(tableName, deleted) : Promise.resolve() }) export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, - rejectedIds?: ?SyncRejectedIds, - allAcceptedIds: ?SyncAcceptedIds, allowOnlyAcceptedIds: boolean, + rejectedIds?: ?SyncIds, + allAcceptedIds?: ?SyncIds, ): Promise { return db.write(async () => { // update and destroy records concurrently await Promise.all([ db.batch( - recordsToMarkAsSynced(syncedLocalChanges, rejectedIds || {}, allAcceptedIds, - allowOnlyAcceptedIds).map(prepareMarkAsSynced), + recordsToMarkAsSynced(syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, + allAcceptedIds || {}).map(prepareMarkAsSynced), ), - ...destroyDeletedRecords(db, syncedLocalChanges, rejectedIds || {}, allAcceptedIds, - allowOnlyAcceptedIds), + ...destroyDeletedRecords(db, syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, + allAcceptedIds || {}), ]) }, 'sync-markLocalChangesAsSynced') } diff --git a/src/sync/impl/synchronize.d.ts b/src/sync/impl/synchronize.d.ts index f19c96957..ea9557b5c 100644 --- a/src/sync/impl/synchronize.d.ts +++ b/src/sync/impl/synchronize.d.ts @@ -9,6 +9,7 @@ export default function synchronize({ migrationsEnabledAtVersion, log, conflictResolver, + pushShouldConfirmOnlyAccepted, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index cd9499b25..9854b3f5d 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -11,7 +11,7 @@ import { setLastPulledSchemaVersion, getMigrationInfo, } from './index' -import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers' +import { ensureSameDatabase, isChangeSetEmpty, changeSetCount, findRejectedIds } from './helpers' import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index' export default async function synchronize({ @@ -24,9 +24,9 @@ export default async function synchronize({ migrationsEnabledAtVersion, log, conflictResolver, + pushShouldConfirmOnlyAccepted, _unsafeBatchPerCollection, unsafeTurbo, - pushShouldConfirmOnlyAccepted, // TODO add }: SyncArgs): Promise { const resetCount = database._resetCount log && (log.startedAt = new Date()) @@ -135,14 +135,14 @@ export default async function synchronize({ (await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {} log && (log.phase = 'pushed') log && (log.rejectedIds = pushResult.experimentalRejectedIds) - // TODO log.acceptedIds = pushResult.experimentalAcceptedIds - // or log.rejectedIds = localChanges - pushResult.experimentalAcceptedIds but can be more - // expensive - // or log.acceptedIds and just count with localChanges - pushResult.experimentalAcceptedIds + if (log && pushShouldConfirmOnlyAccepted) { + log.rejectedIds = findRejectedIds(pushResult.experimentalRejectedIds, + pushResult.experimentalAcceptedIds, localChanges.changes) + } ensureSameDatabase(database, resetCount) - await markLocalChangesAsSynced(database, localChanges, pushResult.experimentalRejectedIds, - pushResult.experimentalAcceptedIds, pushShouldConfirmOnlyAccepted) + await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted, + pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds) log && (log.phase = 'marked local changes as synced') } } else { diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index 16d39d343..04371ee3f 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -27,15 +27,13 @@ export type SyncPullResult = | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> -export type SyncRejectedIds = { [tableName: TableName]: RecordId[] } - -export type SyncAcceptedIds = { [tableName: TableName]: RecordId[] } +export type SyncIds = { [tableName: TableName]: RecordId[] } export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> export type SyncPushResult = $Exact<{ - experimentalRejectedIds?: SyncRejectedIds, - experimentalAcceptedIds?: SyncAcceptedIds, + experimentalRejectedIds?: SyncIds, + experimentalAcceptedIds?: SyncIds, }> type SyncConflict = $Exact<{ local: DirtyRaw; remote: DirtyRaw; resolved: DirtyRaw }> @@ -46,7 +44,7 @@ export type SyncLog = { migration?: MigrationSyncChanges; newLastPulledAt?: number; resolvedConflicts?: SyncConflict[]; - rejectedIds?: SyncRejectedIds; + rejectedIds?: SyncIds; finishedAt?: Date; remoteChangeCount?: number; localChangeCount?: number; @@ -75,6 +73,11 @@ export type SyncArgs = $Exact<{ // If you don't want to change default behavior for a given record, return `resolved` as is // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. conflictResolver?: SyncConflictResolver; + // experimental customization that will cause to only set records as synced if we return id. + // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to + // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that + // unpredicted error will cause data loss (when failed data push isn't re-pushed) + pushShouldConfirmOnlyAccepted?: boolean; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean; // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. diff --git a/src/sync/index.js b/src/sync/index.js index fcce9b80d..768fd4ae6 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -53,11 +53,14 @@ export type SyncPullResult = | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> -export type SyncRejectedIds = { [TableName]: RecordId[] } +export type SyncIds = { [TableName]: RecordId[] } export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }> -export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }> +export type SyncPushResult = $Exact<{ + experimentalRejectedIds?: SyncIds, + experimentalAcceptedIds?: SyncIds, +}> type SyncConflict = $Exact<{ local: DirtyRaw, remote: DirtyRaw, resolved: DirtyRaw }> export type SyncLog = { @@ -67,7 +70,7 @@ export type SyncLog = { migration?: ?MigrationSyncChanges, newLastPulledAt?: number, resolvedConflicts?: SyncConflict[], - rejectedIds?: SyncRejectedIds, + rejectedIds?: SyncIds, finishedAt?: Date, remoteChangeCount?: number, localChangeCount?: number, From ddf945b7228fa0015ebf4c91da2ac34266b27fe3 Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:51:14 +0100 Subject: [PATCH 3/7] lint&flow # Conflicts: # src/sync/impl/helpers.js --- src/sync/impl/helpers.js | 30 ++++++++++++++++++++++-------- src/sync/impl/markAsSynced.js | 4 ++-- src/sync/impl/synchronize.js | 2 +- src/sync/index.js | 5 +++++ 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/sync/impl/helpers.js b/src/sync/impl/helpers.js index ecfc06b2b..32394d12b 100644 --- a/src/sync/impl/helpers.js +++ b/src/sync/impl/helpers.js @@ -4,9 +4,15 @@ import { values } from '../../utils/fp' import areRecordsEqual from '../../utils/fp/areRecordsEqual' import { invariant } from '../../utils/common' -import type { Model, Collection, Database, TableName, SyncIds, RecordId } from '../..' +import type { Model, Collection, Database, TableName, RecordId } from '../..' import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../../RawRecord' -import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index' +import type { + SyncIds, + SyncLog, + SyncDatabaseChangeSet, + SyncShouldUpdateRecord, + SyncConflictResolver, +} from '../index' // Returns raw record with naive solution to a conflict based on local `_changed` field // This is a per-column resolution algorithm. All columns that were changed locally win @@ -150,8 +156,10 @@ export const changeSetCount: (SyncDatabaseChangeSet) => number = (changeset) => ) const extractChangeSetIds: (SyncDatabaseChangeSet) => { [TableName]: RecordId[] } = (changeset) => - Object.keys(changeset).reduce((acc, key) => { + Object.keys(changeset).reduce((acc: { [TableName]: RecordId[] }, key: string) => { + // $FlowFixMe const { created, updated, deleted } = changeset[key] + // $FlowFixMe acc[key] = [ ...created.map(it => it.id), ...updated.map(it => it.id), @@ -162,22 +170,28 @@ const extractChangeSetIds: (SyncDatabaseChangeSet) => { [TableName]: Record // Returns all rejected ids and is used when accepted ids are used export const findRejectedIds: - (SyncIds, SyncIds, SyncDatabaseChangeSet) => SyncIds = + (?SyncIds, ?SyncIds, SyncDatabaseChangeSet) => SyncIds = (experimentalRejectedIds, experimentalAcceptedIds, changeset) => { const localIds = extractChangeSetIds(changeset) - const acceptedIdsSets = Object.keys(changeset).reduce((acc, key) => { - acc[key] = new Set(experimentalAcceptedIds[key]) - return acc + const acceptedIdsSets = Object.keys(changeset).reduce( + (acc: { [TableName]: Set }, key: string) => { + // $FlowFixMe + acc[key] = new Set(experimentalAcceptedIds[key]) + return acc }, {}) - return Object.keys(changeset).reduce((acc, key) => { + return Object.keys(changeset).reduce((acc: { [TableName]: RecordId[] }, key: string) => { const rejectedIds = [ + // $FlowFixMe ...(experimentalRejectedIds ? experimentalRejectedIds[key] || [] : []), + // $FlowFixMe ...(localIds[key] || []), + // $FlowFixMe ].filter(it => !acceptedIdsSets[key].has(it)) if (rejectedIds.length > 0) { + // $FlowFixMe acc[key] = rejectedIds } return acc diff --git a/src/sync/impl/markAsSynced.js b/src/sync/impl/markAsSynced.js index 6ad5a0fc7..7e8e2d012 100644 --- a/src/sync/impl/markAsSynced.js +++ b/src/sync/impl/markAsSynced.js @@ -30,7 +30,7 @@ const recordsToMarkAsSynced = ( ) return } - const isAccepted = !allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id); + const isAccepted = !allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id) if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id) && isAccepted) { syncedRecords.push(record) } @@ -44,7 +44,7 @@ const destroyDeletedRecords = ( { changes }: SyncLocalChanges, allowOnlyAcceptedIds: boolean, allRejectedIds: SyncIds, - allAcceptedIds?: ?SyncIds, + allAcceptedIds: SyncIds, ): Promise[] => Object.keys(changes).map((_tableName) => { const tableName: TableName = (_tableName: any) diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index 9854b3f5d..712a5acee 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -141,7 +141,7 @@ export default async function synchronize({ } ensureSameDatabase(database, resetCount) - await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted, + await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false, pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds) log && (log.phase = 'marked local changes as synced') } diff --git a/src/sync/index.js b/src/sync/index.js index 768fd4ae6..30b07a68b 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -100,6 +100,11 @@ export type SyncArgs = $Exact<{ // If you don't want to change default behavior for a given record, return `resolved` as is // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. conflictResolver?: SyncConflictResolver, + // experimental customization that will cause to only set records as synced if we return id. + // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to + // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that + // unpredicted error will cause data loss (when failed data push isn't re-pushed) + pushShouldConfirmOnlyAccepted?: boolean; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean, // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. From 335c0cc2d5d4879aaf5d63d32fe013dcb944a41e Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Thu, 18 Jan 2024 12:19:23 +0100 Subject: [PATCH 4/7] Added SyncRejectedIds to allow time for migration --- src/sync/index.d.ts | 2 ++ src/sync/index.js | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index 04371ee3f..16e5eb552 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -29,6 +29,8 @@ export type SyncPullResult = export type SyncIds = { [tableName: TableName]: RecordId[] } +export type SyncRejectedIds = SyncIds + export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> export type SyncPushResult = $Exact<{ diff --git a/src/sync/index.js b/src/sync/index.js index 30b07a68b..1a8d02e20 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -55,6 +55,8 @@ export type SyncPullResult = export type SyncIds = { [TableName]: RecordId[] } +export type SyncRejectedIds = SyncIds + export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }> export type SyncPushResult = $Exact<{ From 204022ab544e4785bdda2325851609157bfc62ee Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Fri, 19 Jan 2024 18:00:13 +0100 Subject: [PATCH 5/7] removed SyncShouldUpdateRecord after merge --- src/sync/impl/helpers.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sync/impl/helpers.js b/src/sync/impl/helpers.js index 32394d12b..03985a93f 100644 --- a/src/sync/impl/helpers.js +++ b/src/sync/impl/helpers.js @@ -10,7 +10,6 @@ import type { SyncIds, SyncLog, SyncDatabaseChangeSet, - SyncShouldUpdateRecord, SyncConflictResolver, } from '../index' From 84a5f6b939461366d99fb96d76593816bf9b61e9 Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Wed, 17 Jan 2024 13:40:47 +0100 Subject: [PATCH 6/7] Added option to do push sync conflict resolution # Conflicts: # src/sync/impl/helpers.d.ts --- src/RawRecord/index.d.ts | 2 + src/sync/impl/__tests__/markAsSynced.test.js | 41 ++++++++++++++++++++ src/sync/impl/__tests__/synchronize.test.js | 34 ++++++++++++++++ src/sync/impl/applyRemote.js | 12 +----- src/sync/impl/helpers.d.ts | 16 +++++++- src/sync/impl/helpers.js | 20 +++++++++- src/sync/impl/markAsSynced.d.ts | 4 +- src/sync/impl/markAsSynced.js | 20 ++++++++-- src/sync/impl/synchronize.d.ts | 1 + src/sync/impl/synchronize.js | 4 +- src/sync/index.d.ts | 9 +++++ src/sync/index.js | 9 +++++ 12 files changed, 152 insertions(+), 20 deletions(-) diff --git a/src/RawRecord/index.d.ts b/src/RawRecord/index.d.ts index 54ce69bad..2a5b9e848 100644 --- a/src/RawRecord/index.d.ts +++ b/src/RawRecord/index.d.ts @@ -11,6 +11,8 @@ export type DirtyRaw = { [key: string]: any } type _RawRecord = { id: RecordId _status: SyncStatus + // _changed is used by default pull conflict resolution and determines columns for which local + // changes will override remote changes _changed: string } diff --git a/src/sync/impl/__tests__/markAsSynced.test.js b/src/sync/impl/__tests__/markAsSynced.test.js index 08e18ae1f..4c6a87bd6 100644 --- a/src/sync/impl/__tests__/markAsSynced.test.js +++ b/src/sync/impl/__tests__/markAsSynced.test.js @@ -226,4 +226,45 @@ describe('markLocalChangesAsSynced', () => { it.skip('only returns changed fields', async () => { // TODO: Possible future improvement? }) + describe('pushConflictResolver', () => { + it('marks local changes as synced', async () => { + const { database, tasks } = makeDatabase() + + await makeLocalChanges(database) + + await markLocalChangesAsSynced(database, await fetchLocalChanges(database), false, null, null, + (_table, local, remote, resolved) => { + if (local.id !== 'tCreated' || (remote && remote.changeMe !== true)) { + return resolved + } + resolved.name = remote.name + resolved._status = 'updated' + return resolved + }, + { + mock_tasks: [ + { + id: 'tCreated', + name: 'I shall prevail', + changeMe: true, + }, + ], + }) + + await expectSyncedAndMatches(tasks, 'tCreated', { + _status: 'updated', + name: 'I shall prevail', // concat of remote and local change + }) + + // should be untouched + await expectSyncedAndMatches(tasks, 'tUpdated', { + _status: 'synced', + name: 'local', + position: 100, + description: 'orig', + project_id: 'orig', + }) + + }) + }) }) diff --git a/src/sync/impl/__tests__/synchronize.test.js b/src/sync/impl/__tests__/synchronize.test.js index 475a111a9..7fc7be909 100644 --- a/src/sync/impl/__tests__/synchronize.test.js +++ b/src/sync/impl/__tests__/synchronize.test.js @@ -571,4 +571,38 @@ describe('synchronize', () => { it.skip(`only emits one collection batch change`, async () => { // TODO: unskip when batch change emissions are implemented }) + it(`allows push conflict resolution to be customized`, async () => { + const { database, tasks } = makeDatabase() + const task = tasks.prepareCreateFromDirtyRaw({ + id: 't1', + name: 'Task name', + position: 1, + is_completed: false, + project_id: 'p1', + }) + await database.write(() => database.batch(task)) + + const pushConflictResolver = jest.fn((_table, local, remote, resolved) => { + return resolved + }) + + await synchronize({ + database, + pullChanges: () => ({ + timestamp: 1500, + }), + pushChanges: async () => { + return { + pushResultSet: { + mock_tasks: [ + {id: 't1'}, + ], + }, + } + }, + pushConflictResolver, + }) + + expect(pushConflictResolver).toHaveBeenCalledTimes(1) + }) }) diff --git a/src/sync/impl/applyRemote.js b/src/sync/impl/applyRemote.js index c8a988172..8e619d53e 100644 --- a/src/sync/impl/applyRemote.js +++ b/src/sync/impl/applyRemote.js @@ -11,7 +11,6 @@ import type { Collection, Model, TableName, - DirtyRaw, Query, RawRecord, } from '../..' @@ -25,7 +24,7 @@ import type { SyncConflictResolver, SyncPullStrategy, } from '../index' -import { prepareCreateFromRaw, prepareUpdateFromRaw, recordFromRaw } from './helpers' +import { prepareCreateFromRaw, prepareUpdateFromRaw, recordFromRaw, validateRemoteRaw } from './helpers' type ApplyRemoteChangesContext = $Exact<{ db: Database, @@ -249,15 +248,6 @@ const getAllRecordsToApply = ( ) } -function validateRemoteRaw(raw: DirtyRaw): void { - // TODO: I think other code is actually resilient enough to handle illegal _status and _changed - // would be best to change that part to a warning - but tests are needed - invariant( - raw && typeof raw === 'object' && 'id' in raw && !('_status' in raw || '_changed' in raw), - `[Sync] Invalid raw record supplied to Sync. Records must be objects, must have an 'id' field, and must NOT have a '_status' or '_changed' fields`, - ) -} - function prepareApplyRemoteChangesToCollection( recordsToApply: RecordsToApplyRemoteChangesTo, collection: Collection, diff --git a/src/sync/impl/helpers.d.ts b/src/sync/impl/helpers.d.ts index 746efbc87..02ed0acd7 100644 --- a/src/sync/impl/helpers.d.ts +++ b/src/sync/impl/helpers.d.ts @@ -1,12 +1,20 @@ import type { Model, Collection, Database } from '../..' import type { RawRecord, DirtyRaw } from '../../RawRecord' -import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index' +import type { + SyncLog, + SyncDatabaseChangeSet, + SyncShouldUpdateRecord, + SyncConflictResolver, + SyncPushResultSet, +} from '../index' // Returns raw record with naive solution to a conflict based on local `_changed` field // This is a per-column resolution algorithm. All columns that were changed locally win // and will be applied on top of the remote version. export function resolveConflict(local: RawRecord, remote: DirtyRaw): DirtyRaw +export function validateRemoteRaw(raw: DirtyRaw): void + export function prepareCreateFromRaw( collection: Collection, dirtyRaw: DirtyRaw, @@ -19,7 +27,11 @@ export function prepareUpdateFromRaw( conflictResolver?: SyncConflictResolver, ): T -export function prepareMarkAsSynced(record: T): T +export function prepareMarkAsSynced( + record: T, + pushConflictResolver?: SyncConflictResolver, + remoteDirtyRaw?: DirtyRaw, +): T export function ensureSameDatabase(database: Database, initialResetCount: number): void diff --git a/src/sync/impl/helpers.js b/src/sync/impl/helpers.js index 03985a93f..1e24b479f 100644 --- a/src/sync/impl/helpers.js +++ b/src/sync/impl/helpers.js @@ -42,6 +42,15 @@ export function resolveConflict(local: RawRecord, remote: DirtyRaw): DirtyRaw { return resolved } +export function validateRemoteRaw(raw: DirtyRaw): void { + // TODO: I think other code is actually resilient enough to handle illegal _status and _changed + // would be best to change that part to a warning - but tests are needed + invariant( + raw && typeof raw === 'object' && 'id' in raw && !('_status' in raw || '_changed' in raw), + `[Sync] Invalid raw record supplied to Sync. Records must be objects, must have an 'id' field, and must NOT have a '_status' or '_changed' fields`, + ) +} + function replaceRaw(record: Model, dirtyRaw: DirtyRaw): void { record._raw = sanitizedRaw(dirtyRaw, record.collection.schema) } @@ -125,9 +134,16 @@ export function prepareUpdateFromRaw( }) } -export function prepareMarkAsSynced(record: T): T { +export function prepareMarkAsSynced( + record: T, + pushConflictResolver?: ?SyncConflictResolver, + remoteDirtyRaw?: ?DirtyRaw, +): T { // $FlowFixMe - const newRaw = Object.assign({}, record._raw, { _status: 'synced', _changed: '' }) // faster than object spread + let newRaw = Object.assign({}, record._raw, { _status: 'synced', _changed: '' }) // faster than object spread + if (pushConflictResolver) { + newRaw = pushConflictResolver(record.collection.table, record._raw, remoteDirtyRaw, newRaw) + } // $FlowFixMe return record.prepareUpdate(() => { replaceRaw(record, newRaw) diff --git a/src/sync/impl/markAsSynced.d.ts b/src/sync/impl/markAsSynced.d.ts index e41621e0d..5d5797e01 100644 --- a/src/sync/impl/markAsSynced.d.ts +++ b/src/sync/impl/markAsSynced.d.ts @@ -1,6 +1,6 @@ import type { Database, Model, TableName } from '../..' -import type { SyncLocalChanges, SyncIds } from '../index' +import type { SyncLocalChanges, SyncIds, SyncConflictResolver, SyncPushResultSet } from '../index' export default function markLocalChangesAsSynced( db: Database, @@ -8,4 +8,6 @@ export default function markLocalChangesAsSynced( allowOnlyAcceptedIds: boolean, rejectedIds?: SyncIds, allAcceptedIds?: SyncIds, + pushConflictResolver?: SyncConflictResolver, + remoteDirtyRaws?: SyncPushResultSet, ): Promise diff --git a/src/sync/impl/markAsSynced.js b/src/sync/impl/markAsSynced.js index 7e8e2d012..80d7e3144 100644 --- a/src/sync/impl/markAsSynced.js +++ b/src/sync/impl/markAsSynced.js @@ -4,8 +4,8 @@ import areRecordsEqual from '../../utils/fp/areRecordsEqual' import { logError } from '../../utils/common' import type { Database, Model, TableName } from '../..' -import { prepareMarkAsSynced } from './helpers' -import type { SyncLocalChanges, SyncIds } from '../index' +import { prepareMarkAsSynced, validateRemoteRaw } from './helpers' +import type { SyncLocalChanges, SyncIds, SyncConflictResolver, SyncPushResultSet } from '../index' const recordsToMarkAsSynced = ( { changes, affectedRecords }: SyncLocalChanges, @@ -61,13 +61,27 @@ export default function markLocalChangesAsSynced( allowOnlyAcceptedIds: boolean, rejectedIds?: ?SyncIds, allAcceptedIds?: ?SyncIds, + pushConflictResolver?: ?SyncConflictResolver, + remoteDirtyRaws?: ?SyncPushResultSet, ): Promise { return db.write(async () => { // update and destroy records concurrently await Promise.all([ db.batch( recordsToMarkAsSynced(syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, - allAcceptedIds || {}).map(prepareMarkAsSynced), + allAcceptedIds || {}).map(it => { + // if pushConflictResolver is not set, lookup by remote raws isn't necessary + if (!pushConflictResolver || !remoteDirtyRaws) { + return prepareMarkAsSynced(it, null, null) + } + const collectionRemoteDirtyRaws = remoteDirtyRaws[it.collection.modelClass.table] + if (!collectionRemoteDirtyRaws) { + return prepareMarkAsSynced(it, null, null) + } + const remoteDirtyRaw = collectionRemoteDirtyRaws.find(dirtyRaw => dirtyRaw.id === it.id) + remoteDirtyRaw && validateRemoteRaw(remoteDirtyRaw) + return prepareMarkAsSynced(it, pushConflictResolver, remoteDirtyRaw) + }), ), ...destroyDeletedRecords(db, syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, allAcceptedIds || {}), diff --git a/src/sync/impl/synchronize.d.ts b/src/sync/impl/synchronize.d.ts index ea9557b5c..ba0e4ce3f 100644 --- a/src/sync/impl/synchronize.d.ts +++ b/src/sync/impl/synchronize.d.ts @@ -10,6 +10,7 @@ export default function synchronize({ log, conflictResolver, pushShouldConfirmOnlyAccepted, + pushConflictResolver, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index 712a5acee..78e1c261f 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -25,6 +25,7 @@ export default async function synchronize({ log, conflictResolver, pushShouldConfirmOnlyAccepted, + pushConflictResolver, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise { @@ -142,7 +143,8 @@ export default async function synchronize({ ensureSameDatabase(database, resetCount) await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false, - pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds) + pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds, pushConflictResolver, + pushResult.pushResultSet) log && (log.phase = 'marked local changes as synced') } } else { diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index 16e5eb552..c4df63562 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -33,9 +33,12 @@ export type SyncRejectedIds = SyncIds export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> +export type SyncPushResultSet = { [tableName: TableName]: DirtyRaw[] } + export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncIds, experimentalAcceptedIds?: SyncIds, + pushResultSet?: SyncPushResultSet, }> type SyncConflict = $Exact<{ local: DirtyRaw; remote: DirtyRaw; resolved: DirtyRaw }> @@ -80,6 +83,12 @@ export type SyncArgs = $Exact<{ // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that // unpredicted error will cause data loss (when failed data push isn't re-pushed) pushShouldConfirmOnlyAccepted?: boolean; + // conflict resolver on push side of sync which also requires returned records from backend. + // This is also useful for multi-step sync where one must control in which state sync is and if it + // must be repeated. + // Note that by default _status will be still synced so update if required + // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. + pushConflictResolver?: SyncConflictResolver; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean; // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. diff --git a/src/sync/index.js b/src/sync/index.js index 1a8d02e20..732c624e6 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -59,9 +59,12 @@ export type SyncRejectedIds = SyncIds export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }> +export type SyncPushResultSet = { [tableName: TableName]: DirtyRaw[] } + export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncIds, experimentalAcceptedIds?: SyncIds, + pushResultSet?: SyncPushResultSet, }> type SyncConflict = $Exact<{ local: DirtyRaw, remote: DirtyRaw, resolved: DirtyRaw }> @@ -107,6 +110,12 @@ export type SyncArgs = $Exact<{ // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that // unpredicted error will cause data loss (when failed data push isn't re-pushed) pushShouldConfirmOnlyAccepted?: boolean; + // conflict resolver on push side of sync which also requires returned records from backend. + // This is also useful for multi-step sync where one must control in which state sync is and if it + // must be repeated. + // Note that by default _status will be still synced so update if required + // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. + pushConflictResolver?: SyncConflictResolver; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean, // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. From d3b4eae03b4bc06fe8b761cae9e2c95c4c9c25af Mon Sep 17 00:00:00 2001 From: primus11 <1196764+primus11@users.noreply.github.com> Date: Wed, 17 Jan 2024 15:08:51 +0100 Subject: [PATCH 7/7] Added option to do push-only sync When one works offline it usually makes sense to do both pull and push but on the other hand when one tries to push asap it might not always be desired to do full sync. One might for example want to push asap on web but not on native. Even though WatermelonDB was initially designed with having single endpoint this isn't always the case either because of trying to use existing systems, having no control over systems or because there is need to have multiple endpoints. Previously pushing asap would do full sync and this combined with having multiple endpoints can trigger 10s of connections even when it would be mostly enough to do just one push connection. Because of so many connections even if data is minimal this can cause long execution times. This PR essentially separates push part from synchronize and exposes it to lib-users. --- CHANGELOG-Unreleased.md | 5 + src/sync/impl/__tests__/synchronize.test.js | 16 ++- src/sync/impl/synchronize.d.ts | 10 +- src/sync/impl/synchronize.js | 107 ++++++++++++++------ src/sync/index.d.ts | 38 ++++--- src/sync/index.js | 78 ++++++++++---- 6 files changed, 189 insertions(+), 65 deletions(-) diff --git a/CHANGELOG-Unreleased.md b/CHANGELOG-Unreleased.md index 7c1dc252c..eba13ed75 100644 --- a/CHANGELOG-Unreleased.md +++ b/CHANGELOG-Unreleased.md @@ -2,6 +2,11 @@ ### BREAKING CHANGES +#### 2024-01-17 + +- SyncPushArgs was renamed to SyncPushChangesArgs to free SyncPushArgs which is now used for push sync. +- lastPulletAt in pushChanges is no longer forced to be defined + ### Deprecations ### New features diff --git a/src/sync/impl/__tests__/synchronize.test.js b/src/sync/impl/__tests__/synchronize.test.js index 7fc7be909..04daa5c08 100644 --- a/src/sync/impl/__tests__/synchronize.test.js +++ b/src/sync/impl/__tests__/synchronize.test.js @@ -14,7 +14,7 @@ import { emptyPull, } from './helpers' -import { synchronize, hasUnsyncedChanges } from '../../index' +import { synchronize, optimisticSyncPush, hasUnsyncedChanges } from '../../index' import { fetchLocalChanges, getLastPulledAt } from '../index' const observeDatabase = (database) => { @@ -151,6 +151,20 @@ describe('synchronize', () => { expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) expect(log.localChangeCount).toBe(10) }) + it('can do push-only sync', async () => { + const { database } = makeDatabase() + + await makeLocalChanges(database) + const localChanges = await fetchLocalChanges(database) + + const pushChanges = jest.fn() + const log = {} + await optimisticSyncPush({ database, pushChanges, log }) + + expect(pushChanges).toHaveBeenCalledWith({ changes: localChanges.changes, lastPulledAt: null }) + expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) + expect(log.localChangeCount).toBe(10) + }) it('can pull changes', async () => { const { database, projects, tasks } = makeDatabase() diff --git a/src/sync/impl/synchronize.d.ts b/src/sync/impl/synchronize.d.ts index ba0e4ce3f..0dfbc8e05 100644 --- a/src/sync/impl/synchronize.d.ts +++ b/src/sync/impl/synchronize.d.ts @@ -1,4 +1,4 @@ -import type { SyncArgs } from '../index' +import type { SyncArgs, OptimisticSyncPushArgs } from '../index' export default function synchronize({ database, @@ -14,3 +14,11 @@ export default function synchronize({ _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise + +export function optimisticSyncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, +}: OptimisticSyncPushArgs): Promise diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index 78e1c261f..3bf6e0de1 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -12,7 +12,52 @@ import { getMigrationInfo, } from './index' import { ensureSameDatabase, isChangeSetEmpty, changeSetCount, findRejectedIds } from './helpers' -import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index' +import type { + SyncArgs, + OptimisticSyncPushArgs, + SyncPushArgs, + Timestamp, + SyncPullStrategy, +} from '../index' + +async function syncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, + resetCount, + lastPulledAt, +}: SyncPushArgs): Promise { + if (pushChanges) { + log && (log.phase = 'ready to fetch local changes') + + const localChanges = await fetchLocalChanges(database) + log && (log.localChangeCount = changeSetCount(localChanges.changes)) + log && (log.phase = 'fetched local changes') + + ensureSameDatabase(database, resetCount) + if (!isChangeSetEmpty(localChanges.changes)) { + log && (log.phase = 'ready to push') + const pushResult = + (await pushChanges({ changes: localChanges.changes, lastPulledAt })) || {} + log && (log.phase = 'pushed') + log && (log.rejectedIds = pushResult.experimentalRejectedIds) + if (log && pushShouldConfirmOnlyAccepted) { + log.rejectedIds = findRejectedIds(pushResult.experimentalRejectedIds, + pushResult.experimentalAcceptedIds, localChanges.changes) + } + + ensureSameDatabase(database, resetCount) + await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false, + pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds, pushConflictResolver, + pushResult.pushResultSet) + log && (log.phase = 'marked local changes as synced') + } + } else { + log && (log.phase = 'pushChanges not defined') + } +} export default async function synchronize({ database, @@ -122,35 +167,39 @@ export default async function synchronize({ }, 'sync-synchronize-apply') // push phase - if (pushChanges) { - log && (log.phase = 'ready to fetch local changes') - - const localChanges = await fetchLocalChanges(database) - log && (log.localChangeCount = changeSetCount(localChanges.changes)) - log && (log.phase = 'fetched local changes') - - ensureSameDatabase(database, resetCount) - if (!isChangeSetEmpty(localChanges.changes)) { - log && (log.phase = 'ready to push') - const pushResult = - (await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {} - log && (log.phase = 'pushed') - log && (log.rejectedIds = pushResult.experimentalRejectedIds) - if (log && pushShouldConfirmOnlyAccepted) { - log.rejectedIds = findRejectedIds(pushResult.experimentalRejectedIds, - pushResult.experimentalAcceptedIds, localChanges.changes) - } - - ensureSameDatabase(database, resetCount) - await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false, - pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds, pushConflictResolver, - pushResult.pushResultSet) - log && (log.phase = 'marked local changes as synced') - } - } else { - log && (log.phase = 'pushChanges not defined') - } + await syncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, + resetCount, + lastPulledAt: newLastPulledAt, + }) log && (log.finishedAt = new Date()) log && (log.phase = 'done') } + +export async function optimisticSyncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, +}: OptimisticSyncPushArgs): Promise { + const resetCount = database._resetCount + + const lastPulledAt = await getLastPulledAt(database) + log && (log.lastPulledAt = lastPulledAt) + + await syncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, + resetCount, + lastPulledAt, + }) +} \ No newline at end of file diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index c4df63562..b053cc5d4 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -31,7 +31,7 @@ export type SyncIds = { [tableName: TableName]: RecordId[] } export type SyncRejectedIds = SyncIds -export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> +export type SyncPushChangesArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> export type SyncPushResultSet = { [tableName: TableName]: DirtyRaw[] } @@ -64,20 +64,10 @@ export type SyncConflictResolver = ( resolved: DirtyRaw, ) => DirtyRaw -export type SyncArgs = $Exact<{ +export type OptimisticSyncPushArgs = $Exact<{ database: Database; - pullChanges: (_: SyncPullArgs) => Promise; - pushChanges?: (_: SyncPushArgs) => Promise; - // version at which support for migration syncs was added - the version BEFORE first syncable migration - migrationsEnabledAtVersion?: SchemaVersion; - sendCreatedAsUpdated?: boolean; + pushChanges?: (_: SyncPushChangesArgs) => Promise; log?: SyncLog; - // Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple - // columns and want to have them updated consistently, or to implement partial sync - // It's called for every record being updated locally, so be sure that this function is FAST. - // If you don't want to change default behavior for a given record, return `resolved` as is - // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. - conflictResolver?: SyncConflictResolver; // experimental customization that will cause to only set records as synced if we return id. // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that @@ -89,6 +79,26 @@ export type SyncArgs = $Exact<{ // Note that by default _status will be still synced so update if required // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. pushConflictResolver?: SyncConflictResolver; +}> + +export type SyncPushArgs = $Exact<{OptimisticSyncPushArgs}> & $Exact<{ + resetCount: number; + lastPulledAt: Timestamp; +}> + +export type SyncArgs = $Exact<{OptimisticSyncPushArgs}> & $Exact<{ + database: Database; + pullChanges: (_: SyncPullArgs) => Promise; + // version at which support for migration syncs was added - the version BEFORE first syncable migration + migrationsEnabledAtVersion?: SchemaVersion; + sendCreatedAsUpdated?: boolean; + log?: SyncLog; + // Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple + // columns and want to have them updated consistently, or to implement partial sync + // It's called for every record being updated locally, so be sure that this function is FAST. + // If you don't want to change default behavior for a given record, return `resolved` as is + // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. + conflictResolver?: SyncConflictResolver; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean; // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. @@ -109,4 +119,6 @@ export type SyncArgs = $Exact<{ export function synchronize(args: SyncArgs): Promise +export function optimisticSyncPush(args: OptimisticSyncPushArgs): Promise + export function hasUnsyncedChanges({ database }: $Exact<{ database: Database }>): Promise diff --git a/src/sync/index.js b/src/sync/index.js index 732c624e6..379a93944 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -57,7 +57,7 @@ export type SyncIds = { [TableName]: RecordId[] } export type SyncRejectedIds = SyncIds -export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }> +export type SyncPushChangesArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt?: ?Timestamp }> export type SyncPushResultSet = { [tableName: TableName]: DirtyRaw[] } @@ -83,6 +83,12 @@ export type SyncLog = { error?: Error, } +export type SyncShouldUpdateRecord = ( + table: TableName, + local: DirtyRaw, + remote: DirtyRaw, +) => boolean + export type SyncConflictResolver = ( table: TableName, local: DirtyRaw, @@ -90,21 +96,10 @@ export type SyncConflictResolver = ( resolved: DirtyRaw, ) => DirtyRaw -// TODO: JSDoc'ify this -export type SyncArgs = $Exact<{ - database: Database, - pullChanges: (SyncPullArgs) => Promise, - pushChanges?: (SyncPushArgs) => Promise, - // version at which support for migration syncs was added - the version BEFORE first syncable migration - migrationsEnabledAtVersion?: SchemaVersion, - sendCreatedAsUpdated?: boolean, - log?: SyncLog, - // Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple - // columns and want to have them updated consistently, or to implement partial sync - // It's called for every record being updated locally, so be sure that this function is FAST. - // If you don't want to change default behavior for a given record, return `resolved` as is - // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. - conflictResolver?: SyncConflictResolver, +export type OptimisticSyncPushArgs = $Exact<{ + database: Database; + pushChanges?: (_: SyncPushChangesArgs) => Promise; + log?: SyncLog; // experimental customization that will cause to only set records as synced if we return id. // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that @@ -116,22 +111,48 @@ export type SyncArgs = $Exact<{ // Note that by default _status will be still synced so update if required // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. pushConflictResolver?: SyncConflictResolver; +}> + +export type SyncPushArgs = $Exact<{ + ...OptimisticSyncPushArgs; + resetCount: number; + lastPulledAt?: ?Timestamp; +}> + +// TODO: JSDoc'ify this +export type SyncArgs = $Exact<{ + ...OptimisticSyncPushArgs; + database: Database; + pullChanges: (_: SyncPullArgs) => Promise; + // version at which support for migration syncs was added - the version BEFORE first syncable migration + migrationsEnabledAtVersion?: SchemaVersion; + sendCreatedAsUpdated?: boolean; + log?: SyncLog; + // Advanced (unsafe) customization point. Useful when doing per record conflict resolution and can + // determine directly from remote and local if we can keep local. + shouldUpdateRecord?: SyncShouldUpdateRecord; + // Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple + // columns and want to have them updated consistently, or to implement partial sync + // It's called for every record being updated locally, so be sure that this function is FAST. + // If you don't want to change default behavior for a given record, return `resolved` as is + // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. + conflictResolver?: SyncConflictResolver; // commits changes in multiple batches, and not one - temporary workaround for memory issue - _unsafeBatchPerCollection?: boolean, + _unsafeBatchPerCollection?: boolean; // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. // This can only be used on initial (login) sync, not for incremental syncs. // This can only be used with SQLiteAdapter with JSI enabled. // The exact API may change between versions of WatermelonDB. // See documentation for more details. - unsafeTurbo?: boolean, - // Called after changes are pulled with whatever was returned by pullChanges, minus `changes`. Useful + unsafeTurbo?: boolean; + // Called after pullChanges with whatever was returned by pullChanges, minus `changes`. Useful // when using turbo mode - onDidPullChanges?: (Object) => Promise, + onDidPullChanges?: (_: Object) => Promise; // Called after pullChanges is done, but before these changes are applied. Some stats about the pulled // changes are passed as arguments. An advanced user can use this for example to show some UI to the user // when processing a very large sync (could be useful for replacement syncs). Note that remote change count // is NaN in turbo mode. - onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise, + onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise; }> /** @@ -149,6 +170,21 @@ export async function synchronize(args: SyncArgs): Promise { } } +/** + * Does database push-only synchronize with a remote server + * + * See docs for more details + */ +export async function optimisticSyncPush(args: OptimisticSyncPushArgs): Promise { + try { + const optimisticSyncPushImpl = require('./impl/synchronize').optimisticSyncPush + await optimisticSyncPushImpl(args) + } catch (error) { + args.log && (args.log.error = error) + throw error + } +} + /** * Returns `true` if database has any unsynced changes. *