From 269edfe73dfb03827e6410e304d7ee8d7fd26c9b Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Wed, 11 May 2022 16:05:55 -0700 Subject: [PATCH 1/2] Allow for complete messages to be captured in a subscription --- .../src/Providers/AWSAppSyncRealTimeProvider/index.ts | 10 +++++++++- packages/pubsub/src/PubSub.ts | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 47a7e0d389b..0775dd07005 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -396,7 +396,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { subscriptionFailedCallback, } = this.subscriptionObserverMap.get(id) || {}; - logger.debug({ id, observer, query, variables }); + logger.debug({ id, observer, query, variables, type }); if (type === MESSAGE_TYPES.GQL_DATA && payload && payload.data) { if (observer) { @@ -407,6 +407,14 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { return; } + if (type === MESSAGE_TYPES.GQL_COMPLETE) { + logger.debug(`connection completed for id: ${id}`); + if (observer) { + observer.complete(); + } + return; + } + if (type === MESSAGE_TYPES.GQL_START_ACK) { logger.debug( `subscription ready for ${JSON.stringify({ query, variables })}` diff --git a/packages/pubsub/src/PubSub.ts b/packages/pubsub/src/PubSub.ts index a63091e0b54..d952ef69a50 100644 --- a/packages/pubsub/src/PubSub.ts +++ b/packages/pubsub/src/PubSub.ts @@ -184,7 +184,7 @@ export class PubSubClass { start: console.error, next: value => observer.next({ provider, value }), error: error => observer.error({ provider, error }), - // complete: observer.complete, // TODO: when all completed, complete the outer one + complete: observer.complete, // TODO: when all completed, complete the outer one }) ); From 42a2683630657bd56a51fa03be26935ea2210c07 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 12 May 2022 09:10:34 -0700 Subject: [PATCH 2/2] Update tests for web socket complete messages --- .../AWSAppSyncRealTimeProvider.test.ts | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index 029278c20f3..73a400024ef 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -513,6 +513,60 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); + test('subscription is complete when a connection is formed and a complete message is received', async () => { + expect.assertions(1); + + const mockComplete = jest.fn(); + const observer = provider.subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + const subscription = observer.subscribe({ + // Succeed only when the complete message comes through + complete: mockComplete, + // Closing a hot connection (for cleanup) makes it blow up the test stack + error: () => {}, + }); + await fakeWebSocketInterface?.standardConnectionHandshake(); + await fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_COMPLETE, + payload: {}, + }); + + expect(mockComplete).toBeCalled(); + }); + + test('subscription is complete when a connection is formed and a complete message is received after connection ack', async () => { + expect.assertions(1); + + const mockComplete = jest.fn(); + const observer = provider.subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + const subscription = observer.subscribe({ + // Succeed only when the complete message comes through + complete: mockComplete, + // Closing a hot connection (for cleanup) makes it blow up the test stack + error: () => {}, + }); + await fakeWebSocketInterface?.standardConnectionHandshake(); + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('start_ack', { + data: JSON.stringify({ + type: MESSAGE_TYPES.GQL_START_ACK, + payload: { connectionTimeoutMs: 100 }, + }), + }) + ); + await fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_COMPLETE, + payload: {}, + }); + + expect(mockComplete).toBeCalled(); + }); + test('socket is closed when subscription is closed', async () => { expect.assertions(1);