Skip to content

Allow disconnecting in fetchCredentials callback #279

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/src/abort_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class AbortController {
_abortRequested.complete();
}

await _abortCompleter.future;
await onCompletion;
}

/// Signal that an abort has completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class PowerSyncDatabaseImpl
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params,
}) async {
final dbRef = database.isolateConnectionFactory();
Expand Down Expand Up @@ -157,7 +158,7 @@ class PowerSyncDatabaseImpl
await waitForShutdown();
}

receiveMessages.listen((data) async {
Future<void> handleMessage(Object? data) async {
if (data is List) {
String action = data[0] as String;
if (action == "getCredentials") {
Expand Down Expand Up @@ -192,7 +193,14 @@ class PowerSyncDatabaseImpl
record.level, record.message, record.error, record.stackTrace);
}
}
});
}

// This function is called in a Zone marking the connection lock as locked.
// This is used to prevent reentrant calls to the lock (which would be a
// deadlock). However, the lock is returned as soon as this function
// returns - and handleMessage may run later. So, make sure we run those
// callbacks in the parent zone.
receiveMessages.listen(asyncWorkZone.bindUnaryCallback(handleMessage));

receiveUnhandledErrors.listen((message) async {
// Sample error:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class PowerSyncDatabaseImpl
{required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params}) {
throw UnimplementedError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {

clientParams = params;
var thisConnectAborter = AbortController();
final zone = Zone.current;

late void Function() retryHandler;

Expand All @@ -296,6 +297,9 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
crudThrottleTime: crudThrottleTime,
params: params,
abort: thisConnectAborter,
// Run follow-up async tasks in the parent zone, a new one is introduced
// while we hold the lock (and async tasks won't hold the sync lock).
asyncWorkZone: zone,
);

thisConnectAborter.onCompletion.whenComplete(retryHandler);
Expand Down Expand Up @@ -347,6 +351,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params,
});

Expand All @@ -372,7 +377,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
_abortActiveSync = null;
} else {
/// Wait for the abort to complete. Continue updating the sync status after completed
await disconnector.onAbort;
await disconnector.onCompletion;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PowerSyncDatabaseImpl
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params,
}) async {
final crudStream =
Expand Down
40 changes: 40 additions & 0 deletions packages/powersync_core/test/streaming_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:logging/logging.dart';
import 'package:powersync_core/powersync_core.dart';
import 'package:test/test.dart';

import 'server/sync_server/in_memory_sync_server.dart';
import 'test_server.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';
Expand Down Expand Up @@ -61,6 +62,45 @@ void main() {
server.close();
});

test('can disconnect in fetchCredentials', () async {
final service = MockSyncService();
final server = await createServer(mockSyncService: service);
final ignoreLogger = Logger.detached('powersync.test');

final pdb =
await testUtils.setupPowerSync(path: path, logger: ignoreLogger);
pdb.retryDelay = Duration(milliseconds: 50);
final connector = TestConnector(expectAsync0(() async {
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
}));

await pdb.connect(connector: connector);
while (server.connectionCount != 1) {
await Future<void>.delayed(const Duration(milliseconds: 100));
}

service.addKeepAlive(60);

final didDisconnect = Completer<void>();

connector.fetchCredentialsCallback = expectAsync0(() async {
didDisconnect.complete(pdb.disconnect());

throw 'deliberate disconnect';
});

service.addKeepAlive(0);
await didDisconnect.future;
expect(pdb.currentStatus.connected, isFalse);
// The error should be cleared after calling disconnect
expect(pdb.currentStatus.downloadError, isNull);

// Wait for a short while to make sure the database doesn't reconnect.
for (var i = 0; i < 10; i++) {
expect(pdb.currentStatus.connecting, isFalse);
}
});

test('can connect as initial operation', () async {
final server = await createServer();
final ignoreLogger = Logger.detached('powersync.test');
Expand Down
16 changes: 11 additions & 5 deletions packages/powersync_core/test/test_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_router/shelf_router.dart';

class TestServer {
import 'server/sync_server/in_memory_sync_server.dart';

final class TestServer {
late HttpServer server;
Router app = Router();
int maxConnectionCount = 0;
int tokenExpiresIn;

TestServer({this.tokenExpiresIn = 65});

Future<void> init() async {
Future<void> init({MockSyncService? mockSyncService}) async {
app.post('/sync/stream', handleSyncStream);
// Open on an arbitrary open port
server = await shelf_io.serve(app.call, 'localhost', 0);
server = await shelf_io.serve(
mockSyncService?.router.call ?? app.call, 'localhost', 0);
}

String get endpoint {
Expand All @@ -34,6 +37,9 @@ class TestServer {
return server.connectionsInfo();
}

/// The default response if no [MockSyncService] has been passed to [init].
///
/// This will emit keepalive messages frequently.
Future<Response> handleSyncStream(Request request) async {
maxConnectionCount = max(connectionCount, maxConnectionCount);

Expand Down Expand Up @@ -61,9 +67,9 @@ class TestServer {
}
}

Future<TestServer> createServer() async {
Future<TestServer> createServer({MockSyncService? mockSyncService}) async {
var server = TestServer();
await server.init();
await server.init(mockSyncService: mockSyncService);
return server;
}

Expand Down
12 changes: 6 additions & 6 deletions packages/powersync_core/test/utils/abstract_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,21 @@ abstract class AbstractTestUtils {
}

class TestConnector extends PowerSyncBackendConnector {
final Future<PowerSyncCredentials> Function() _fetchCredentials;
final Future<void> Function(PowerSyncDatabase)? _uploadData;
Future<PowerSyncCredentials> Function() fetchCredentialsCallback;
Future<void> Function(PowerSyncDatabase)? uploadDataCallback;

TestConnector(this._fetchCredentials,
TestConnector(this.fetchCredentialsCallback,
{Future<void> Function(PowerSyncDatabase)? uploadData})
: _uploadData = uploadData;
: uploadDataCallback = uploadData;

@override
Future<PowerSyncCredentials?> fetchCredentials() {
return _fetchCredentials();
return fetchCredentialsCallback();
}

@override
Future<void> uploadData(PowerSyncDatabase database) async {
await _uploadData?.call(database);
await uploadDataCallback?.call(database);
}
}

Expand Down