Skip to content

Commit dc33017

Browse files
authored
fix(datastore): Restart Sync Engine when network on/off (#5218)
1 parent 04406a5 commit dc33017

File tree

19 files changed

+213
-29
lines changed

19 files changed

+213
-29
lines changed

packages/amplify/amplify_flutter/lib/amplify_flutter.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ library amplify_flutter;
66
import 'package:amplify_core/amplify_core.dart';
77
import 'package:amplify_flutter/src/amplify_impl.dart';
88

9-
export 'package:amplify_core/amplify_core.dart' hide Amplify;
9+
export 'package:amplify_core/amplify_core.dart' hide Amplify, WebSocketOptions;
1010
export 'package:amplify_secure_storage/amplify_secure_storage.dart';
1111

1212
/// Top level singleton Amplify object.

packages/amplify_core/lib/amplify_core.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ export 'src/state_machine/transition.dart';
6464
export 'src/types/analytics/analytics_types.dart';
6565

6666
/// API
67-
export 'src/types/api/api_types.dart';
67+
export 'src/types/api/api_types.dart' hide WebSocketOptions;
68+
// ignore: invalid_export_of_internal_element
69+
export 'src/types/api/api_types.dart' show WebSocketOptions;
6870

6971
/// App path provider
7072
export 'src/types/app_path_provider/app_path_provider.dart';

packages/amplify_core/lib/src/types/api/api_types.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export 'graphql/graphql_response.dart';
2222
export 'graphql/graphql_response_error.dart';
2323
export 'graphql/graphql_subscription_operation.dart';
2424
export 'graphql/graphql_subscription_options.dart';
25+
export 'graphql/web_socket_options.dart';
2526
export 'hub/api_hub_event.dart';
2627
export 'hub/api_subscription_hub_event.dart';
2728
// Types
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import 'package:meta/meta.dart'; // Importing the 'meta' package to use the @internal annotation
2+
3+
/// An internal class to control websocket features after API plugin has been initialized.
4+
@internal
5+
class WebSocketOptions {
6+
/// Private constructor to prevent instantiation
7+
WebSocketOptions._();
8+
9+
/// Private static boolean field
10+
static bool _autoReconnect = true;
11+
12+
/// Static getter method for the boolean field
13+
@internal
14+
static bool get autoReconnect => _autoReconnect;
15+
16+
/// Static setter method for the boolean field
17+
@internal
18+
static set autoReconnect(bool value) {
19+
_autoReconnect = value;
20+
}
21+
}

packages/amplify_datastore/android/src/main/kotlin/com/amazonaws/amplify/amplify_datastore/pigeons/NativePluginBindings.kt

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/amplify_datastore/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/amplify_datastore/example/ios/Runner/Info.plist

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/amplify_datastore/example/lib/main.dart

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,12 @@ class _MyAppState extends State<MyApp> {
162162
void listenToHub() {
163163
setState(() {
164164
hubSubscription = Amplify.Hub.listen(HubChannel.DataStore, (msg) {
165-
if (msg.type case DataStoreHubEventType.networkStatus) {
166-
print('Network status message: $msg');
165+
final payload = msg.payload;
166+
if (payload is NetworkStatusEvent) {
167+
print('Network status active: ${payload.active}');
167168
return;
168169
}
169-
print(msg);
170+
print(msg.type);
170171
});
171172
_listeningToHub = true;
172173
});
@@ -317,6 +318,9 @@ class _MyAppState extends State<MyApp> {
317318
displayQueryButtons(
318319
_isAmplifyConfigured, _queriesToView, updateQueriesToView),
319320

321+
// Start/Stop/Clear buttons
322+
displaySyncButtons(),
323+
320324
Padding(padding: EdgeInsets.all(5.0)),
321325
Text("Listen to DataStore Hub"),
322326
Switch(

packages/amplify_datastore/example/lib/queries_display_widgets.dart

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,35 @@ Widget getWidgetToDisplayComment(
175175
}),
176176
);
177177
}
178+
179+
Widget displaySyncButtons() {
180+
return Row(mainAxisAlignment: MainAxisAlignment.center, children: [
181+
VerticalDivider(
182+
color: Colors.white,
183+
width: 5,
184+
),
185+
ElevatedButton.icon(
186+
onPressed: () {
187+
Amplify.DataStore.start();
188+
},
189+
icon: Icon(Icons.play_arrow),
190+
label: const Text("Start"),
191+
),
192+
divider,
193+
ElevatedButton.icon(
194+
onPressed: () {
195+
Amplify.DataStore.stop();
196+
},
197+
icon: Icon(Icons.stop),
198+
label: const Text("Stop"),
199+
),
200+
divider,
201+
ElevatedButton.icon(
202+
onPressed: () {
203+
Amplify.DataStore.clear();
204+
},
205+
icon: Icon(Icons.delete_sweep),
206+
label: const Text("Clear"),
207+
),
208+
]);
209+
}

packages/amplify_datastore/ios/Classes/FlutterApiPlugin.swift

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation
88
private let apiAuthFactory: APIAuthProviderFactory
99
private let nativeApiPlugin: NativeApiPlugin
1010
private let nativeSubscriptionEvents: PassthroughSubject<NativeGraphQLSubscriptionResponse, Never>
11-
private var cancellables = AtomicDictionary<AnyCancellable, Void>()
11+
private var cancellables = AtomicDictionary<AnyCancellable?, Void>()
1212
private var endpoints: [String: String]
13+
private var networkMonitor: AmplifyNetworkMonitor
1314

1415
init(
1516
apiAuthProviderFactory: APIAuthProviderFactory,
@@ -21,6 +22,23 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation
2122
self.nativeApiPlugin = nativeApiPlugin
2223
self.nativeSubscriptionEvents = subscriptionEventBus
2324
self.endpoints = endpoints
25+
self.networkMonitor = AmplifyNetworkMonitor()
26+
27+
// Listen to network events and send a notification to Flutter side when disconnected.
28+
// This enables Flutter to clean up the websocket/subscriptions.
29+
do {
30+
let cancellable = try reachabilityPublisher()?.sink(receiveValue: { reachabilityUpdate in
31+
if !reachabilityUpdate.isOnline {
32+
DispatchQueue.main.async {
33+
self.nativeApiPlugin.deviceOffline {}
34+
}
35+
}
36+
})
37+
cancellables.set(value: (), forKey: cancellable) // the subscription is bind with class instance lifecycle, it should be released when stream is finished or unsubscribed
38+
39+
} catch {
40+
print("Failed to create reachability publisher: \(error)")
41+
}
2442
}
2543

2644
public func defaultAuthType() throws -> AWSAuthorizationType {
@@ -122,6 +140,11 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation
122140
errors.contains(where: self.isUnauthorizedError(graphQLError:)) {
123141
return Fail(error: APIError.operationError("Unauthorized", "", nil)).eraseToAnyPublisher()
124142
}
143+
if case .data(.failure(let graphQLResponseError)) = event,
144+
case .error(let errors) = graphQLResponseError,
145+
errors.contains(where: self.isFlutterNetworkError(graphQLError:)){
146+
return Fail(error: APIError.networkError("FlutterNetworkException", nil, URLError(.networkConnectionLost))).eraseToAnyPublisher()
147+
}
125148
return Just(event).setFailureType(to: Error.self).eraseToAnyPublisher()
126149
}
127150
.eraseToAnyPublisher()
@@ -182,6 +205,13 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation
182205
}
183206
return errorTypeValue == "Unauthorized"
184207
}
208+
209+
private func isFlutterNetworkError(graphQLError: GraphQLError) -> Bool {
210+
guard case let .string(errorTypeValue) = graphQLError.extensions?["errorType"] else {
211+
return false
212+
}
213+
return errorTypeValue == "FlutterNetworkException"
214+
}
185215

186216
func asyncQuery(nativeRequest: NativeGraphQLRequest) async -> NativeGraphQLResponse {
187217
await withCheckedContinuation { continuation in
@@ -236,14 +266,23 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation
236266
public func patch(request: RESTRequest) async throws -> RESTTask.Success {
237267
preconditionFailure("method not supported")
238268
}
239-
269+
240270
public func reachabilityPublisher(for apiName: String?) throws -> AnyPublisher<ReachabilityUpdate, Never>? {
241-
preconditionFailure("method not supported")
271+
return networkMonitor.publisher
272+
.compactMap { event in
273+
switch event {
274+
case (.offline, .online):
275+
return ReachabilityUpdate(isOnline: true)
276+
case (.online, .offline):
277+
return ReachabilityUpdate(isOnline: false)
278+
default:
279+
return nil
280+
}
281+
}
282+
.eraseToAnyPublisher()
242283
}
243284

244285
public func reachabilityPublisher() throws -> AnyPublisher<ReachabilityUpdate, Never>? {
245-
return nil
286+
return try reachabilityPublisher(for: nil)
246287
}
247-
248-
249288
}

packages/amplify_datastore/ios/Classes/SwiftAmplifyDataStorePlugin.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,6 @@ public class SwiftAmplifyDataStorePlugin: NSObject, FlutterPlugin, NativeAmplify
145145
nil
146146
)
147147
}
148-
// TODO: Migrate to Async Swift v2
149-
// AmplifyAWSServiceConfiguration.addUserAgentPlatform(.flutter, version: "\(version) /datastore")
150148
try Amplify.configure(with : .data(data))
151149
return completion(.success(()))
152150
} catch let error as ConfigurationError {

packages/amplify_datastore/ios/Classes/api/GraphQLResponse+Decode.swift

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,14 @@ extension GraphQLResponse {
141141
uniquingKeysWith: { _, a in a }
142142
)
143143
}
144-
144+
145+
if error.message?.stringValue?.contains("NetworkException") == true {
146+
extensions = extensions.merging(
147+
["errorType": "FlutterNetworkException"],
148+
uniquingKeysWith: { _, a in a }
149+
)
150+
}
151+
145152
return (try? jsonEncoder.encode(error))
146153
.flatMap { try? jsonDecoder.decode(GraphQLError.self, from: $0) }
147154
.map {

packages/amplify_datastore/ios/Classes/pigeons/NativePluginBindings.swift

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/amplify_datastore/lib/amplify_datastore.dart

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,9 @@ class NativeAmplifyApi
350350
Future<NativeGraphQLSubscriptionResponse> subscribe(
351351
NativeGraphQLRequest request) async {
352352
final flutterRequest = nativeRequestToGraphQLRequest(request);
353-
353+
// Turn off then default reconnection behavior to allow native side to trigger reconnect
354+
// ignore: invalid_use_of_internal_member
355+
WebSocketOptions.autoReconnect = false;
354356
final operation = Amplify.API.subscribe(flutterRequest,
355357
onEstablished: () => sendNativeStartAckEvent(flutterRequest.id));
356358

@@ -376,6 +378,28 @@ class NativeAmplifyApi
376378
}
377379
}
378380

381+
@override
382+
Future<void> deviceOffline() async {
383+
await _notifySubscriptionsDisconnected();
384+
}
385+
386+
Future<void> _notifySubscriptionsDisconnected() async {
387+
_subscriptionsCache.forEach((subId, stream) async {
388+
// Send Swift subscriptions an expected error message when network is lost.
389+
// Swift side is expecting this string to transform into the correct error type.
390+
// This will cause the Sync Engine to enter retry mode and in order to recover it
391+
// later we must unsubscribe and close the websocket.
392+
GraphQLResponseError error = GraphQLResponseError(
393+
message: 'FlutterNetworkException - Network disconnected',
394+
);
395+
sendSubscriptionStreamErrorEvent(subId, error.toJson());
396+
// Note: the websocket will still be closing after this line.
397+
// There may be a small delay in shutdown.
398+
await unsubscribe(subId);
399+
await stream.cancel();
400+
});
401+
}
402+
379403
/// Amplify.DataStore.Stop() callback
380404
///
381405
/// Clean up subscriptions on stop.

packages/amplify_datastore/lib/src/native_plugin.g.dart

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/amplify_datastore/pigeons/native_plugin.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ abstract class NativeApiPlugin {
4242
@async
4343
void unsubscribe(String subscriptionId);
4444

45+
@async
46+
void deviceOffline();
47+
4548
@async
4649
void onStop();
4750
}

packages/api/amplify_api_dart/lib/amplify_api_dart.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
/// Amplify API for Dart
55
library amplify_api_dart;
66

7-
export 'package:amplify_core/src/types/api/api_types.dart';
7+
export 'package:amplify_core/src/types/api/api_types.dart'
8+
hide WebSocketOptions;
89

910
export 'src/api_plugin_impl.dart';
1011

packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/subscriptions_bloc.dart

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,7 @@ class SubscriptionBloc<T>
142142
}
143143

144144
Stream<WsSubscriptionState<T>> _complete(SubscriptionComplete event) async* {
145-
assert(
146-
_currentState is SubscriptionListeningState,
147-
'State should always be listening when completed.',
148-
);
149-
yield (_currentState as SubscriptionListeningState<T>).complete();
145+
yield _currentState.complete();
150146
await close();
151147
}
152148

0 commit comments

Comments
 (0)