Skip to content

Fix reconnecting after checksum failure #286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync {
final InternalConnector connector;
final ResolvedSyncOptions options;

final Logger logger = isolateLogger;
final Logger logger;

final Stream<void> crudUpdateTriggerStream;

Expand Down Expand Up @@ -68,14 +68,16 @@ class StreamingSyncImplementation implements StreamingSync {
required http.Client client,
Mutex? syncMutex,
Mutex? crudMutex,
Logger? logger,

/// A unique identifier for this streaming sync implementation
/// A good value is typically the DB file path which it will mutate when syncing.
String? identifier = "unknown",
}) : _client = client,
syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"),
crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"),
_userAgentHeaders = userAgentHeaders();
_userAgentHeaders = userAgentHeaders(),
logger = logger ?? isolateLogger;

Duration get _retryDelay => options.retryDelay;

Expand Down Expand Up @@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync {
@override
Future<void> streamingSync() async {
try {
assert(_abort == null);
_abort = AbortController();
clientId = await adapter.getClientId();
_crudLoop();
Expand Down Expand Up @@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync {
var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream);

Future<void>? credentialsInvalidation;
bool haveInvalidated = false;
bool shouldStopIteration = false;

// Trigger a CRUD upload on reconnect
_internalCrudTriggerController.add(null);
Expand All @@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync {
case StreamingSyncCheckpointComplete():
final result = await _applyCheckpoint(targetCheckpoint!, _abort);
if (result.abort) {
shouldStopIteration = true;
return;
}
case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority):
Expand All @@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
shouldStopIteration = true;
return;
} else if (!result.ready) {
// If we have pending uploads, we can't complete new checkpoints
Expand Down Expand Up @@ -398,13 +403,14 @@ class StreamingSyncImplementation implements StreamingSync {
if (tokenExpiresIn == 0) {
// Token expired already - stop the connection immediately
connector.prefetchCredentials(invalidate: true).ignore();
shouldStopIteration = true;
break;
} else if (tokenExpiresIn <= 30) {
// Token expires soon - refresh it in the background
credentialsInvalidation ??=
connector.prefetchCredentials().then((_) {
// Token has been refreshed - we should restart the connection.
haveInvalidated = true;
shouldStopIteration = true;
// trigger next loop iteration ASAP, don't wait for another
// message from the server.
if (!aborted) {
Expand All @@ -421,7 +427,7 @@ class StreamingSyncImplementation implements StreamingSync {
}

await for (var line in merged) {
if (aborted || haveInvalidated) {
if (aborted || shouldStopIteration) {
break;
}

Expand All @@ -434,10 +440,10 @@ class StreamingSyncImplementation implements StreamingSync {
break;
case TokenRefreshComplete():
// We have a new token, so stop the iteration.
haveInvalidated = true;
shouldStopIteration = true;
}

if (haveInvalidated) {
if (shouldStopIteration) {
// Stop this connection, so that a new one will be started
break;
}
Expand Down
62 changes: 57 additions & 5 deletions packages/powersync_core/test/in_memory_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:powersync_core/powersync_core.dart';
import 'package:powersync_core/sqlite3_common.dart';
import 'package:powersync_core/src/log_internal.dart';
import 'package:powersync_core/src/sync/streaming_sync.dart';
import 'package:powersync_core/src/sync/protocol.dart';
import 'package:test/test.dart';
Expand All @@ -25,6 +24,7 @@ void main() {
late CommonDatabase raw;
late PowerSyncDatabase database;
late MockSyncService syncService;
late Logger logger;

late StreamingSync syncClient;
var credentialsCallbackCount = 0;
Expand All @@ -34,7 +34,7 @@ void main() {
final (client, server) = inMemoryServer();
server.mount(syncService.router.call);

syncClient = database.connectWithMockService(
final thisSyncClient = syncClient = database.connectWithMockService(
client,
TestConnector(() async {
credentialsCallbackCount++;
Expand All @@ -44,10 +44,17 @@ void main() {
expiresAt: DateTime.now(),
);
}, uploadData: (db) => uploadData(db)),
options: const SyncOptions(retryDelay: Duration(milliseconds: 200)),
logger: logger,
);

addTearDown(() async {
await thisSyncClient.abort();
});
}

setUp(() async {
logger = Logger.detached('powersync.active')..level = Level.ALL;
credentialsCallbackCount = 0;
syncService = MockSyncService();

Expand All @@ -58,17 +65,16 @@ void main() {
});

tearDown(() async {
await syncClient.abort();
await database.close();
await syncService.stop();
});

Future<StreamQueue<SyncStatus>> waitForConnection(
{bool expectNoWarnings = true}) async {
if (expectNoWarnings) {
isolateLogger.onRecord.listen((e) {
logger.onRecord.listen((e) {
if (e.level >= Level.WARNING) {
fail('Unexpected log: $e');
fail('Unexpected log: $e, ${e.stackTrace}');
}
});
}
Expand Down Expand Up @@ -700,6 +706,52 @@ void main() {

expect(syncService.controller.hasListener, isFalse);
});

test('closes connection after failed checksum', () async {
final status = await waitForConnection(expectNoWarnings: false);
syncService.addLine({
'checkpoint': Checkpoint(
lastOpId: '4',
writeCheckpoint: null,
checksums: [checksum(bucket: 'a', checksum: 10)],
)
});

await expectLater(status, emits(isSyncStatus(downloading: true)));
syncService.addLine({
'checkpoint_complete': {'last_op_id': '10'}
});

await pumpEventQueue();
expect(syncService.controller.hasListener, isFalse);
syncService.endCurrentListener();

// Should reconnect after delay.
await Future<void>.delayed(const Duration(milliseconds: 500));
expect(syncService.controller.hasListener, isTrue);
});

test('closes connection after token expires', () async {
final status = await waitForConnection(expectNoWarnings: false);
syncService.addLine({
'checkpoint': Checkpoint(
lastOpId: '4',
writeCheckpoint: null,
checksums: [checksum(bucket: 'a', checksum: 10)],
)
});

await expectLater(status, emits(isSyncStatus(downloading: true)));
syncService.addKeepAlive(0);

await pumpEventQueue();
expect(syncService.controller.hasListener, isFalse);
syncService.endCurrentListener();

// Should reconnect after delay.
await Future<void>.delayed(const Duration(milliseconds: 500));
expect(syncService.controller.hasListener, isTrue);
});
});
}

Expand Down
2 changes: 2 additions & 0 deletions packages/powersync_core/test/utils/abstract_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,15 @@ extension MockSync on PowerSyncDatabase {
StreamingSyncImplementation connectWithMockService(
Client client,
PowerSyncBackendConnector connector, {
Logger? logger,
SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)),
}) {
final impl = StreamingSyncImplementation(
adapter: BucketStorage(this),
client: client,
options: ResolvedSyncOptions(options),
connector: InternalConnector.wrap(connector, this),
logger: logger,
crudUpdateTriggerStream: database
.onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)),
);
Expand Down