Skip to content

Commit 271e23d

Browse files
committed
Also apply cap to be sure
1 parent 6e4ac91 commit 271e23d

File tree

2 files changed

+56
-3
lines changed

2 files changed

+56
-3
lines changed

packages/powersync_core/lib/src/sync/sync_status.dart

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:math';
2+
13
import 'package:collection/collection.dart';
24
import 'package:meta/meta.dart';
35

@@ -241,15 +243,36 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations {
241243
factory InternalSyncDownloadProgress.forNewCheckpoint(
242244
Map<String, LocalOperationCounters> localProgress, Checkpoint target) {
243245
final buckets = <String, BucketProgress>{};
246+
244247
for (final bucket in target.checksums) {
245248
final savedProgress = localProgress[bucket.bucket];
249+
final atLast = savedProgress?.atLast ?? 0;
250+
final sinceLast = savedProgress?.sinceLast ?? 0;
246251

247252
buckets[bucket.bucket] = (
248253
priority: BucketPriority._(bucket.priority),
249-
atLast: savedProgress?.atLast ?? 0,
250-
sinceLast: savedProgress?.sinceLast ?? 0,
254+
atLast: atLast,
255+
sinceLast: sinceLast,
251256
targetCount: bucket.count ?? 0,
252257
);
258+
259+
if (bucket.count case final knownCount?) {
260+
if (knownCount < atLast + sinceLast) {
261+
// Either due to a defrag / sync rule deploy or a compaction
262+
// operation, the size of the bucket shrank so much that the local ops
263+
// exceed the ops in the updated bucket. We can't possibly report
264+
// progress in this case (it would overshoot 100%).
265+
return InternalSyncDownloadProgress({
266+
for (final bucket in target.checksums)
267+
bucket.bucket: (
268+
priority: BucketPriority(bucket.priority),
269+
atLast: 0,
270+
sinceLast: 0,
271+
targetCount: knownCount,
272+
)
273+
});
274+
}
275+
}
253276
}
254277

255278
return InternalSyncDownloadProgress(buckets);
@@ -282,7 +305,8 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations {
282305
newBucketStates[dataForBucket.bucket] = (
283306
priority: previous.priority,
284307
atLast: previous.atLast,
285-
sinceLast: previous.sinceLast + dataForBucket.data.length,
308+
sinceLast: min(previous.sinceLast + dataForBucket.data.length,
309+
previous.targetCount - previous.atLast),
286310
targetCount: previous.targetCount,
287311
);
288312
}

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,35 @@ void main() {
595595
emits(isSyncStatus(downloading: false, downloadProgress: isNull)));
596596
});
597597

598+
test('interrupt and defrag', () async {
599+
var status = await waitForConnection();
600+
syncService.addLine({
601+
'checkpoint': Checkpoint(
602+
lastOpId: '10',
603+
checksums: [bucket('a', 10)],
604+
)
605+
});
606+
await expectProgress(status, total: progress(0, 10));
607+
addDataLine('a', 5);
608+
await expectProgress(status, total: progress(5, 10));
609+
610+
// A sync rule deploy could reset buckets, making the new bucket smaller
611+
// than the existing one.
612+
await syncClient.abort();
613+
syncService.endCurrentListener();
614+
createSyncClient();
615+
status = await waitForConnection();
616+
syncService.addLine({
617+
'checkpoint': Checkpoint(
618+
lastOpId: '14',
619+
checksums: [bucket('a', 4)],
620+
)
621+
});
622+
623+
// In this special case, don't report 5/4 as progress
624+
await expectProgress(status, total: progress(0, 4));
625+
});
626+
598627
test('different priorities', () async {
599628
var status = await waitForConnection();
600629
Future<void> checkProgress(Object prio0, Object prio2) async {

0 commit comments

Comments
 (0)