Skip to content

Commit f7c0c1b

Browse files
tyllarkEquartey
authored andcommitted
fix(api): Reconnect WebSocket when resuming app from a paused state (#5567)
* fix(api): Reconnect WebSocket when resuming app from a paused state
1 parent ba8e7eb commit f7c0c1b

File tree

11 files changed

+365
-2
lines changed

11 files changed

+365
-2
lines changed

packages/api/amplify_api/lib/amplify_api.dart

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,8 @@ library amplify_api;
55

66
export 'package:amplify_api/src/api_plugin_impl.dart';
77
export 'package:amplify_api_dart/amplify_api_dart.dart'
8-
hide AmplifyAPIDart, ConnectivityPlatform, ConnectivityStatus;
8+
hide
9+
AmplifyAPIDart,
10+
ConnectivityPlatform,
11+
ProcessLifeCycle,
12+
ConnectivityStatus;

packages/api/amplify_api/lib/src/api_plugin_impl.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
import 'package:amplify_api/src/connectivity_plus_platform.dart';
5+
import 'package:amplify_api/src/flutter_life_cycle.dart';
56
import 'package:amplify_api_dart/amplify_api_dart.dart';
67
import 'package:amplify_core/amplify_core.dart';
78

@@ -14,6 +15,7 @@ class AmplifyAPI extends AmplifyAPIDart with AWSDebuggable {
1415
super.options,
1516
}) : super(
1617
connectivity: const ConnectivityPlusPlatform(),
18+
processLifeCycle: FlutterLifeCycle(),
1719
);
1820

1921
@override
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'dart:async';
5+
6+
import 'package:amplify_api_dart/amplify_api_dart.dart';
7+
import 'package:flutter/widgets.dart';
8+
import 'package:meta/meta.dart';
9+
10+
/// {@template amplify_api.flutter_life_cycle}
11+
/// Creates a stream of [ProcessStatus] mapped from [AppLifecycleListener](https://api.flutter.dev/flutter/widgets/AppLifecycleListener-class.html).
12+
/// {@endtemplate}
13+
@internal
14+
class FlutterLifeCycle extends ProcessLifeCycle {
15+
/// {@macro amplify_api.flutter_life_cycle}
16+
FlutterLifeCycle() {
17+
AppLifecycleListener(
18+
onStateChange: _onStateChange,
19+
);
20+
}
21+
22+
final _stateController =
23+
StreamController<ProcessStatus>.broadcast(sync: true);
24+
25+
@override
26+
Stream<ProcessStatus> get onStateChanged => _stateController.stream;
27+
28+
void _onStateChange(AppLifecycleState state) {
29+
switch (state) {
30+
case AppLifecycleState.detached:
31+
_stateController.add(ProcessStatus.detached);
32+
case AppLifecycleState.paused:
33+
_stateController.add(ProcessStatus.paused);
34+
case AppLifecycleState.hidden:
35+
_stateController.add(ProcessStatus.hidden);
36+
case AppLifecycleState.inactive:
37+
_stateController.add(ProcessStatus.inactive);
38+
case AppLifecycleState.resumed:
39+
_stateController.add(ProcessStatus.resumed);
40+
}
41+
}
42+
}

packages/api/amplify_api_dart/lib/amplify_api_dart.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ export 'src/graphql/model_helpers/model_subscriptions.dart';
1919

2020
/// Network connectivity util not needed by consumers of Flutter package amplify_api.
2121
export 'src/graphql/web_socket/types/connectivity_platform.dart';
22+
export 'src/graphql/web_socket/types/process_life_cycle.dart';

packages/api/amplify_api_dart/lib/src/api_plugin_impl.dart

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.da
1111
import 'package:amplify_api_dart/src/graphql/web_socket/services/web_socket_service.dart';
1212
import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart';
1313
import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart';
14+
import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart';
1415
import 'package:amplify_api_dart/src/util/amplify_api_config.dart';
1516
import 'package:amplify_api_dart/src/util/amplify_authorization_rest_client.dart';
1617
import 'package:amplify_core/amplify_core.dart';
@@ -30,8 +31,10 @@ class AmplifyAPIDart extends APIPluginInterface with AWSDebuggable {
3031
AmplifyAPIDart({
3132
APIPluginOptions options = const APIPluginOptions(),
3233
ConnectivityPlatform connectivity = const ConnectivityPlatform(),
34+
ProcessLifeCycle processLifeCycle = const ProcessLifeCycle(),
3335
}) : _options = options,
34-
_connectivity = connectivity {
36+
_connectivity = connectivity,
37+
_processLifeCycle = processLifeCycle {
3538
_options.authProviders.forEach(registerAuthProvider);
3639
}
3740

@@ -43,6 +46,9 @@ class AmplifyAPIDart extends APIPluginInterface with AWSDebuggable {
4346
/// Creates a stream representing network connectivity at the hardware level.
4447
final ConnectivityPlatform _connectivity;
4548

49+
/// Creates a stream representing the process life cycle state.
50+
final ProcessLifeCycle _processLifeCycle;
51+
4652
/// A map of the keys from the Amplify API config with auth modes to HTTP clients
4753
/// to use for requests to that endpoint/auth mode. e.g. { "myEndpoint.AWS_IAM": AWSHttpClient}
4854
final Map<String, AWSHttpClient> _clientPool = {};
@@ -277,6 +283,7 @@ class AmplifyAPIDart extends APIPluginInterface with AWSDebuggable {
277283
wsService: AmplifyWebSocketService(),
278284
subscriptionOptions: _options.subscriptionOptions,
279285
connectivity: _connectivity,
286+
processLifeCycle: _processLifeCycle,
280287
);
281288
}
282289

packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import 'package:amplify_api_dart/src/graphql/web_socket/services/web_socket_serv
99
import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart';
1010
import 'package:amplify_api_dart/src/graphql/web_socket/state/ws_subscriptions_state.dart';
1111
import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart';
12+
import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart';
1213
import 'package:amplify_api_dart/src/graphql/web_socket/types/subscriptions_event.dart';
1314
import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart';
1415
import 'package:amplify_core/amplify_core.dart' hide SubscriptionEvent;
@@ -33,8 +34,10 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
3334
required WebSocketService wsService,
3435
required GraphQLSubscriptionOptions subscriptionOptions,
3536
required ConnectivityPlatform connectivity,
37+
required ProcessLifeCycle processLifeCycle,
3638
AWSHttpClient? pollClientOverride,
3739
}) : _connectivity = connectivity,
40+
_processLifeCycle = processLifeCycle,
3841
_pollClient = pollClientOverride ?? AWSHttpClient() {
3942
final subBlocs = <String, SubscriptionBloc<Object?>>{};
4043

@@ -49,6 +52,7 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
4952
);
5053
final blocStream = _wsEventStream.asyncExpand(_eventTransformer);
5154
_networkSubscription = _getConnectivityStream();
55+
_processLifeCycleSubscription = _getProcessLifecycleStream();
5256
_stateSubscription = blocStream.listen(_emit);
5357
add(const InitEvent());
5458
}
@@ -81,10 +85,14 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
8185
late final Stream<WebSocketEvent> _wsEventStream = _wsEventController.stream;
8286
late final StreamSubscription<WebSocketState> _stateSubscription;
8387
late final StreamSubscription<ConnectivityStatus> _networkSubscription;
88+
late final StreamSubscription<ProcessStatus> _processLifeCycleSubscription;
8489

8590
/// Creates a stream representing network connectivity at the hardware level.
8691
final ConnectivityPlatform _connectivity;
8792

93+
/// Creates a stream representing the process life cycle state.
94+
final ProcessLifeCycle _processLifeCycle;
95+
8896
/// The underlying event stream, used only in testing.
8997
@visibleForTesting
9098
Stream<WebSocketEvent> get wsEventStream => _wsEventStream;
@@ -164,6 +172,8 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
164172
yield* _networkLoss();
165173
} else if (event is NetworkFoundEvent) {
166174
yield* _networkFound();
175+
} else if (event is ProcessResumeEvent) {
176+
yield* _processResumed();
167177
} else if (event is PollSuccessEvent) {
168178
yield* _pollSuccess();
169179
} else if (event is PollFailedEvent) {
@@ -328,6 +338,16 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
328338
yield* const Stream.empty();
329339
}
330340

341+
Stream<WebSocketState> _processResumed() async* {
342+
final state = _currentState;
343+
if (state is ConnectedState) {
344+
yield state.reconnecting(networkState: NetworkState.disconnected);
345+
add(const ReconnectEvent());
346+
}
347+
// TODO(dnys1): Yield broken on web debug build.
348+
yield* const Stream.empty();
349+
}
350+
331351
/// Handle successful polls
332352
Stream<WebSocketState> _pollSuccess() async* {
333353
// TODO(dnys1): Yield broken on web debug build.
@@ -467,6 +487,7 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
467487
await Future.wait<void>([
468488
// TODO(equartey): https://github.com/fluttercommunity/plus_plugins/issues/1382
469489
if (!isWindows()) _networkSubscription.cancel(),
490+
_processLifeCycleSubscription.cancel(),
470491
Future.value(_pollClient.close()),
471492
_stateSubscription.cancel(),
472493
_wsEventController.close(),
@@ -507,6 +528,41 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin {
507528
);
508529
}
509530

531+
/// Process life cycle stream monitors when the process resumes from a paused state.
532+
StreamSubscription<ProcessStatus> _getProcessLifecycleStream() {
533+
var prev = ProcessStatus.detached;
534+
return _processLifeCycle.onStateChanged.listen(
535+
(state) {
536+
if (_isResuming(state, prev)) {
537+
// ignore: invalid_use_of_internal_member
538+
if (!WebSocketOptions.autoReconnect) {
539+
_shutdownWithException(
540+
const NetworkException(
541+
'Unable to recover network connection, web socket will close.',
542+
recoverySuggestion: 'Avoid pausing the process.',
543+
),
544+
StackTrace.current,
545+
);
546+
} else {
547+
add(const ProcessResumeEvent());
548+
}
549+
}
550+
551+
prev = state;
552+
},
553+
onError: (Object e, StackTrace st) =>
554+
logger.error('Error in process life cycle stream $e, $st'),
555+
);
556+
}
557+
558+
bool _isResuming(ProcessStatus current, ProcessStatus previous) {
559+
if (previous != ProcessStatus.paused) return false;
560+
561+
return current == ProcessStatus.hidden ||
562+
current == ProcessStatus.inactive ||
563+
current == ProcessStatus.resumed;
564+
}
565+
510566
Future<void> _poll() async {
511567
try {
512568
final res = await _sendPollRequest();
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
/// Possible process life cycle states
5+
enum ProcessStatus {
6+
/// Engine is running without a view.
7+
detached,
8+
9+
/// Application is not visible to the user or responding to user input.
10+
paused,
11+
12+
/// All views of an application are hidden.
13+
hidden,
14+
15+
/// A view of the application is visible, but none have input.
16+
inactive,
17+
18+
/// Default running mode.
19+
resumed,
20+
}
21+
22+
/// {@template amplify_api_dart.process_life_cycle}
23+
/// Used to create a stream representing the process life cycle state.
24+
///
25+
/// The generated stream is empty.
26+
/// {@endtemplate}
27+
class ProcessLifeCycle {
28+
/// {@macro amplify_api_dart.process_life_cycle}
29+
const ProcessLifeCycle();
30+
31+
/// Generates a new stream of [ProcessStatus].
32+
Stream<ProcessStatus> get onStateChanged => const Stream.empty();
33+
}

packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/web_socket_event.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,19 @@ class NetworkLossEvent extends NetworkEvent {
8888
String get runtimeTypeName => 'NetworkLossEvent';
8989
}
9090

91+
/// Discrete class for when the process is resumed
92+
/// Triggers when AppLifecycleListener detects the process has been resumed.
93+
class ProcessResumeEvent extends WebSocketEvent {
94+
/// Create a process resumed event
95+
const ProcessResumeEvent();
96+
97+
@override
98+
String get runtimeTypeName => 'ProcessResumeEvent';
99+
100+
@override
101+
Map<String, Object?> toJson() => const {};
102+
}
103+
91104
/// Triggers when a successful ping to AppSync is made
92105
class PollSuccessEvent extends WebSocketEvent {
93106
/// Create a successful Poll event

packages/api/amplify_api_dart/test/graphql_test.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ void main() {
280280
'payload': {'data': mockSubscriptionData},
281281
};
282282

283+
mockProcessLifeCycleController = StreamController<ProcessStatus>();
283284
mockWebSocketService = MockWebSocketService();
284285
const subscriptionOptions = GraphQLSubscriptionOptions(
285286
pollInterval: Duration(seconds: 1),
@@ -292,6 +293,7 @@ void main() {
292293
subscriptionOptions: subscriptionOptions,
293294
pollClientOverride: mockClient.client,
294295
connectivity: const ConnectivityPlatform(),
296+
processLifeCycle: const MockProcessLifeCycle(),
295297
);
296298

297299
sendMockConnectionAck(mockWebSocketBloc!, mockWebSocketService!);
@@ -599,6 +601,7 @@ void main() {
599601
});
600602

601603
test('should have correct state flow during a failure', () async {
604+
mockProcessLifeCycleController = StreamController<ProcessStatus>();
602605
mockWebSocketService = MockWebSocketService();
603606
const subscriptionOptions = GraphQLSubscriptionOptions(
604607
pollInterval: Duration(seconds: 1),
@@ -613,6 +616,7 @@ void main() {
613616
subscriptionOptions: subscriptionOptions,
614617
pollClientOverride: mockClient.client,
615618
connectivity: const ConnectivityPlatform(),
619+
processLifeCycle: const MockProcessLifeCycle(),
616620
);
617621

618622
final blocReady = Completer<void>();

packages/api/amplify_api_dart/test/util.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.da
1010
import 'package:amplify_api_dart/src/graphql/web_socket/services/web_socket_service.dart';
1111
import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart';
1212
import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart';
13+
import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart';
1314
import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart';
1415
import 'package:amplify_core/amplify_core.dart';
1516
import 'package:amplify_core/src/config/amplify_outputs/data/data_outputs.dart';
@@ -329,6 +330,16 @@ class MockConnectivity extends ConnectivityPlatform {
329330
mockNetworkStreamController.stream;
330331
}
331332

333+
late StreamController<ProcessStatus> mockProcessLifeCycleController;
334+
335+
class MockProcessLifeCycle extends ProcessLifeCycle {
336+
const MockProcessLifeCycle();
337+
338+
@override
339+
Stream<ProcessStatus> get onStateChanged =>
340+
mockProcessLifeCycleController.stream;
341+
}
342+
332343
/// Ensures a query predicate converts to JSON correctly.
333344
void testQueryPredicateTranslation(
334345
QueryPredicate? queryPredicate,

0 commit comments

Comments
 (0)