diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 7469a36fa5..142e992705 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -28,7 +28,7 @@ export class IncrementalGraph { private _completedQueue: Array; private _nextQueue: Array< - (iterable: IteratorResult>) => void + (iterable: Iterable | undefined) => void >; constructor() { @@ -70,37 +70,32 @@ export class IncrementalGraph { } } - completedIncrementalData() { - return { - [Symbol.asyncIterator]() { - return this; - }, - next: (): Promise< - IteratorResult> - > => { - const firstResult = this._completedQueue.shift(); - if (firstResult !== undefined) { - return Promise.resolve({ - value: this._yieldCurrentCompletedIncrementalData(firstResult), - done: false, - }); - } - const { promise, resolve } = - promiseWithResolvers< - IteratorResult> - >(); - this._nextQueue.push(resolve); - return promise; - }, - return: (): Promise< - IteratorResult> - > => { - for (const resolve of this._nextQueue) { - resolve({ value: undefined, done: true }); - } - return Promise.resolve({ value: undefined, done: true }); - }, - }; + *currentCompletedBatch(): Generator { + let completed; + while ((completed = this._completedQueue.shift()) !== undefined) { + yield completed; + } + if (this._rootNodes.size === 0) { + for (const resolve of this._nextQueue) { + resolve(undefined); + } + } + } + + nextCompletedBatch(): Promise< + Iterable | undefined + > { + const { promise, resolve } = promiseWithResolvers< + Iterable | undefined + >(); + this._nextQueue.push(resolve); + return promise; + } + + abort(): void { + for (const resolve of this._nextQueue) { + resolve(undefined); + } } hasNext(): boolean { @@ -157,11 +152,6 @@ export class IncrementalGraph { subsequentResultRecord: SubsequentResultRecord, ): void { this._rootNodes.delete(subsequentResultRecord); - if (this._rootNodes.size === 0) { - for (const resolve of this._nextQueue) { - resolve({ value: undefined, done: true }); - } - } } private _addIncrementalDataRecords( @@ -332,19 +322,13 @@ export class IncrementalGraph { first: IncrementalDataRecordResult, ): Generator { yield first; - let completed; - while ((completed = this._completedQueue.shift()) !== undefined) { - yield completed; - } + yield* this.currentCompletedBatch(); } private _enqueue(completed: IncrementalDataRecordResult): void { const next = this._nextQueue.shift(); if (next !== undefined) { - next({ - value: this._yieldCurrentCompletedIncrementalData(completed), - done: false, - }); + next(this._yieldCurrentCompletedIncrementalData(completed)); return; } this._completedQueue.push(completed); diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index a625b0e098..dd27033ed8 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -135,14 +135,10 @@ class IncrementalPublisher { completed: [], }; - const completedIncrementalData = - this._incrementalGraph.completedIncrementalData(); - // use the raw iterator rather than 'for await ... of' so as not to trigger the - // '.return()' method on the iterator when exiting the loop with the next value - const asyncIterator = completedIncrementalData[Symbol.asyncIterator](); - let iteration = await asyncIterator.next(); - while (!iteration.done) { - for (const completedResult of iteration.value) { + let batch: Iterable | undefined = + this._incrementalGraph.currentCompletedBatch(); + do { + for (const completedResult of batch) { this._handleCompletedIncrementalData(completedResult, context); } @@ -151,7 +147,6 @@ class IncrementalPublisher { const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { - // eslint-disable-next-line require-atomic-updates isDone = true; } @@ -173,8 +168,8 @@ class IncrementalPublisher { } // eslint-disable-next-line no-await-in-loop - iteration = await asyncIterator.next(); - } + batch = await this._incrementalGraph.nextCompletedBatch(); + } while (batch !== undefined); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; @@ -184,6 +179,7 @@ class IncrementalPublisher { IteratorResult > => { isDone = true; + this._incrementalGraph.abort(); await this._returnAsyncIterators(); return { value: undefined, done: true }; }; @@ -192,6 +188,7 @@ class IncrementalPublisher { error?: unknown, ): Promise> => { isDone = true; + this._incrementalGraph.abort(); await this._returnAsyncIterators(); return Promise.reject(error); }; @@ -363,8 +360,6 @@ class IncrementalPublisher { } private async _returnAsyncIterators(): Promise { - await this._incrementalGraph.completedIncrementalData().return(); - const cancellableStreams = this._context.cancellableStreams; if (cancellableStreams === undefined) { return; diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 15ad4028a5..49c8e064fe 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -2331,16 +2331,12 @@ describe('Execute: stream directive', () => { }); it('Returns underlying async iterables when returned generator is returned', async () => { let returned = false; - let index = 0; const iterable = { [Symbol.asyncIterator]: () => ({ - next: () => { - const friend = friends[index++]; - if (friend == null) { - return Promise.resolve({ done: true, value: undefined }); - } - return Promise.resolve({ done: false, value: friend }); - }, + next: () => + new Promise(() => { + /* never resolves */ + }), return: () => { returned = true; }, @@ -2349,11 +2345,8 @@ describe('Execute: stream directive', () => { const document = parse(` query { - friendList @stream(initialCount: 1) { + friendList @stream(initialCount: 0) { id - ... @defer { - name - } } } `); @@ -2371,21 +2364,16 @@ describe('Execute: stream directive', () => { const result1 = executeResult.initialResult; expectJSON(result1).toDeepEqual({ data: { - friendList: [ - { - id: '1', - }, - ], + friendList: [], }, - pending: [ - { id: '0', path: ['friendList', 0] }, - { id: '1', path: ['friendList'] }, - ], + pending: [{ id: '0', path: ['friendList'] }], hasNext: true, }); + + const result2Promise = iterator.next(); const returnPromise = iterator.return(); - const result2 = await iterator.next(); + const result2 = await result2Promise; expectJSON(result2).toDeepEqual({ done: true, value: undefined,