From 271e23d1d1690a58680d5159ba21864b006ae687 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 30 May 2025 17:53:42 +0200 Subject: [PATCH] Also apply cap to be sure --- .../lib/src/sync/sync_status.dart | 30 +++++++++++++++++-- .../test/in_memory_sync_test.dart | 29 ++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index fceece50..1a74b9f7 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -1,3 +1,5 @@ +import 'dart:math'; + import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; @@ -241,15 +243,36 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { factory InternalSyncDownloadProgress.forNewCheckpoint( Map localProgress, Checkpoint target) { final buckets = {}; + for (final bucket in target.checksums) { final savedProgress = localProgress[bucket.bucket]; + final atLast = savedProgress?.atLast ?? 0; + final sinceLast = savedProgress?.sinceLast ?? 0; buckets[bucket.bucket] = ( priority: BucketPriority._(bucket.priority), - atLast: savedProgress?.atLast ?? 0, - sinceLast: savedProgress?.sinceLast ?? 0, + atLast: atLast, + sinceLast: sinceLast, targetCount: bucket.count ?? 0, ); + + if (bucket.count case final knownCount?) { + if (knownCount < atLast + sinceLast) { + // Either due to a defrag / sync rule deploy or a compaction + // operation, the size of the bucket shrank so much that the local ops + // exceed the ops in the updated bucket. We can't possibly report + // progress in this case (it would overshoot 100%). + return InternalSyncDownloadProgress({ + for (final bucket in target.checksums) + bucket.bucket: ( + priority: BucketPriority(bucket.priority), + atLast: 0, + sinceLast: 0, + targetCount: knownCount, + ) + }); + } + } } return InternalSyncDownloadProgress(buckets); @@ -282,7 +305,8 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { newBucketStates[dataForBucket.bucket] = ( priority: previous.priority, atLast: previous.atLast, - sinceLast: previous.sinceLast + dataForBucket.data.length, + sinceLast: min(previous.sinceLast + dataForBucket.data.length, + previous.targetCount - previous.atLast), targetCount: previous.targetCount, ); } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 3ab8ae76..89af5727 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -595,6 +595,35 @@ void main() { emits(isSyncStatus(downloading: false, downloadProgress: isNull))); }); + test('interrupt and defrag', () async { + var status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + addDataLine('a', 5); + await expectProgress(status, total: progress(5, 10)); + + // A sync rule deploy could reset buckets, making the new bucket smaller + // than the existing one. + await syncClient.abort(); + syncService.endCurrentListener(); + createSyncClient(); + status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '14', + checksums: [bucket('a', 4)], + ) + }); + + // In this special case, don't report 5/4 as progress + await expectProgress(status, total: progress(0, 4)); + }); + test('different priorities', () async { var status = await waitForConnection(); Future checkProgress(Object prio0, Object prio2) async {