From c193870839eeb5b07b03ff53e4df765f3153847e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 30 May 2025 17:05:14 +0200 Subject: [PATCH 1/4] Fix reconnecting after checksum failure --- .../lib/src/sync/streaming_sync.dart | 19 +++++---- .../test/in_memory_sync_test.dart | 42 ++++++++++++++++--- .../test/utils/abstract_test_utils.dart | 2 + 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 24284751..7e9ff2e4 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync { final InternalConnector connector; final ResolvedSyncOptions options; - final Logger logger = isolateLogger; + final Logger logger; final Stream crudUpdateTriggerStream; @@ -68,6 +68,7 @@ class StreamingSyncImplementation implements StreamingSync { required http.Client client, Mutex? syncMutex, Mutex? crudMutex, + Logger? logger, /// A unique identifier for this streaming sync implementation /// A good value is typically the DB file path which it will mutate when syncing. @@ -75,7 +76,8 @@ class StreamingSyncImplementation implements StreamingSync { }) : _client = client, syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"), crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"), - _userAgentHeaders = userAgentHeaders(); + _userAgentHeaders = userAgentHeaders(), + logger = logger ?? isolateLogger; Duration get _retryDelay => options.retryDelay; @@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync { @override Future streamingSync() async { try { + assert(_abort == null); _abort = AbortController(); clientId = await adapter.getClientId(); _crudLoop(); @@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync { var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream); Future? credentialsInvalidation; - bool haveInvalidated = false; + bool shouldStopIteration = false; // Trigger a CRUD upload on reconnect _internalCrudTriggerController.add(null); @@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync { case StreamingSyncCheckpointComplete(): final result = await _applyCheckpoint(targetCheckpoint!, _abort); if (result.abort) { + shouldStopIteration = true; return; } case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority): @@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync { // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off // await new Promise((resolve) => setTimeout(resolve, 50)); + shouldStopIteration = true; return; } else if (!result.ready) { // If we have pending uploads, we can't complete new checkpoints @@ -404,7 +409,7 @@ class StreamingSyncImplementation implements StreamingSync { credentialsInvalidation ??= connector.prefetchCredentials().then((_) { // Token has been refreshed - we should restart the connection. - haveInvalidated = true; + shouldStopIteration = true; // trigger next loop iteration ASAP, don't wait for another // message from the server. if (!aborted) { @@ -421,7 +426,7 @@ class StreamingSyncImplementation implements StreamingSync { } await for (var line in merged) { - if (aborted || haveInvalidated) { + if (aborted || shouldStopIteration) { break; } @@ -434,10 +439,10 @@ class StreamingSyncImplementation implements StreamingSync { break; case TokenRefreshComplete(): // We have a new token, so stop the iteration. - haveInvalidated = true; + shouldStopIteration = true; } - if (haveInvalidated) { + if (shouldStopIteration) { // Stop this connection, so that a new one will be started break; } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 89af5727..6f72ae8f 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -4,7 +4,6 @@ import 'package:async/async.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; -import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; @@ -25,6 +24,7 @@ void main() { late CommonDatabase raw; late PowerSyncDatabase database; late MockSyncService syncService; + late Logger logger; late StreamingSync syncClient; var credentialsCallbackCount = 0; @@ -34,7 +34,7 @@ void main() { final (client, server) = inMemoryServer(); server.mount(syncService.router.call); - syncClient = database.connectWithMockService( + final thisSyncClient = syncClient = database.connectWithMockService( client, TestConnector(() async { credentialsCallbackCount++; @@ -44,10 +44,17 @@ void main() { expiresAt: DateTime.now(), ); }, uploadData: (db) => uploadData(db)), + options: const SyncOptions(retryDelay: Duration(milliseconds: 200)), + logger: logger, ); + + addTearDown(() async { + await thisSyncClient.abort(); + }); } setUp(() async { + logger = Logger.detached('powersync.active')..level = Level.ALL; credentialsCallbackCount = 0; syncService = MockSyncService(); @@ -58,7 +65,6 @@ void main() { }); tearDown(() async { - await syncClient.abort(); await database.close(); await syncService.stop(); }); @@ -66,9 +72,9 @@ void main() { Future> waitForConnection( {bool expectNoWarnings = true}) async { if (expectNoWarnings) { - isolateLogger.onRecord.listen((e) { + logger.onRecord.listen((e) { if (e.level >= Level.WARNING) { - fail('Unexpected log: $e'); + fail('Unexpected log: $e, ${e.stackTrace}'); } }); } @@ -684,7 +690,7 @@ void main() { }); }); - test('stopping closes connections', () async { + test('stopping tions', () async { final status = await waitForConnection(); syncService.addLine({ @@ -700,6 +706,30 @@ void main() { expect(syncService.controller.hasListener, isFalse); }); + + test('closes connection after failed checksum', () async { + final status = await waitForConnection(expectNoWarnings: false); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '4', + writeCheckpoint: null, + checksums: [checksum(bucket: 'a', checksum: 10)], + ) + }); + + await expectLater(status, emits(isSyncStatus(downloading: true))); + syncService.addLine({ + 'checkpoint_complete': {'last_op_id': '10'} + }); + + await pumpEventQueue(); + expect(syncService.controller.hasListener, isFalse); + syncService.endCurrentListener(); + + // Should reconnect after delay. + await Future.delayed(const Duration(milliseconds: 500)); + expect(syncService.controller.hasListener, isTrue); + }); }); } diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 3ea4a319..95f71211 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -149,6 +149,7 @@ extension MockSync on PowerSyncDatabase { StreamingSyncImplementation connectWithMockService( Client client, PowerSyncBackendConnector connector, { + Logger? logger, SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)), }) { final impl = StreamingSyncImplementation( @@ -156,6 +157,7 @@ extension MockSync on PowerSyncDatabase { client: client, options: ResolvedSyncOptions(options), connector: InternalConnector.wrap(connector, this), + logger: logger, crudUpdateTriggerStream: database .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), ); From 2cb71429f309299889b4922cdfc10297d777dcea Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 30 May 2025 17:08:10 +0200 Subject: [PATCH 2/4] Also ensure stop when token expired --- packages/powersync_core/lib/src/sync/streaming_sync.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 7e9ff2e4..8b4c0cb0 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -403,6 +403,7 @@ class StreamingSyncImplementation implements StreamingSync { if (tokenExpiresIn == 0) { // Token expired already - stop the connection immediately connector.prefetchCredentials(invalidate: true).ignore(); + shouldStopIteration = true; break; } else if (tokenExpiresIn <= 30) { // Token expires soon - refresh it in the background From 3794d696505a60fd550fdd5310ddc0ca74a9d652 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 30 May 2025 17:09:18 +0200 Subject: [PATCH 3/4] Another test for expired tokens --- .../test/in_memory_sync_test.dart | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 6f72ae8f..883ab222 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -730,6 +730,28 @@ void main() { await Future.delayed(const Duration(milliseconds: 500)); expect(syncService.controller.hasListener, isTrue); }); + + test('closes connection after token expires', () async { + final status = await waitForConnection(expectNoWarnings: false); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '4', + writeCheckpoint: null, + checksums: [checksum(bucket: 'a', checksum: 10)], + ) + }); + + await expectLater(status, emits(isSyncStatus(downloading: true))); + syncService.addKeepAlive(0); + + await pumpEventQueue(); + expect(syncService.controller.hasListener, isFalse); + syncService.endCurrentListener(); + + // Should reconnect after delay. + await Future.delayed(const Duration(milliseconds: 500)); + expect(syncService.controller.hasListener, isTrue); + }); }); } From ba335b8e2b369a90b5a658b2332d3d5368d0a2a8 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 30 May 2025 17:30:33 +0200 Subject: [PATCH 4/4] Revert typo --- packages/powersync_core/test/in_memory_sync_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 883ab222..d5bf6e5d 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -690,7 +690,7 @@ void main() { }); }); - test('stopping tions', () async { + test('stopping closes connections', () async { final status = await waitForConnection(); syncService.addLine({