@@ -7,6 +7,7 @@ import 'dart:convert';
7
7
import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.dart' ;
8
8
import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart' ;
9
9
import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart' ;
10
+ import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart' ;
10
11
import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart' ;
11
12
import 'package:amplify_core/amplify_core.dart' ;
12
13
import 'package:test/test.dart' ;
@@ -54,6 +55,7 @@ void main() {
54
55
if (! noConnectivity) {
55
56
mockNetworkStreamController = StreamController <ConnectivityStatus >();
56
57
}
58
+ mockProcessLifeCycleController = StreamController <ProcessStatus >();
57
59
mockPollClient = MockPollClient ();
58
60
service = MockWebSocketService ();
59
61
@@ -66,6 +68,7 @@ void main() {
66
68
connectivity: noConnectivity
67
69
? const ConnectivityPlatform ()
68
70
: const MockConnectivity (),
71
+ processLifeCycle: const MockProcessLifeCycle (),
69
72
);
70
73
71
74
sendMockConnectionAck (bloc! , service! );
@@ -307,6 +310,143 @@ void main() {
307
310
mockPollClient.induceTimeout = false ;
308
311
});
309
312
313
+ test ('should reconnect when process unpauses' , () async {
314
+ var dataCompleter = Completer <String >();
315
+ final subscribeEvent = SubscribeEvent (
316
+ subscriptionRequest,
317
+ () {
318
+ service! .channel.sink.add (mockDataString);
319
+ },
320
+ );
321
+
322
+ final bloc = getWebSocketBloc ();
323
+
324
+ expect (
325
+ bloc.stream,
326
+ emitsInOrder (
327
+ [
328
+ isA <DisconnectedState >(),
329
+ isA <ConnectingState >(),
330
+ isA <ConnectedState >(),
331
+ isA <ReconnectingState >(),
332
+ isA <ConnectingState >(),
333
+ isA <ConnectedState >(),
334
+ ],
335
+ ),
336
+ );
337
+
338
+ bloc.subscribe (subscribeEvent).listen (
339
+ expectAsync1 (
340
+ (event) {
341
+ expect (event.data, json.encode (mockSubscriptionData));
342
+ dataCompleter.complete (event.data);
343
+ },
344
+ count: 2 ,
345
+ ),
346
+ );
347
+
348
+ await dataCompleter.future;
349
+ dataCompleter = Completer <String >();
350
+
351
+ mockProcessLifeCycleController
352
+ ..add (ProcessStatus .paused)
353
+ ..add (ProcessStatus .resumed);
354
+
355
+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
356
+
357
+ service! .channel.sink.add (mockDataString);
358
+ await dataCompleter.future;
359
+ });
360
+
361
+ test ('should throttle reconnect after repeated unpausing' , () async {
362
+ final blocReady = Completer <void >();
363
+ final subscribeEvent = SubscribeEvent (
364
+ subscriptionRequest,
365
+ blocReady.complete,
366
+ );
367
+
368
+ final bloc = getWebSocketBloc ();
369
+
370
+ expect (
371
+ bloc.stream,
372
+ emitsInOrder (
373
+ [
374
+ isA <DisconnectedState >(),
375
+ isA <ConnectingState >(),
376
+ isA <ConnectedState >(),
377
+ isA <ReconnectingState >(),
378
+ isA <ConnectingState >(),
379
+ isA <ConnectedState >(),
380
+ ],
381
+ ),
382
+ reason:
383
+ 'Bloc should debounce multiple reconnection triggers while unpausing.' ,
384
+ );
385
+
386
+ bloc.subscribe (
387
+ subscribeEvent,
388
+ );
389
+
390
+ await blocReady.future;
391
+
392
+ mockProcessLifeCycleController
393
+ ..add (ProcessStatus .paused)
394
+ ..add (ProcessStatus .resumed)
395
+ ..add (ProcessStatus .paused)
396
+ ..add (ProcessStatus .resumed)
397
+ ..add (ProcessStatus .paused)
398
+ ..add (ProcessStatus .resumed)
399
+ ..add (ProcessStatus .paused)
400
+ ..add (ProcessStatus .resumed)
401
+ ..add (ProcessStatus .paused)
402
+ ..add (ProcessStatus .resumed);
403
+ });
404
+
405
+ test ('should reconnect multiple times after unpausing' , () async {
406
+ final blocReady = Completer <void >();
407
+ final subscribeEvent = SubscribeEvent (
408
+ subscriptionRequest,
409
+ blocReady.complete,
410
+ );
411
+
412
+ final bloc = getWebSocketBloc ()
413
+ ..subscribe (
414
+ subscribeEvent,
415
+ );
416
+
417
+ await blocReady.future;
418
+
419
+ mockProcessLifeCycleController..add (ProcessStatus .paused)
420
+ ..add (ProcessStatus .resumed);
421
+
422
+ await expectLater (bloc.stream, emitsThrough (isA <ReconnectingState >()));
423
+
424
+ mockProcessLifeCycleController..add (ProcessStatus .paused)
425
+ ..add (ProcessStatus .resumed);
426
+
427
+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
428
+
429
+ mockProcessLifeCycleController..add (ProcessStatus .paused)
430
+ ..add (ProcessStatus .resumed);
431
+
432
+ await expectLater (bloc.stream, emitsThrough (isA <ReconnectingState >()));
433
+
434
+ mockProcessLifeCycleController..add (ProcessStatus .paused)
435
+ ..add (ProcessStatus .resumed);
436
+
437
+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
438
+
439
+ mockProcessLifeCycleController..add (ProcessStatus .paused)
440
+ ..add (ProcessStatus .resumed);
441
+
442
+ await expectLater (bloc.stream, emitsThrough (isA <ReconnectingState >()));
443
+
444
+ mockProcessLifeCycleController..add (ProcessStatus .paused)
445
+ ..add (ProcessStatus .resumed);
446
+
447
+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
448
+ });
449
+
310
450
test (
311
451
'subscribe() ignores a WebSocket message that comes while the bloc is disconnected' ,
312
452
() async {
@@ -348,13 +488,15 @@ void main() {
348
488
349
489
final badService = MockWebSocketService (badInit: true );
350
490
mockNetworkStreamController = StreamController <ConnectivityStatus >();
491
+ mockProcessLifeCycleController = StreamController <ProcessStatus >();
351
492
final bloc = WebSocketBloc (
352
493
config: testApiKeyConfig,
353
494
authProviderRepo: getTestAuthProviderRepo (),
354
495
wsService: badService,
355
496
subscriptionOptions: subscriptionOptions,
356
497
pollClientOverride: mockPollClient.client,
357
498
connectivity: const MockConnectivity (),
499
+ processLifeCycle: const MockProcessLifeCycle (),
358
500
);
359
501
360
502
expect (
0 commit comments