Skip to content

Fix race condition for checkpoints during uploads #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 88 additions & 56 deletions packages/powersync_core/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class StreamingSyncImplementation implements StreamingSync {
bool _safeToClose = true;

final Mutex syncMutex, crudMutex;
Completer<void>? _activeCrudUpload;

final Map<String, String> _userAgentHeaders;

Expand Down Expand Up @@ -177,7 +178,7 @@ class StreamingSyncImplementation implements StreamingSync {
}

Future<void> crudLoop() async {
await uploadAllCrud();
await _uploadAllCrud();

// Trigger a CRUD upload whenever the upstream trigger fires
// as-well-as whenever the sync stream reconnects.
Expand All @@ -187,11 +188,13 @@ class StreamingSyncImplementation implements StreamingSync {
// The stream here is closed on abort.
await for (var _ in mergeStreams(
[crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
await uploadAllCrud();
await _uploadAllCrud();
}
}

Future<void> uploadAllCrud() async {
Future<void> _uploadAllCrud() {
assert(_activeCrudUpload == null);
final completer = _activeCrudUpload = Completer();
return crudMutex.lock(() async {
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
CrudEntry? checkedCrudItem;
Expand Down Expand Up @@ -244,7 +247,11 @@ class StreamingSyncImplementation implements StreamingSync {
_updateStatus(uploading: false);
}
}
}, timeout: retryDelay);
}, timeout: retryDelay).whenComplete(() {
assert(identical(_activeCrudUpload, completer));
_activeCrudUpload = null;
completer.complete();
});
}

Future<String> getWriteCheckpoint() async {
Expand Down Expand Up @@ -336,7 +343,7 @@ class StreamingSyncImplementation implements StreamingSync {
return (initialRequests, localDescriptions);
}

Future<bool> streamingSyncIteration(
Future<void> streamingSyncIteration(
{AbortController? abortController}) async {
adapter.startSession();

Expand Down Expand Up @@ -379,51 +386,27 @@ class StreamingSyncImplementation implements StreamingSync {
await adapter.removeBuckets([...bucketsToDelete]);
_updateStatus(downloading: true);
case StreamingSyncCheckpointComplete():
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return false;
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
appliedCheckpoint = targetCheckpoint;

final now = DateTime.now();
_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: now,
priorityStatusEntries: [
if (appliedCheckpoint.checksums.isNotEmpty)
(
hasSynced: true,
lastSyncedAt: now,
priority: maxBy(
appliedCheckpoint.checksums
.map((cs) => BucketPriority(cs.priority)),
(priority) => priority,
compare: BucketPriority.comparator,
)!,
)
],
);
final result =
await _applyCheckpoint(targetCheckpoint!, abortController);
if (result.abort) {
return;
}

validatedCheckpoint = targetCheckpoint;
if (result.didApply) {
appliedCheckpoint = targetCheckpoint;
}
case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority):
final result = await adapter.syncLocalDatabase(targetCheckpoint!,
forPriority: bucketPriority);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return false;
return;
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
// If we have pending uploads, we can't complete new checkpoints
// outside of priority 0. We'll resolve this for a complete
// checkpoint later.
} else {
_updateStatusForPriority((
priority: BucketPriority(bucketPriority),
Expand Down Expand Up @@ -494,22 +477,13 @@ class StreamingSyncImplementation implements StreamingSync {
downloadError: _noError,
lastSyncedAt: DateTime.now());
} else if (validatedCheckpoint == targetCheckpoint) {
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return false;
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
final result =
await _applyCheckpoint(targetCheckpoint!, abortController);
if (result.abort) {
return;
}
if (result.didApply) {
appliedCheckpoint = targetCheckpoint;

_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
}
}
}
Expand All @@ -519,7 +493,65 @@ class StreamingSyncImplementation implements StreamingSync {
break;
}
}
return true;
}

/// Attempts to apply [targetCheckpoint] to the local database.
///
/// Calls `adapter.syncLocalDatabase` and, if the checkpoint is valid but not
/// ready while a CRUD upload is in flight, waits for that upload (or an abort)
/// and tries once more.
///
/// Returns a record where:
///  - `abort` is true when the first attempt reports an invalid checkpoint, or
///    when [abortController] was aborted while waiting for the upload. The
///    visible callers return from the sync iteration in that case.
///  - `didApply` is true when the checkpoint was valid and ready; the sync
///    status has then been updated (downloading: false, new `lastSyncedAt`).
///
/// NOTE(review): if the *retry* reports `checkpointValid == false`, this falls
/// through to the final `else` branch and returns `abort: false` rather than
/// aborting like the first attempt does — confirm that this is intended.
Future<({bool abort, bool didApply})> _applyCheckpoint(
Checkpoint targetCheckpoint, AbortController? abortController) async {
var result = await adapter.syncLocalDatabase(targetCheckpoint);
// Snapshot the in-flight upload (if any) after the first attempt, so that we
// keep waiting on the same completer even if the field changes later.
final pendingUpload = _activeCrudUpload;

if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return const (abort: true, didApply: false);
} else if (!result.ready && pendingUpload != null) {
// We have pending entries in the local upload queue or are waiting to
// confirm a write checkpoint, which prevented this checkpoint from
// applying. Wait for that to complete and try again.
isolateLogger.fine('Could not apply checkpoint due to local data. '
'Waiting for in-progress upload before retrying...');
// Resume as soon as either the upload finishes or the sync is aborted; the
// abort future is only included when a controller was supplied.
await Future.any([
pendingUpload.future,
if (abortController case final controller?) controller.onAbort,
]);

// Future.any does not tell us which future won, so check the abort flag
// explicitly before touching the database again.
if (abortController?.aborted == true) {
return const (abort: true, didApply: false);
}

// Try again now that uploads have completed.
result = await adapter.syncLocalDatabase(targetCheckpoint);
}

if (result.checkpointValid && result.ready) {
isolateLogger.fine('validated checkpoint: $targetCheckpoint');
final now = DateTime.now();
_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: now,
priorityStatusEntries: [
// Report a single priority entry for a full sync: the maximum priority
// (per BucketPriority.comparator) among the checkpoint's buckets.
// Skipped when the checkpoint has no buckets, since maxBy would then
// return null.
if (targetCheckpoint.checksums.isNotEmpty)
(
hasSynced: true,
lastSyncedAt: now,
priority: maxBy(
targetCheckpoint.checksums
.map((cs) => BucketPriority(cs.priority)),
(priority) => priority,
compare: BucketPriority.comparator,
)!,
)
],
);

return const (abort: false, didApply: true);
} else {
// Not applied, but not fatal either: the caller keeps the sync iteration
// running.
isolateLogger.fine(
'Could not apply checkpoint. Waiting for next sync complete line');
return const (abort: false, didApply: false);
}
}

Stream<StreamingSyncLine> streamingSyncRequest(
Expand Down
93 changes: 92 additions & 1 deletion packages/powersync_core/test/in_memory_sync_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:powersync_core/powersync_core.dart';
Expand All @@ -22,6 +24,7 @@ void main() {
late MockSyncService syncService;
late StreamingSync syncClient;
var credentialsCallbackCount = 0;
Future<void> Function(PowerSyncDatabase) uploadData = (db) async {};

setUp(() async {
credentialsCallbackCount = 0;
Expand All @@ -42,7 +45,7 @@ void main() {
token: 'token$credentialsCallbackCount',
expiresAt: DateTime.now(),
);
}),
}, uploadData: (db) => uploadData(db)),
);
});

Expand Down Expand Up @@ -329,6 +332,94 @@ void main() {
expect(nextRequest.headers['Authorization'], 'Token token2');
expect(credentialsCallbackCount, 2);
});

// Regression test for checkpoints that complete while a CRUD upload is still
// in flight: the synced rows must stay hidden until the upload finishes, and
// must then become visible.
test('handles checkpoints during the upload process', () async {
final status = await waitForConnection();

// Asserts on the full contents of the `customers` table.
Future<void> expectCustomerRows(dynamic matcher) async {
final rows = await database.getAll('SELECT * FROM customers');
expect(rows, matcher);
}

// `uploadStarted` tells the test that the connector has picked up a batch;
// `uploadFinished` lets the test decide when the connector may return.
final uploadStarted = Completer<void>();
final uploadFinished = Completer<void>();

// Connector stub: once a CRUD batch exists, signal the test and then block
// until the test releases the upload before completing the batch.
uploadData = (db) async {
if (await db.getCrudBatch() case final batch?) {
uploadStarted.complete();
await uploadFinished.future;
batch.complete();
}
};

// Trigger an upload
await database.execute(
'INSERT INTO customers (id, name, email) VALUES (uuid(), ?, ?)',
['local', 'local@example.org']);
await expectCustomerRows(hasLength(1));
await uploadStarted.future;

// Pretend that the connector takes forever in uploadData, but the data
// gets uploaded before the method returns.
syncService.addLine({
'checkpoint': Checkpoint(
writeCheckpoint: '1',
lastOpId: '2',
checksums: [BucketChecksum(bucket: 'a', priority: 3, checksum: 0)],
)
});
await expectLater(status, emitsThrough(isSyncStatus(downloading: true)));

// Send two rows for bucket 'a' followed by the line that completes the
// checkpoint announced above (last_op_id '2').
syncService
..addLine({
'data': {
'bucket': 'a',
'data': [
{
'checksum': 0,
'data': {'name': 'from local', 'email': 'local@example.org'},
'op': 'PUT',
'op_id': '1',
'object_id': '1',
'object_type': 'customers'
},
{
'checksum': 0,
'data': {'name': 'additional', 'email': ''},
'op': 'PUT',
'op_id': '2',
'object_id': '2',
'object_type': 'customers'
}
]
}
})
..addLine({
'checkpoint_complete': {'last_op_id': '2'}
});

// Despite receiving a valid checkpoint with two rows, it should not be
// visible because we have local data pending.
await expectCustomerRows(hasLength(1));

// Mark the upload as completed, this should trigger a write_checkpoint
// request.
final sentCheckpoint = Completer<void>();
// Respond with write checkpoint '1', the same value as the checkpoint line
// sent earlier, and record that the request was made.
syncService.writeCheckpoint = () {
sentCheckpoint.complete();
return {
'data': {'write_checkpoint': '1'}
};
};
uploadFinished.complete();
await sentCheckpoint.future;

// This should apply the checkpoint.
await expectLater(status, emitsThrough(isSyncStatus(downloading: false)));

// Meaning that the two rows are now visible.
await expectCustomerRows(hasLength(2));
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ final class MockSyncService {
Completer<Request> _listener = Completer();

final router = Router();
/// Produces the response body for the `/write-checkpoint2.json` route.
///
/// The returned object is passed to `json.encode` by the route handler.
/// Tests may reassign this to control or observe write-checkpoint requests;
/// the default reports write checkpoint `'10'`.
Object? Function() writeCheckpoint = () {
return {
'data': {'write_checkpoint': '10'}
};
};

MockSyncService() {
router
Expand All @@ -27,7 +32,7 @@ final class MockSyncService {
});
})
..get('/write-checkpoint2.json', (request) {
return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: {
return Response.ok(json.encode(writeCheckpoint()), headers: {
'Content-Type': 'application/json',
});
});
Expand Down