diff --git a/.changeset/poor-donuts-call.md b/.changeset/poor-donuts-call.md
new file mode 100644
index 000000000..73f433748
--- /dev/null
+++ b/.changeset/poor-donuts-call.md
@@ -0,0 +1,5 @@
+---
+'@powersync/common': patch
+---
+
+Fix reported progress around compactions / defrags on the sync service.
diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
index 46f04bd73..ca0370f21 100644
--- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
+++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
@@ -666,7 +666,10 @@ The next upload iteration will be delayed.`);
         if (progressForBucket) {
           updatedProgress[data.bucket] = {
             ...progressForBucket,
-            sinceLast: progressForBucket.sinceLast + data.data.length
+            sinceLast: Math.min(
+              progressForBucket.sinceLast + data.data.length,
+              progressForBucket.targetCount - progressForBucket.atLast
+            )
           };
         }
       }
@@ -730,17 +733,36 @@ The next upload iteration will be delayed.`);
   private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) {
     const localProgress = await this.options.adapter.getBucketOperationProgress();
     const progress: InternalProgressInformation = {};
+    let invalidated = false;
 
     for (const bucket of checkpoint.buckets) {
       const savedProgress = localProgress[bucket.bucket];
+      const atLast = savedProgress?.atLast ?? 0;
+      const sinceLast = savedProgress?.sinceLast ?? 0;
+
       progress[bucket.bucket] = {
         // The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service
         // will use by default.
         priority: bucket.priority ?? 3,
-        atLast: savedProgress?.atLast ?? 0,
-        sinceLast: savedProgress?.sinceLast ?? 0,
+        atLast: atLast,
+        sinceLast: sinceLast,
         targetCount: bucket.count ?? 0
       };
+
+      if (bucket.count != null && bucket.count < atLast + sinceLast) {
+        // Due to a defrag / sync rule deploy or a compaction operation, the size
+        // of the bucket shrank so much that the local ops exceed the ops in the updated
+        // bucket. We can't possibly report progress in this case (it would overshoot 100%).
+        invalidated = true;
+      }
+    }
+
+    if (invalidated) {
+      for (const bucket in progress) {
+        const bucketProgress = progress[bucket];
+        bucketProgress.atLast = 0;
+        bucketProgress.sinceLast = 0;
+      }
     }
 
     this.updateSyncStatus({
diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts
index cb4d5a603..a1acc8e58 100644
--- a/packages/node/tests/sync.test.ts
+++ b/packages/node/tests/sync.test.ts
@@ -169,58 +169,108 @@ describe('Sync', () => {
   });
 
   mockSyncServiceTest('different priorities', async ({ syncService }) => {
-    let database = await syncService.createDatabase();
-    database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
-    await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
-
-    syncService.pushLine({
-      checkpoint: {
-        last_op_id: '10',
-        buckets: [
-          bucket('a', 5, {priority: 0}),
-          bucket('b', 5, {priority: 2}),
-        ]
-      }
-    });
-
-    // Should be at 0/10 for total progress (which is the same as the progress for prio 2), and a 0/5 towards prio 0.
-    await waitForProgress(database, [0, 10], [[0, [0, 5]], [2, [0, 10]]]);
-
-    pushDataLine(syncService, 'a', 5);
-    await waitForProgress(database, [5, 10], [[0, [5, 5]], [2, [5, 10]]]);
-
-    pushCheckpointComplete(syncService, 0);
-    await waitForProgress(database, [5, 10], [[0, [5, 5]], [2, [5, 10]]]);
+    let database = await syncService.createDatabase();
+    database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
+    await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
 
-    pushDataLine(syncService, 'b', 2);
-    await waitForProgress(database, [7, 10], [[0, [5, 5]], [2, [7, 10]]]);
+    syncService.pushLine({
+      checkpoint: {
+        last_op_id: '10',
+        buckets: [bucket('a', 5, { priority: 0 }), bucket('b', 5, { priority: 2 })]
+      }
+    });
 
-    // Before syncing b fully, send a new checkpoint
-    syncService.pushLine({
-      checkpoint: {
-        last_op_id: '14',
-        buckets: [
-          bucket('a', 8, {priority: 0}),
-          bucket('b', 6, {priority: 2}),
-        ]
-      }
-    });
-    await waitForProgress(database, [7, 14], [[0, [5, 8]], [2, [7, 14]]]);
+    // Should be at 0/10 for total progress (which is the same as the progress for prio 2), and a 0/5 towards prio 0.
+    await waitForProgress(
+      database,
+      [0, 10],
+      [
+        [0, [0, 5]],
+        [2, [0, 10]]
+      ]
+    );
 
-    pushDataLine(syncService, 'a', 3);
-    await waitForProgress(database, [10, 14], [[0, [8, 8]], [2, [10, 14]]]);
+    pushDataLine(syncService, 'a', 5);
+    await waitForProgress(
+      database,
+      [5, 10],
+      [
+        [0, [5, 5]],
+        [2, [5, 10]]
+      ]
+    );
 
-    pushCheckpointComplete(syncService, 0);
-    await waitForProgress(database, [10, 14], [[0, [8, 8]], [2, [10, 14]]]);
+    pushCheckpointComplete(syncService, 0);
+    await waitForProgress(
+      database,
+      [5, 10],
+      [
+        [0, [5, 5]],
+        [2, [5, 10]]
+      ]
+    );
+
+    pushDataLine(syncService, 'b', 2);
+    await waitForProgress(
+      database,
+      [7, 10],
+      [
+        [0, [5, 5]],
+        [2, [7, 10]]
+      ]
+    );
+
+    // Before syncing b fully, send a new checkpoint
+    syncService.pushLine({
+      checkpoint: {
+        last_op_id: '14',
+        buckets: [bucket('a', 8, { priority: 0 }), bucket('b', 6, { priority: 2 })]
+      }
+    });
+    await waitForProgress(
+      database,
+      [7, 14],
+      [
+        [0, [5, 8]],
+        [2, [7, 14]]
+      ]
+    );
+
+    pushDataLine(syncService, 'a', 3);
+    await waitForProgress(
+      database,
+      [10, 14],
+      [
+        [0, [8, 8]],
+        [2, [10, 14]]
+      ]
+    );
 
-    pushDataLine(syncService, 'b', 4);
-    await waitForProgress(database, [14, 14], [[0, [8, 8]], [2, [14, 14]]]);
+    pushCheckpointComplete(syncService, 0);
+    await waitForProgress(
+      database,
+      [10, 14],
+      [
+        [0, [8, 8]],
+        [2, [10, 14]]
+      ]
+    );
+
+    pushDataLine(syncService, 'b', 4);
+    await waitForProgress(
+      database,
+      [14, 14],
+      [
+        [0, [8, 8]],
+        [2, [14, 14]]
+      ]
+    );
 
-    pushCheckpointComplete(syncService);
-    await waitForSyncStatus(database, (s) => s.downloadProgress == null);
+    pushCheckpointComplete(syncService);
+    await waitForSyncStatus(database, (s) => s.downloadProgress == null);
   });
 
-  mockSyncServiceTest('uses correct state when reconnecting', async ({syncService}) => {
+  mockSyncServiceTest('uses correct state when reconnecting', async ({ syncService }) => {
     let database = await syncService.createDatabase();
     database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
     await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
@@ -228,10 +278,7 @@ describe('Sync', () => {
     syncService.pushLine({
       checkpoint: {
         last_op_id: '10',
-        buckets: [
-          bucket('a', 5, {priority: 0}),
-          bucket('b', 5, {priority: 3}),
-        ]
+        buckets: [bucket('a', 5, { priority: 0 }), bucket('b', 5, { priority: 3 })]
       }
     });
 
@@ -239,7 +286,7 @@ describe('Sync', () => {
     pushDataLine(syncService, 'a', 5);
     pushDataLine(syncService, 'b', 1);
     pushCheckpointComplete(syncService, 0);
-    await database.waitForFirstSync({priority: 0});
+    await database.waitForFirstSync({ priority: 0 });
 
     await database.close();
    await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
@@ -248,19 +295,56 @@ describe('Sync', () => {
     await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
 
     expect(syncService.connectedListeners[0].buckets).toStrictEqual([
-      {"name": "a", "after": "10"},
-      {"name": "b", "after": "6"},
+      { name: 'a', after: '10' },
+      { name: 'b', after: '6' }
     ]);
   });
+
+  mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => {
+    let database = await syncService.createDatabase();
+    database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
+    await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
+
+    syncService.pushLine({
+      checkpoint: {
+        last_op_id: '10',
+        buckets: [bucket('a', 10)]
+      }
+    });
+
+    await waitForProgress(database, [0, 10]);
+    pushDataLine(syncService, 'a', 5);
+    await waitForProgress(database, [5, 10]);
+
+    // Re-open database
+    await database.close();
+    await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
+    database = await syncService.createDatabase();
+    database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
+    await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
+
+    // A sync rule deploy could reset buckets, making the new bucket smaller than the existing one.
+    syncService.pushLine({
+      checkpoint: {
+        last_op_id: '14',
+        buckets: [bucket('a', 4)]
+      }
+    });
+
+    // In this special case, don't report 5/4 as progress.
+    await waitForProgress(database, [0, 4]);
+    pushCheckpointComplete(syncService);
+    await waitForSyncStatus(database, (s) => s.downloadProgress == null);
+  });
 });
 });
 
-function bucket(name: string, count: number, options: {priority: number} = {priority: 3}): BucketChecksum {
+function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {
   return {
     bucket: name,
     count,
     checksum: 0,
-    priority: options.priority,
+    priority: options.priority
   };
 }
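
Note for reviewers: the sketch below restates the new clamping and invalidation logic outside of the SDK so it can be read without the surrounding class. The type and helper names (`BucketDescription`, `BucketProgress`, `startingProgress`, `applyDataLine`) are illustrative stand-ins, not the actual `@powersync/common` API.

```ts
// Simplified stand-ins for the SDK's checkpoint / progress types (illustrative only).
interface BucketDescription {
  bucket: string;
  priority?: number;
  count?: number | null;
}

interface BucketProgress {
  priority: number;
  atLast: number; // ops already applied at the last complete sync
  sinceLast: number; // ops downloaded since then
  targetCount: number; // ops in the bucket according to the current checkpoint
}

// Mirrors the patch: start from persisted per-bucket counters, but if any bucket in the
// new checkpoint is now *smaller* than what is already stored locally (compaction,
// defrag, or sync rule deploy), reset all counters so progress starts at 0 instead of
// overshooting 100%.
function startingProgress(
  checkpointBuckets: BucketDescription[],
  saved: Record<string, { atLast: number; sinceLast: number }>
): Record<string, BucketProgress> {
  const progress: Record<string, BucketProgress> = {};
  let invalidated = false;

  for (const bucket of checkpointBuckets) {
    const atLast = saved[bucket.bucket]?.atLast ?? 0;
    const sinceLast = saved[bucket.bucket]?.sinceLast ?? 0;

    progress[bucket.bucket] = {
      priority: bucket.priority ?? 3,
      atLast,
      sinceLast,
      targetCount: bucket.count ?? 0
    };

    if (bucket.count != null && bucket.count < atLast + sinceLast) {
      invalidated = true;
    }
  }

  if (invalidated) {
    for (const name in progress) {
      progress[name].atLast = 0;
      progress[name].sinceLast = 0;
    }
  }

  return progress;
}

// The same clamp applied when a data line arrives: never count more downloaded ops
// for a bucket than the checkpoint says are left to download.
function applyDataLine(p: BucketProgress, opsInLine: number): BucketProgress {
  return {
    ...p,
    sinceLast: Math.min(p.sinceLast + opsInLine, p.targetCount - p.atLast)
  };
}

// Example matching the 'interrupt and defrag' test: 5 ops were downloaded against a
// 10-op bucket, then a defragged checkpoint shrinks the bucket to 4 ops.
const resumed = startingProgress([{ bucket: 'a', count: 4 }], { a: { atLast: 0, sinceLast: 5 } });
console.log(resumed['a']); // { priority: 3, atLast: 0, sinceLast: 0, targetCount: 4 } -> 0/4, not 5/4
```

As in the patch, the reset is deliberately conservative: once any single bucket is invalidated, every bucket's counters are cleared, since the previously persisted counters can no longer be attributed to the new checkpoint.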