From 21b41659b4dd997887bc84882b262365909c5165 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 1 Nov 2024 11:04:31 +0200 Subject: [PATCH 1/8] add abort signal support to our async iterables --- src/execution/PromiseCanceller.ts | 44 +++- .../__tests__/PromiseCanceller-test.ts | 240 ++++++++++++++++-- src/execution/__tests__/abort-signal-test.ts | 193 +++++++++++++- src/execution/execute.ts | 26 +- 4 files changed, 457 insertions(+), 46 deletions(-) diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts index 60c3e3b6a3..b5af32cfce 100644 --- a/src/execution/PromiseCanceller.ts +++ b/src/execution/PromiseCanceller.ts @@ -1,8 +1,8 @@ import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; /** - * A PromiseCanceller object can be used to cancel multiple promises - * using a single AbortSignal. + * A PromiseCanceller object can be used to trigger multiple responses + * in response to a single AbortSignal. * * @internal */ @@ -28,14 +28,21 @@ export class PromiseCanceller { this.abortSignal.removeEventListener('abort', this.abort); } - withCancellation(originalPromise: Promise): Promise { + cancellablePromise( + originalPromise: Promise, + onCancel?: (() => unknown) | undefined, + ): Promise { if (this.abortSignal.aborted) { + onCancel?.(); // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors return Promise.reject(this.abortSignal.reason); } const { promise, resolve, reject } = promiseWithResolvers(); - const abort = () => reject(this.abortSignal.reason); + const abort = () => { + onCancel?.(); + reject(this.abortSignal.reason); + }; this._aborts.add(abort); originalPromise.then( (resolved) => { @@ -50,4 +57,33 @@ export class PromiseCanceller { return promise; } + + cancellableIterable(iterable: AsyncIterable): AsyncIterable { + const iterator = iterable[Symbol.asyncIterator](); + + if (iterator.return) { + const _return = iterator.return.bind(iterator); + const _returnIgnoringErrors = async (): Promise> => { + _return().catch(() => { + /* c8 ignore next */ + // ignore + }); + return Promise.resolve({ value: undefined, done: true }); + }; + + return { + [Symbol.asyncIterator]: () => ({ + next: () => + this.cancellablePromise(iterator.next(), _returnIgnoringErrors), + return: () => this.cancellablePromise(_return()), + }), + }; + } + + return { + [Symbol.asyncIterator]: () => ({ + next: () => this.cancellablePromise(iterator.next()), + }), + }; + } } diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts index 91fe6c40e5..500b05f343 100644 --- a/src/execution/__tests__/PromiseCanceller-test.ts +++ b/src/execution/__tests__/PromiseCanceller-test.ts @@ -1,3 +1,4 @@ +import { expect } from 'chai'; import { describe, it } from 'mocha'; import { expectPromise } from '../../__testUtils__/expectPromise.js'; @@ -5,52 +6,237 @@ import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { PromiseCanceller } from '../PromiseCanceller.js'; describe('PromiseCanceller', () => { - it('works to cancel an already resolved promise', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; + describe('cancellablePromise', () => { + it('works to cancel an already resolved promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; - const promiseCanceller = new PromiseCanceller(abortSignal); + const promiseCanceller = new PromiseCanceller(abortSignal); - const promise = Promise.resolve(1); + const promise = Promise.resolve(1); - const withCancellation = promiseCanceller.withCancellation(promise); + const withCancellation = promiseCanceller.cancellablePromise(promise); - abortController.abort(new Error('Cancelled!')); + abortController.abort(new Error('Cancelled!')); - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel an already resolved promise after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = Promise.resolve(1); + + const withCancellation = promiseCanceller.cancellablePromise(promise); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = promiseCanceller.cancellablePromise(promise); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); - it('works to cancel a hanging promise', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; + it('works to cancel a hanging promise created after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; - const promiseCanceller = new PromiseCanceller(abortSignal); + abortController.abort(new Error('Cancelled!')); - const promise = new Promise(() => { - /* never resolves */ + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = promiseCanceller.cancellablePromise(promise); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to trigger onCancel when cancelling a hanging promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + let onCancelCalled = false; + const onCancel = () => { + onCancelCalled = true; + }; + + const withCancellation = promiseCanceller.cancellablePromise( + promise, + onCancel, + ); + + expect(onCancelCalled).to.equal(false); + + abortController.abort(new Error('Cancelled!')); + + expect(onCancelCalled).to.equal(true); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); }); - const withCancellation = promiseCanceller.withCancellation(promise); + it('works to trigger onCancel when cancelling a hanging promise created after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); - abortController.abort(new Error('Cancelled!')); + let onCancelCalled = false; + const onCancel = () => { + onCancelCalled = true; + }; - await expectPromise(withCancellation).toRejectWith('Cancelled!'); + const withCancellation = promiseCanceller.cancellablePromise( + promise, + onCancel, + ); + + expect(onCancelCalled).to.equal(true); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); }); - it('works to cancel a hanging promise created after abort signal triggered', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; + describe('cancellableAsyncIterable', () => { + it('works to abort a next call', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const asyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ value: 1, done: false }), + }), + }; + + const cancellableAsyncIterable = + promiseCanceller.cancellableIterable(asyncIterable); + + const nextPromise = + cancellableAsyncIterable[Symbol.asyncIterator]().next(); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(nextPromise).toRejectWith('Cancelled!'); + }); + + it('works to abort a next call when already aborted', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const asyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ value: 1, done: false }), + }), + }; + + const cancellableAsyncIterable = + promiseCanceller.cancellableIterable(asyncIterable); + + const nextPromise = + cancellableAsyncIterable[Symbol.asyncIterator]().next(); - abortController.abort(new Error('Cancelled!')); + await expectPromise(nextPromise).toRejectWith('Cancelled!'); + }); + + it('works to call return', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + let returned = false; + const asyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ value: 1, done: false }), + return: () => { + returned = true; + return Promise.resolve({ value: undefined, done: true }); + }, + }), + }; + + const cancellableAsyncIterable = + promiseCanceller.cancellableIterable(asyncIterable); + + abortController.abort(new Error('Cancelled!')); + + expect(returned).to.equal(false); + + const nextPromise = + cancellableAsyncIterable[Symbol.asyncIterator]().next(); - const promiseCanceller = new PromiseCanceller(abortSignal); + expect(returned).to.equal(true); - const promise = new Promise(() => { - /* never resolves */ + await expectPromise(nextPromise).toRejectWith('Cancelled!'); }); - const withCancellation = promiseCanceller.withCancellation(promise); + it('works to call return when already aborted', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; - await expectPromise(withCancellation).toRejectWith('Cancelled!'); + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + let returned = false; + const asyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ value: 1, done: false }), + return: () => { + returned = true; + return Promise.resolve({ value: undefined, done: true }); + }, + }), + }; + + const cancellableAsyncIterable = + promiseCanceller.cancellableIterable(asyncIterable); + + expect(returned).to.equal(false); + + const nextPromise = + cancellableAsyncIterable[Symbol.asyncIterator]().next(); + + expect(returned).to.equal(true); + + await expectPromise(nextPromise).toRejectWith('Cancelled!'); + }); }); }); diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index d12253b517..e07b1e8c5b 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -1,9 +1,12 @@ -import { expect } from 'chai'; +import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; +import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; +import { isAsyncIterable } from '../../jsutils/isAsyncIterable.js'; + import type { DocumentNode } from '../../language/ast.js'; import { parse } from '../../language/parser.js'; @@ -400,6 +403,56 @@ describe('Execute: Cancellation', () => { }); }); + it('should stop the execution when aborted despite a hanging async item', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + items + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + todo: () => ({ + id: '1', + async *items() { + yield await new Promise(() => { + /* will never resolve */ + }); /* c8 ignore start */ + } /* c8 ignore stop */, + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: { + todo: { + id: '1', + items: null, + }, + }, + errors: [ + { + message: 'This operation was aborted', + path: ['todo', 'items'], + locations: [{ line: 5, column: 11 }], + }, + ], + }); + }); + it('should stop the execution when aborted with proper null bubbling', async () => { const abortController = new AbortController(); const document = parse(` @@ -610,6 +663,63 @@ describe('Execute: Cancellation', () => { ]); }); + it('should stop streamed execution when aborted', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + items @stream + } + } + `); + + const resultPromise = complete( + document, + { + todo: { + id: '1', + items: [Promise.resolve('item')], + }, + }, + abortController.signal, + ); + + abortController.abort(); + + const result = await resultPromise; + + expectJSON(result).toDeepEqual([ + { + data: { + todo: { + id: '1', + items: [], + }, + }, + pending: [{ id: '0', path: ['todo', 'items'] }], + hasNext: true, + }, + { + incremental: [ + { + items: [null], + errors: [ + { + message: 'This operation was aborted', + path: ['todo', 'items', 0], + locations: [{ line: 5, column: 11 }], + }, + ], + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: false, + }, + ]); + }); + it('should stop the execution when aborted mid-mutation', async () => { const abortController = new AbortController(); const document = parse(` @@ -693,7 +803,7 @@ describe('Execute: Cancellation', () => { } `); - const resultPromise = subscribe({ + const subscriptionPromise = subscribe({ document, schema, abortSignal: abortController.signal, @@ -707,7 +817,7 @@ describe('Execute: Cancellation', () => { abortController.abort(); - const result = await resultPromise; + const result = await subscriptionPromise; expectJSON(result).toDeepEqual({ errors: [ @@ -719,4 +829,81 @@ describe('Execute: Cancellation', () => { ], }); }); + + it('should stop the execution when aborted during subscription', async () => { + const abortController = new AbortController(); + const document = parse(` + subscription { + foo + } + `); + + const subscription = subscribe({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + async *foo() { + yield await Promise.resolve({ foo: 'foo' }); + yield await Promise.resolve({ foo: 'foo' }); /* c8 ignore start */ + } /* c8 ignore stop */, + }, + }); + + assert(isAsyncIterable(subscription)); + + expectJSON(await subscription.next()).toDeepEqual({ + value: { + data: { + foo: 'foo', + }, + }, + done: false, + }); + + abortController.abort(); + + await expectPromise(subscription.next()).toRejectWith( + 'This operation was aborted', + ); + }); + + it('should stop the execution when aborted during subscription returned asynchronously', async () => { + const abortController = new AbortController(); + const document = parse(` + subscription { + foo + } + `); + + async function* foo() { + yield await Promise.resolve({ foo: 'foo' }); + } + + const subscription = await subscribe({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + foo: Promise.resolve(foo()), + }, + }); + + assert(isAsyncIterable(subscription)); + + expectJSON(await subscription.next()).toDeepEqual({ + value: { + data: { + foo: 'foo', + }, + }, + done: false, + }); + + abortController.abort(); + + await expectPromise(subscription.next()).toRejectWith( + 'This operation was aborted', + ); + }); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1258abf279..959be4cd0a 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -878,7 +878,7 @@ function executeField( fieldDetailsList, info, path, - promiseCanceller?.withCancellation(result) ?? result, + promiseCanceller?.cancellablePromise(result) ?? result, incrementalContext, deferMap, ); @@ -1386,7 +1386,9 @@ function completeListValue( const itemType = returnType.ofType; if (isAsyncIterable(result)) { - const asyncIterator = result[Symbol.asyncIterator](); + const maybeCancellableIterable = + exeContext.promiseCanceller?.cancellableIterable(result) ?? result; + const asyncIterator = maybeCancellableIterable[Symbol.asyncIterator](); return completeAsyncIteratorValue( exeContext, @@ -1597,7 +1599,7 @@ async function completePromisedListItemValue( deferMap: ReadonlyMap | undefined, ): Promise { try { - const resolved = await (exeContext.promiseCanceller?.withCancellation( + const resolved = await (exeContext.promiseCanceller?.cancellablePromise( item, ) ?? item); let completed = completeValue( @@ -2263,17 +2265,16 @@ function executeSubscription( // used to represent an authenticated user, or request-specific caches. const result = resolveFn(rootValue, args, contextValue, info, abortSignal); + const promiseCanceller = abortSignal + ? new PromiseCanceller(abortSignal) + : undefined; + if (isPromise(result)) { - const promiseCanceller = abortSignal - ? new PromiseCanceller(abortSignal) - : undefined; - const promise = promiseCanceller?.withCancellation(result) ?? result; + const promise = promiseCanceller?.cancellablePromise(result) ?? result; return promise.then(assertEventStream).then( (resolved) => { - // TODO: add test case - /* c8 ignore next */ promiseCanceller?.disconnect(); - return resolved; + return promiseCanceller?.cancellableIterable(resolved) ?? resolved; }, (error: unknown) => { promiseCanceller?.disconnect(); @@ -2282,7 +2283,8 @@ function executeSubscription( ); } - return assertEventStream(result); + const eventStream = assertEventStream(result); + return promiseCanceller?.cancellableIterable(eventStream) ?? eventStream; } catch (error) { throw locatedError(error, fieldNodes, pathToArray(path)); } @@ -2648,7 +2650,7 @@ function completeStreamItem( fieldDetailsList, info, itemPath, - exeContext.promiseCanceller?.withCancellation(item) ?? item, + exeContext.promiseCanceller?.cancellablePromise(item) ?? item, incrementalContext, new Map(), ).then( From b42febbe654a77617993af809e5ddb9aeefa3d1c Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 3 Nov 2024 13:30:05 +0200 Subject: [PATCH 2/8] abort without a pending next --- src/execution/PromiseCanceller.ts | 42 +++++++++--- .../__tests__/PromiseCanceller-test.ts | 67 ++----------------- src/execution/__tests__/abort-signal-test.ts | 39 ++++++++++- 3 files changed, 72 insertions(+), 76 deletions(-) diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts index b5af32cfce..70585d3bb0 100644 --- a/src/execution/PromiseCanceller.ts +++ b/src/execution/PromiseCanceller.ts @@ -28,19 +28,14 @@ export class PromiseCanceller { this.abortSignal.removeEventListener('abort', this.abort); } - cancellablePromise( - originalPromise: Promise, - onCancel?: (() => unknown) | undefined, - ): Promise { + cancellablePromise(originalPromise: Promise): Promise { if (this.abortSignal.aborted) { - onCancel?.(); // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors return Promise.reject(this.abortSignal.reason); } const { promise, resolve, reject } = promiseWithResolvers(); const abort = () => { - onCancel?.(); reject(this.abortSignal.reason); }; this._aborts.add(abort); @@ -62,20 +57,45 @@ export class PromiseCanceller { const iterator = iterable[Symbol.asyncIterator](); if (iterator.return) { + const _next = iterator.next.bind(iterator); const _return = iterator.return.bind(iterator); - const _returnIgnoringErrors = async (): Promise> => { + + const abort = () => { _return().catch(() => { /* c8 ignore next */ // ignore }); - return Promise.resolve({ value: undefined, done: true }); }; + if (this.abortSignal.aborted) { + abort(); + // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors + const onMethod = () => Promise.reject(this.abortSignal.reason); + return { + [Symbol.asyncIterator]: () => ({ + next: onMethod, + return: onMethod, + }), + }; + } + + this._aborts.add(abort); + const on = (method: () => Promise>) => async () => { + try { + const iteration = await this.cancellablePromise(method()); + if (iteration.done) { + this._aborts.delete(abort); + } + return iteration; + } catch (error) { + this._aborts.delete(abort); + throw error; + } + }; return { [Symbol.asyncIterator]: () => ({ - next: () => - this.cancellablePromise(iterator.next(), _returnIgnoringErrors), - return: () => this.cancellablePromise(_return()), + next: on(_next), + return: on(_return), }), }; } diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts index 500b05f343..3800afdeb2 100644 --- a/src/execution/__tests__/PromiseCanceller-test.ts +++ b/src/execution/__tests__/PromiseCanceller-test.ts @@ -70,62 +70,6 @@ describe('PromiseCanceller', () => { await expectPromise(withCancellation).toRejectWith('Cancelled!'); }); - - it('works to trigger onCancel when cancelling a hanging promise', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = new Promise(() => { - /* never resolves */ - }); - - let onCancelCalled = false; - const onCancel = () => { - onCancelCalled = true; - }; - - const withCancellation = promiseCanceller.cancellablePromise( - promise, - onCancel, - ); - - expect(onCancelCalled).to.equal(false); - - abortController.abort(new Error('Cancelled!')); - - expect(onCancelCalled).to.equal(true); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); - - it('works to trigger onCancel when cancelling a hanging promise created after abort signal triggered', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - abortController.abort(new Error('Cancelled!')); - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = new Promise(() => { - /* never resolves */ - }); - - let onCancelCalled = false; - const onCancel = () => { - onCancelCalled = true; - }; - - const withCancellation = promiseCanceller.cancellablePromise( - promise, - onCancel, - ); - - expect(onCancelCalled).to.equal(true); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); }); describe('cancellableAsyncIterable', () => { @@ -197,13 +141,11 @@ describe('PromiseCanceller', () => { abortController.abort(new Error('Cancelled!')); - expect(returned).to.equal(false); + expect(returned).to.equal(true); const nextPromise = cancellableAsyncIterable[Symbol.asyncIterator]().next(); - expect(returned).to.equal(true); - await expectPromise(nextPromise).toRejectWith('Cancelled!'); }); @@ -218,7 +160,8 @@ describe('PromiseCanceller', () => { let returned = false; const asyncIterable = { [Symbol.asyncIterator]: () => ({ - next: () => Promise.resolve({ value: 1, done: false }), + /* c8 ignore next */ + next: () => Promise.resolve({ value: undefined, done: true }), return: () => { returned = true; return Promise.resolve({ value: undefined, done: true }); @@ -229,13 +172,11 @@ describe('PromiseCanceller', () => { const cancellableAsyncIterable = promiseCanceller.cancellableIterable(asyncIterable); - expect(returned).to.equal(false); + expect(returned).to.equal(true); const nextPromise = cancellableAsyncIterable[Symbol.asyncIterator]().next(); - expect(returned).to.equal(true); - await expectPromise(nextPromise).toRejectWith('Cancelled!'); }); }); diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index e07b1e8c5b..45fc68e2cc 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -795,7 +795,7 @@ describe('Execute: Cancellation', () => { }); }); - it('should stop the execution when aborted during subscription', async () => { + it('should stop the execution when aborted prior to return of the subscription resolver', async () => { const abortController = new AbortController(); const document = parse(` subscription { @@ -830,7 +830,7 @@ describe('Execute: Cancellation', () => { }); }); - it('should stop the execution when aborted during subscription', async () => { + it('should successfully wrap the subscription', async () => { const abortController = new AbortController(); const document = parse(` subscription { @@ -845,6 +845,41 @@ describe('Execute: Cancellation', () => { rootValue: { async *foo() { yield await Promise.resolve({ foo: 'foo' }); + }, + }, + }); + + assert(isAsyncIterable(subscription)); + + expectJSON(await subscription.next()).toDeepEqual({ + value: { + data: { + foo: 'foo', + }, + }, + done: false, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + value: undefined, + done: true, + }); + }); + + it('should stop the execution when aborted during subscription', async () => { + const abortController = new AbortController(); + const document = parse(` + subscription { + foo + } + `); + + const subscription = subscribe({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + async *foo() { yield await Promise.resolve({ foo: 'foo' }); /* c8 ignore start */ } /* c8 ignore stop */, }, From d91b66f8211987978696ad4d1578a56a518e0587 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 4 Nov 2024 13:49:56 +0200 Subject: [PATCH 3/8] Revert "abort without a pending next" This reverts commit b42febbe654a77617993af809e5ddb9aeefa3d1c. --- src/execution/PromiseCanceller.ts | 42 +++--------- .../__tests__/PromiseCanceller-test.ts | 67 +++++++++++++++++-- src/execution/__tests__/abort-signal-test.ts | 39 +---------- 3 files changed, 76 insertions(+), 72 deletions(-) diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts index 70585d3bb0..b5af32cfce 100644 --- a/src/execution/PromiseCanceller.ts +++ b/src/execution/PromiseCanceller.ts @@ -28,14 +28,19 @@ export class PromiseCanceller { this.abortSignal.removeEventListener('abort', this.abort); } - cancellablePromise(originalPromise: Promise): Promise { + cancellablePromise( + originalPromise: Promise, + onCancel?: (() => unknown) | undefined, + ): Promise { if (this.abortSignal.aborted) { + onCancel?.(); // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors return Promise.reject(this.abortSignal.reason); } const { promise, resolve, reject } = promiseWithResolvers(); const abort = () => { + onCancel?.(); reject(this.abortSignal.reason); }; this._aborts.add(abort); @@ -57,45 +62,20 @@ export class PromiseCanceller { const iterator = iterable[Symbol.asyncIterator](); if (iterator.return) { - const _next = iterator.next.bind(iterator); const _return = iterator.return.bind(iterator); - - const abort = () => { + const _returnIgnoringErrors = async (): Promise> => { _return().catch(() => { /* c8 ignore next */ // ignore }); + return Promise.resolve({ value: undefined, done: true }); }; - if (this.abortSignal.aborted) { - abort(); - // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors - const onMethod = () => Promise.reject(this.abortSignal.reason); - return { - [Symbol.asyncIterator]: () => ({ - next: onMethod, - return: onMethod, - }), - }; - } - - this._aborts.add(abort); - const on = (method: () => Promise>) => async () => { - try { - const iteration = await this.cancellablePromise(method()); - if (iteration.done) { - this._aborts.delete(abort); - } - return iteration; - } catch (error) { - this._aborts.delete(abort); - throw error; - } - }; return { [Symbol.asyncIterator]: () => ({ - next: on(_next), - return: on(_return), + next: () => + this.cancellablePromise(iterator.next(), _returnIgnoringErrors), + return: () => this.cancellablePromise(_return()), }), }; } diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts index 3800afdeb2..500b05f343 100644 --- a/src/execution/__tests__/PromiseCanceller-test.ts +++ b/src/execution/__tests__/PromiseCanceller-test.ts @@ -70,6 +70,62 @@ describe('PromiseCanceller', () => { await expectPromise(withCancellation).toRejectWith('Cancelled!'); }); + + it('works to trigger onCancel when cancelling a hanging promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + let onCancelCalled = false; + const onCancel = () => { + onCancelCalled = true; + }; + + const withCancellation = promiseCanceller.cancellablePromise( + promise, + onCancel, + ); + + expect(onCancelCalled).to.equal(false); + + abortController.abort(new Error('Cancelled!')); + + expect(onCancelCalled).to.equal(true); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to trigger onCancel when cancelling a hanging promise created after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + let onCancelCalled = false; + const onCancel = () => { + onCancelCalled = true; + }; + + const withCancellation = promiseCanceller.cancellablePromise( + promise, + onCancel, + ); + + expect(onCancelCalled).to.equal(true); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); }); describe('cancellableAsyncIterable', () => { @@ -141,11 +197,13 @@ describe('PromiseCanceller', () => { abortController.abort(new Error('Cancelled!')); - expect(returned).to.equal(true); + expect(returned).to.equal(false); const nextPromise = cancellableAsyncIterable[Symbol.asyncIterator]().next(); + expect(returned).to.equal(true); + await expectPromise(nextPromise).toRejectWith('Cancelled!'); }); @@ -160,8 +218,7 @@ describe('PromiseCanceller', () => { let returned = false; const asyncIterable = { [Symbol.asyncIterator]: () => ({ - /* c8 ignore next */ - next: () => Promise.resolve({ value: undefined, done: true }), + next: () => Promise.resolve({ value: 1, done: false }), return: () => { returned = true; return Promise.resolve({ value: undefined, done: true }); @@ -172,11 +229,13 @@ describe('PromiseCanceller', () => { const cancellableAsyncIterable = promiseCanceller.cancellableIterable(asyncIterable); - expect(returned).to.equal(true); + expect(returned).to.equal(false); const nextPromise = cancellableAsyncIterable[Symbol.asyncIterator]().next(); + expect(returned).to.equal(true); + await expectPromise(nextPromise).toRejectWith('Cancelled!'); }); }); diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index 45fc68e2cc..e07b1e8c5b 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -795,7 +795,7 @@ describe('Execute: Cancellation', () => { }); }); - it('should stop the execution when aborted prior to return of the subscription resolver', async () => { + it('should stop the execution when aborted during subscription', async () => { const abortController = new AbortController(); const document = parse(` subscription { @@ -830,42 +830,6 @@ describe('Execute: Cancellation', () => { }); }); - it('should successfully wrap the subscription', async () => { - const abortController = new AbortController(); - const document = parse(` - subscription { - foo - } - `); - - const subscription = subscribe({ - document, - schema, - abortSignal: abortController.signal, - rootValue: { - async *foo() { - yield await Promise.resolve({ foo: 'foo' }); - }, - }, - }); - - assert(isAsyncIterable(subscription)); - - expectJSON(await subscription.next()).toDeepEqual({ - value: { - data: { - foo: 'foo', - }, - }, - done: false, - }); - - expectJSON(await subscription.next()).toDeepEqual({ - value: undefined, - done: true, - }); - }); - it('should stop the execution when aborted during subscription', async () => { const abortController = new AbortController(); const document = parse(` @@ -880,6 +844,7 @@ describe('Execute: Cancellation', () => { abortSignal: abortController.signal, rootValue: { async *foo() { + yield await Promise.resolve({ foo: 'foo' }); yield await Promise.resolve({ foo: 'foo' }); /* c8 ignore start */ } /* c8 ignore stop */, }, From a521d6928a0359a5316805334e533eae8485b04f Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 4 Nov 2024 13:50:02 +0200 Subject: [PATCH 4/8] clean up PromiseCanceller when used with subscriptions --- src/execution/PromiseCanceller.ts | 21 +-- .../__tests__/PromiseCanceller-test.ts | 121 ------------------ src/execution/__tests__/abort-signal-test.ts | 49 ++++++- .../__tests__/mapAsyncIterable-test.ts | 48 +++++++ src/execution/execute.ts | 39 ++++-- src/execution/mapAsyncIterable.ts | 24 +++- 6 files changed, 143 insertions(+), 159 deletions(-) diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts index b5af32cfce..3bde6df85d 100644 --- a/src/execution/PromiseCanceller.ts +++ b/src/execution/PromiseCanceller.ts @@ -28,19 +28,14 @@ export class PromiseCanceller { this.abortSignal.removeEventListener('abort', this.abort); } - cancellablePromise( - originalPromise: Promise, - onCancel?: (() => unknown) | undefined, - ): Promise { + cancellablePromise(originalPromise: Promise): Promise { if (this.abortSignal.aborted) { - onCancel?.(); // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors return Promise.reject(this.abortSignal.reason); } const { promise, resolve, reject } = promiseWithResolvers(); const abort = () => { - onCancel?.(); reject(this.abortSignal.reason); }; this._aborts.add(abort); @@ -61,20 +56,14 @@ export class PromiseCanceller { cancellableIterable(iterable: AsyncIterable): AsyncIterable { const iterator = iterable[Symbol.asyncIterator](); + const _next = iterator.next.bind(iterator); + if (iterator.return) { const _return = iterator.return.bind(iterator); - const _returnIgnoringErrors = async (): Promise> => { - _return().catch(() => { - /* c8 ignore next */ - // ignore - }); - return Promise.resolve({ value: undefined, done: true }); - }; return { [Symbol.asyncIterator]: () => ({ - next: () => - this.cancellablePromise(iterator.next(), _returnIgnoringErrors), + next: () => this.cancellablePromise(_next()), return: () => this.cancellablePromise(_return()), }), }; @@ -82,7 +71,7 @@ export class PromiseCanceller { return { [Symbol.asyncIterator]: () => ({ - next: () => this.cancellablePromise(iterator.next()), + next: () => this.cancellablePromise(_next()), }), }; } diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts index 500b05f343..5800c4ceac 100644 --- a/src/execution/__tests__/PromiseCanceller-test.ts +++ b/src/execution/__tests__/PromiseCanceller-test.ts @@ -1,4 +1,3 @@ -import { expect } from 'chai'; import { describe, it } from 'mocha'; import { expectPromise } from '../../__testUtils__/expectPromise.js'; @@ -70,62 +69,6 @@ describe('PromiseCanceller', () => { await expectPromise(withCancellation).toRejectWith('Cancelled!'); }); - - it('works to trigger onCancel when cancelling a hanging promise', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = new Promise(() => { - /* never resolves */ - }); - - let onCancelCalled = false; - const onCancel = () => { - onCancelCalled = true; - }; - - const withCancellation = promiseCanceller.cancellablePromise( - promise, - onCancel, - ); - - expect(onCancelCalled).to.equal(false); - - abortController.abort(new Error('Cancelled!')); - - expect(onCancelCalled).to.equal(true); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); - - it('works to trigger onCancel when cancelling a hanging promise created after abort signal triggered', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - abortController.abort(new Error('Cancelled!')); - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = new Promise(() => { - /* never resolves */ - }); - - let onCancelCalled = false; - const onCancel = () => { - onCancelCalled = true; - }; - - const withCancellation = promiseCanceller.cancellablePromise( - promise, - onCancel, - ); - - expect(onCancelCalled).to.equal(true); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); }); describe('cancellableAsyncIterable', () => { @@ -174,69 +117,5 @@ describe('PromiseCanceller', () => { await expectPromise(nextPromise).toRejectWith('Cancelled!'); }); - - it('works to call return', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - const promiseCanceller = new PromiseCanceller(abortSignal); - - let returned = false; - const asyncIterable = { - [Symbol.asyncIterator]: () => ({ - next: () => Promise.resolve({ value: 1, done: false }), - return: () => { - returned = true; - return Promise.resolve({ value: undefined, done: true }); - }, - }), - }; - - const cancellableAsyncIterable = - promiseCanceller.cancellableIterable(asyncIterable); - - abortController.abort(new Error('Cancelled!')); - - expect(returned).to.equal(false); - - const nextPromise = - cancellableAsyncIterable[Symbol.asyncIterator]().next(); - - expect(returned).to.equal(true); - - await expectPromise(nextPromise).toRejectWith('Cancelled!'); - }); - - it('works to call return when already aborted', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - abortController.abort(new Error('Cancelled!')); - - const promiseCanceller = new PromiseCanceller(abortSignal); - - let returned = false; - const asyncIterable = { - [Symbol.asyncIterator]: () => ({ - next: () => Promise.resolve({ value: 1, done: false }), - return: () => { - returned = true; - return Promise.resolve({ value: undefined, done: true }); - }, - }), - }; - - const cancellableAsyncIterable = - promiseCanceller.cancellableIterable(asyncIterable); - - expect(returned).to.equal(false); - - const nextPromise = - cancellableAsyncIterable[Symbol.asyncIterator]().next(); - - expect(returned).to.equal(true); - - await expectPromise(nextPromise).toRejectWith('Cancelled!'); - }); }); }); diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index e07b1e8c5b..3c2f41553f 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -795,7 +795,7 @@ describe('Execute: Cancellation', () => { }); }); - it('should stop the execution when aborted during subscription', async () => { + it('should stop the execution when aborted prior to return of a subscription resolver', async () => { const abortController = new AbortController(); const document = parse(` subscription { @@ -830,6 +830,44 @@ describe('Execute: Cancellation', () => { }); }); + it('should successfully wrap the subscription', async () => { + const abortController = new AbortController(); + const document = parse(` + subscription { + foo + } + `); + + async function* foo() { + yield await Promise.resolve({ foo: 'foo' }); + } + + const subscription = await subscribe({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + foo: Promise.resolve(foo()), + }, + }); + + assert(isAsyncIterable(subscription)); + + expectJSON(await subscription.next()).toDeepEqual({ + value: { + data: { + foo: 'foo', + }, + }, + done: false, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('should stop the execution when aborted during subscription', async () => { const abortController = new AbortController(); const document = parse(` @@ -838,15 +876,16 @@ describe('Execute: Cancellation', () => { } `); + async function* foo() { + yield await Promise.resolve({ foo: 'foo' }); + } + const subscription = subscribe({ document, schema, abortSignal: abortController.signal, rootValue: { - async *foo() { - yield await Promise.resolve({ foo: 'foo' }); - yield await Promise.resolve({ foo: 'foo' }); /* c8 ignore start */ - } /* c8 ignore stop */, + foo: foo(), }, }); diff --git a/src/execution/__tests__/mapAsyncIterable-test.ts b/src/execution/__tests__/mapAsyncIterable-test.ts index dee53aa486..599e15f05e 100644 --- a/src/execution/__tests__/mapAsyncIterable-test.ts +++ b/src/execution/__tests__/mapAsyncIterable-test.ts @@ -89,6 +89,54 @@ describe('mapAsyncIterable', () => { }); }); + it('calls done when completes', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + let done = false; + const doubles = mapAsyncIterable( + source(), + (x) => Promise.resolve(x + x), + () => { + done = true; + }, + ); + + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 4, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 6, done: false }); + expect(done).to.equal(false); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + expect(done).to.equal(true); + }); + + it('calls done when completes with error', async () => { + async function* source() { + yield 1; + throw new Error('Oops'); + } + + let done = false; + const doubles = mapAsyncIterable( + source(), + (x) => Promise.resolve(x + x), + () => { + done = true; + }, + ); + + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(done).to.equal(false); + await expectPromise(doubles.next()).toRejectWith('Oops'); + expect(done).to.equal(true); + }); + it('allows returning early from mapped async generator', async () => { async function* source() { try { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 959be4cd0a..cdd5bf4c6a 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2099,6 +2099,13 @@ export function subscribe( return mapSourceToResponse(validatedExecutionArgs, resultOrStream); } +/** + * + * For each payload yielded from a subscription, map it over the normal + * GraphQL `execute` function, with `payload` as the rootValue. + * This implements the "MapSourceToResponseEvent" algorithm described in + * the GraphQL specification.. + */ function mapSourceToResponse( validatedExecutionArgs: ValidatedExecutionArgs, resultOrStream: ExecutionResult | AsyncIterable, @@ -2107,10 +2114,22 @@ function mapSourceToResponse( return resultOrStream; } - // For each payload yielded from a subscription, map it over the normal - // GraphQL `execute` function, with `payload` as the rootValue. - // This implements the "MapSourceToResponseEvent" algorithm described in - // the GraphQL specification.. + const abortSignal = validatedExecutionArgs.abortSignal; + if (abortSignal) { + const promiseCanceller = new PromiseCanceller(abortSignal); + return mapAsyncIterable( + promiseCanceller?.cancellableIterable(resultOrStream), + (payload: unknown) => { + const perEventExecutionArgs: ValidatedExecutionArgs = { + ...validatedExecutionArgs, + rootValue: payload, + }; + return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); + }, + () => promiseCanceller.disconnect(), + ); + } + return mapAsyncIterable(resultOrStream, (payload: unknown) => { const perEventExecutionArgs: ValidatedExecutionArgs = { ...validatedExecutionArgs, @@ -2265,16 +2284,16 @@ function executeSubscription( // used to represent an authenticated user, or request-specific caches. const result = resolveFn(rootValue, args, contextValue, info, abortSignal); - const promiseCanceller = abortSignal - ? new PromiseCanceller(abortSignal) - : undefined; - if (isPromise(result)) { + const promiseCanceller = abortSignal + ? new PromiseCanceller(abortSignal) + : undefined; + const promise = promiseCanceller?.cancellablePromise(result) ?? result; return promise.then(assertEventStream).then( (resolved) => { promiseCanceller?.disconnect(); - return promiseCanceller?.cancellableIterable(resolved) ?? resolved; + return resolved; }, (error: unknown) => { promiseCanceller?.disconnect(); @@ -2284,7 +2303,7 @@ function executeSubscription( } const eventStream = assertEventStream(result); - return promiseCanceller?.cancellableIterable(eventStream) ?? eventStream; + return eventStream; } catch (error) { throw locatedError(error, fieldNodes, pathToArray(path)); } diff --git a/src/execution/mapAsyncIterable.ts b/src/execution/mapAsyncIterable.ts index 0f6fd78c2d..e0f942fd53 100644 --- a/src/execution/mapAsyncIterable.ts +++ b/src/execution/mapAsyncIterable.ts @@ -7,18 +7,28 @@ import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; export function mapAsyncIterable( iterable: AsyncGenerator | AsyncIterable, callback: (value: T) => PromiseOrValue, + onDone?: (() => void) | undefined, ): AsyncGenerator { const iterator = iterable[Symbol.asyncIterator](); async function mapResult( - result: IteratorResult, + promise: Promise>, ): Promise> { - if (result.done) { - return result; + let value: T; + try { + const result = await promise; + if (result.done) { + onDone?.(); + return result; + } + value = result.value; + } catch (error) { + onDone?.(); + throw error; } try { - return { value: await callback(result.value), done: false }; + return { value: await callback(value), done: false }; } catch (error) { /* c8 ignore start */ // FIXME: add test case @@ -36,17 +46,17 @@ export function mapAsyncIterable( return { async next() { - return mapResult(await iterator.next()); + return mapResult(iterator.next()); }, async return(): Promise> { // If iterator.return() does not exist, then type R must be undefined. return typeof iterator.return === 'function' - ? mapResult(await iterator.return()) + ? mapResult(iterator.return()) : { value: undefined as any, done: true }; }, async throw(error?: unknown) { if (typeof iterator.throw === 'function') { - return mapResult(await iterator.throw(error)); + return mapResult(iterator.throw(error)); } throw error; }, From 7187e327d8066f0503c0c9689a6703466b446f78 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 4 Nov 2024 14:05:13 +0200 Subject: [PATCH 5/8] simplify abort function to arrow function --- src/execution/PromiseCanceller.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts index 3bde6df85d..16b438e17f 100644 --- a/src/execution/PromiseCanceller.ts +++ b/src/execution/PromiseCanceller.ts @@ -35,9 +35,7 @@ export class PromiseCanceller { } const { promise, resolve, reject } = promiseWithResolvers(); - const abort = () => { - reject(this.abortSignal.reason); - }; + const abort = () => reject(this.abortSignal.reason); this._aborts.add(abort); originalPromise.then( (resolved) => { From 80a60f4a8b96f36ac906690c55269f3229251bbe Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 4 Nov 2024 14:14:12 +0200 Subject: [PATCH 6/8] consolidate --- src/execution/execute.ts | 46 ++++++++++++++++------------------------ 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index cdd5bf4c6a..2b7c96893e 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2099,13 +2099,6 @@ export function subscribe( return mapSourceToResponse(validatedExecutionArgs, resultOrStream); } -/** - * - * For each payload yielded from a subscription, map it over the normal - * GraphQL `execute` function, with `payload` as the rootValue. - * This implements the "MapSourceToResponseEvent" algorithm described in - * the GraphQL specification.. - */ function mapSourceToResponse( validatedExecutionArgs: ValidatedExecutionArgs, resultOrStream: ExecutionResult | AsyncIterable, @@ -2115,28 +2108,25 @@ function mapSourceToResponse( } const abortSignal = validatedExecutionArgs.abortSignal; - if (abortSignal) { - const promiseCanceller = new PromiseCanceller(abortSignal); - return mapAsyncIterable( - promiseCanceller?.cancellableIterable(resultOrStream), - (payload: unknown) => { - const perEventExecutionArgs: ValidatedExecutionArgs = { - ...validatedExecutionArgs, - rootValue: payload, - }; - return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); - }, - () => promiseCanceller.disconnect(), - ); - } - return mapAsyncIterable(resultOrStream, (payload: unknown) => { - const perEventExecutionArgs: ValidatedExecutionArgs = { - ...validatedExecutionArgs, - rootValue: payload, - }; - return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); - }); + const promiseCanceller = abortSignal + ? new PromiseCanceller(abortSignal) + : undefined; + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + // This implements the "MapSourceToResponseEvent" algorithm described in + // the GraphQL specification.. + return mapAsyncIterable( + promiseCanceller?.cancellableIterable(resultOrStream) ?? resultOrStream, + (payload: unknown) => { + const perEventExecutionArgs: ValidatedExecutionArgs = { + ...validatedExecutionArgs, + rootValue: payload, + }; + return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); + }, + () => promiseCanceller?.disconnect(), + ); } export function executeSubscriptionEvent( From 29f24409db2a3c43a4c4b2cc400ebbaf154aa326 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 4 Nov 2024 14:17:04 +0200 Subject: [PATCH 7/8] prettify some whitespace --- src/execution/execute.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 2b7c96893e..9b22fd1cfa 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2108,10 +2108,10 @@ function mapSourceToResponse( } const abortSignal = validatedExecutionArgs.abortSignal; - const promiseCanceller = abortSignal ? new PromiseCanceller(abortSignal) : undefined; + // For each payload yielded from a subscription, map it over the normal // GraphQL `execute` function, with `payload` as the rootValue. // This implements the "MapSourceToResponseEvent" algorithm described in From fe08f46f5963c6b004a23878e0b653e8d83f9c13 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 4 Nov 2024 14:21:24 +0200 Subject: [PATCH 8/8] simplify return --- src/execution/execute.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 9b22fd1cfa..d847835d98 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2292,8 +2292,7 @@ function executeSubscription( ); } - const eventStream = assertEventStream(result); - return eventStream; + return assertEventStream(result); } catch (error) { throw locatedError(error, fieldNodes, pathToArray(path)); }