Skip to content

Commit 3fa499d

Browse files
authored
Merge pull request #286 from powersync-ja/fix-reconnect-after-checksum-failure
Fix reconnecting after checksum failure
2 parents 5b9b08c + ba335b8 commit 3fa499d

File tree

3 files changed

+72
-12
lines changed

3 files changed

+72
-12
lines changed

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync {
3535
final InternalConnector connector;
3636
final ResolvedSyncOptions options;
3737

38-
final Logger logger = isolateLogger;
38+
final Logger logger;
3939

4040
final Stream<void> crudUpdateTriggerStream;
4141

@@ -68,14 +68,16 @@ class StreamingSyncImplementation implements StreamingSync {
6868
required http.Client client,
6969
Mutex? syncMutex,
7070
Mutex? crudMutex,
71+
Logger? logger,
7172

7273
/// A unique identifier for this streaming sync implementation
7374
/// A good value is typically the DB file path which it will mutate when syncing.
7475
String? identifier = "unknown",
7576
}) : _client = client,
7677
syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"),
7778
crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"),
78-
_userAgentHeaders = userAgentHeaders();
79+
_userAgentHeaders = userAgentHeaders(),
80+
logger = logger ?? isolateLogger;
7981

8082
Duration get _retryDelay => options.retryDelay;
8183

@@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync {
122124
@override
123125
Future<void> streamingSync() async {
124126
try {
127+
assert(_abort == null);
125128
_abort = AbortController();
126129
clientId = await adapter.getClientId();
127130
_crudLoop();
@@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync {
310313
var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream);
311314

312315
Future<void>? credentialsInvalidation;
313-
bool haveInvalidated = false;
316+
bool shouldStopIteration = false;
314317

315318
// Trigger a CRUD upload on reconnect
316319
_internalCrudTriggerController.add(null);
@@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync {
336339
case StreamingSyncCheckpointComplete():
337340
final result = await _applyCheckpoint(targetCheckpoint!, _abort);
338341
if (result.abort) {
342+
shouldStopIteration = true;
339343
return;
340344
}
341345
case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority):
@@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync {
345349
// This means checksums failed. Start again with a new checkpoint.
346350
// TODO: better back-off
347351
// await new Promise((resolve) => setTimeout(resolve, 50));
352+
shouldStopIteration = true;
348353
return;
349354
} else if (!result.ready) {
350355
// If we have pending uploads, we can't complete new checkpoints
@@ -398,13 +403,14 @@ class StreamingSyncImplementation implements StreamingSync {
398403
if (tokenExpiresIn == 0) {
399404
// Token expired already - stop the connection immediately
400405
connector.prefetchCredentials(invalidate: true).ignore();
406+
shouldStopIteration = true;
401407
break;
402408
} else if (tokenExpiresIn <= 30) {
403409
// Token expires soon - refresh it in the background
404410
credentialsInvalidation ??=
405411
connector.prefetchCredentials().then((_) {
406412
// Token has been refreshed - we should restart the connection.
407-
haveInvalidated = true;
413+
shouldStopIteration = true;
408414
// trigger next loop iteration ASAP, don't wait for another
409415
// message from the server.
410416
if (!aborted) {
@@ -421,7 +427,7 @@ class StreamingSyncImplementation implements StreamingSync {
421427
}
422428

423429
await for (var line in merged) {
424-
if (aborted || haveInvalidated) {
430+
if (aborted || shouldStopIteration) {
425431
break;
426432
}
427433

@@ -434,10 +440,10 @@ class StreamingSyncImplementation implements StreamingSync {
434440
break;
435441
case TokenRefreshComplete():
436442
// We have a new token, so stop the iteration.
437-
haveInvalidated = true;
443+
shouldStopIteration = true;
438444
}
439445

440-
if (haveInvalidated) {
446+
if (shouldStopIteration) {
441447
// Stop this connection, so that a new one will be started
442448
break;
443449
}

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import 'package:async/async.dart';
44
import 'package:logging/logging.dart';
55
import 'package:powersync_core/powersync_core.dart';
66
import 'package:powersync_core/sqlite3_common.dart';
7-
import 'package:powersync_core/src/log_internal.dart';
87
import 'package:powersync_core/src/sync/streaming_sync.dart';
98
import 'package:powersync_core/src/sync/protocol.dart';
109
import 'package:test/test.dart';
@@ -25,6 +24,7 @@ void main() {
2524
late CommonDatabase raw;
2625
late PowerSyncDatabase database;
2726
late MockSyncService syncService;
27+
late Logger logger;
2828

2929
late StreamingSync syncClient;
3030
var credentialsCallbackCount = 0;
@@ -34,7 +34,7 @@ void main() {
3434
final (client, server) = inMemoryServer();
3535
server.mount(syncService.router.call);
3636

37-
syncClient = database.connectWithMockService(
37+
final thisSyncClient = syncClient = database.connectWithMockService(
3838
client,
3939
TestConnector(() async {
4040
credentialsCallbackCount++;
@@ -44,10 +44,17 @@ void main() {
4444
expiresAt: DateTime.now(),
4545
);
4646
}, uploadData: (db) => uploadData(db)),
47+
options: const SyncOptions(retryDelay: Duration(milliseconds: 200)),
48+
logger: logger,
4749
);
50+
51+
addTearDown(() async {
52+
await thisSyncClient.abort();
53+
});
4854
}
4955

5056
setUp(() async {
57+
logger = Logger.detached('powersync.active')..level = Level.ALL;
5158
credentialsCallbackCount = 0;
5259
syncService = MockSyncService();
5360

@@ -58,17 +65,16 @@ void main() {
5865
});
5966

6067
tearDown(() async {
61-
await syncClient.abort();
6268
await database.close();
6369
await syncService.stop();
6470
});
6571

6672
Future<StreamQueue<SyncStatus>> waitForConnection(
6773
{bool expectNoWarnings = true}) async {
6874
if (expectNoWarnings) {
69-
isolateLogger.onRecord.listen((e) {
75+
logger.onRecord.listen((e) {
7076
if (e.level >= Level.WARNING) {
71-
fail('Unexpected log: $e');
77+
fail('Unexpected log: $e, ${e.stackTrace}');
7278
}
7379
});
7480
}
@@ -700,6 +706,52 @@ void main() {
700706

701707
expect(syncService.controller.hasListener, isFalse);
702708
});
709+
710+
test('closes connection after failed checksum', () async {
711+
final status = await waitForConnection(expectNoWarnings: false);
712+
syncService.addLine({
713+
'checkpoint': Checkpoint(
714+
lastOpId: '4',
715+
writeCheckpoint: null,
716+
checksums: [checksum(bucket: 'a', checksum: 10)],
717+
)
718+
});
719+
720+
await expectLater(status, emits(isSyncStatus(downloading: true)));
721+
syncService.addLine({
722+
'checkpoint_complete': {'last_op_id': '10'}
723+
});
724+
725+
await pumpEventQueue();
726+
expect(syncService.controller.hasListener, isFalse);
727+
syncService.endCurrentListener();
728+
729+
// Should reconnect after delay.
730+
await Future<void>.delayed(const Duration(milliseconds: 500));
731+
expect(syncService.controller.hasListener, isTrue);
732+
});
733+
734+
test('closes connection after token expires', () async {
735+
final status = await waitForConnection(expectNoWarnings: false);
736+
syncService.addLine({
737+
'checkpoint': Checkpoint(
738+
lastOpId: '4',
739+
writeCheckpoint: null,
740+
checksums: [checksum(bucket: 'a', checksum: 10)],
741+
)
742+
});
743+
744+
await expectLater(status, emits(isSyncStatus(downloading: true)));
745+
syncService.addKeepAlive(0);
746+
747+
await pumpEventQueue();
748+
expect(syncService.controller.hasListener, isFalse);
749+
syncService.endCurrentListener();
750+
751+
// Should reconnect after delay.
752+
await Future<void>.delayed(const Duration(milliseconds: 500));
753+
expect(syncService.controller.hasListener, isTrue);
754+
});
703755
});
704756
}
705757

packages/powersync_core/test/utils/abstract_test_utils.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,15 @@ extension MockSync on PowerSyncDatabase {
149149
StreamingSyncImplementation connectWithMockService(
150150
Client client,
151151
PowerSyncBackendConnector connector, {
152+
Logger? logger,
152153
SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)),
153154
}) {
154155
final impl = StreamingSyncImplementation(
155156
adapter: BucketStorage(this),
156157
client: client,
157158
options: ResolvedSyncOptions(options),
158159
connector: InternalConnector.wrap(connector, this),
160+
logger: logger,
159161
crudUpdateTriggerStream: database
160162
.onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)),
161163
);

0 commit comments

Comments
 (0)