From b8c5c0e36fd0658fcbca6faefcac423dd4879da4 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 31 May 2024 16:38:35 +0300 Subject: [PATCH 01/13] refactor: extract incremental graph to separate file --- src/execution/IncrementalGraph.ts | 213 ++++++++++++++++++++++++++ src/execution/IncrementalPublisher.ts | 205 +++---------------------- src/execution/types.ts | 2 +- 3 files changed, 239 insertions(+), 181 deletions(-) create mode 100644 src/execution/IncrementalGraph.ts diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts new file mode 100644 index 0000000000..b998af5dda --- /dev/null +++ b/src/execution/IncrementalGraph.ts @@ -0,0 +1,213 @@ +import { isPromise } from '../jsutils/isPromise.js'; +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +import type { + DeferredFragmentRecord, + DeferredGroupedFieldSetResult, + IncrementalDataRecord, + IncrementalDataRecordResult, + ReconcilableDeferredGroupedFieldSetResult, + StreamItemsResult, + SubsequentResultRecord, +} from './types.js'; +import { + isDeferredFragmentRecord, + isDeferredGroupedFieldSetRecord, +} from './types.js'; + +/** + * @internal + */ +export class IncrementalGraph { + // these are assigned within the Promise executor called synchronously within the constructor + newCompletedResultAvailable!: Promise; + private _resolve!: () => void; + + private _pending: Set; + private _newPending: Set; + private _completedResultQueue: Array; + + constructor() { + this._pending = new Set(); + this._newPending = new Set(); + this._completedResultQueue = []; + this._reset(); + } + + addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + ): void { + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { + deferredFragmentRecord.expectedReconcilableResults++; + + this._addDeferredFragmentRecord(deferredFragmentRecord); + } + + const result = incrementalDataRecord.result; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => { + this._enqueueCompletedDeferredGroupedFieldSet(resolved); + }); + } else { + this._enqueueCompletedDeferredGroupedFieldSet(result); + } + + continue; + } + + const streamRecord = incrementalDataRecord.streamRecord; + if (streamRecord.id === undefined) { + this._newPending.add(streamRecord); + } + + const result = incrementalDataRecord.result; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => { + this._enqueueCompletedStreamItems(resolved); + }); + } else { + this._enqueueCompletedStreamItems(result); + } + } + } + + getNewPending(): ReadonlyArray { + const maybeEmptyNewPending = this._newPending; + const newPending = []; + for (const node of maybeEmptyNewPending) { + if (isDeferredFragmentRecord(node)) { + if (node.expectedReconcilableResults) { + this._pending.add(node); + newPending.push(node); + continue; + } + for (const child of node.children) { + this._addNonEmptyNewPending(child, newPending); + } + } else { + this._pending.add(node); + newPending.push(node); + } + } + this._newPending.clear(); + return newPending; + } + + *completedResults(): Generator { + let completedResult: IncrementalDataRecordResult | undefined; + while ( + (completedResult = this._completedResultQueue.shift()) !== undefined + ) { + yield completedResult; + } + } + + hasNext(): boolean { + return this._pending.size > 0; + } + + completeDeferredFragment( + deferredFragmentRecord: DeferredFragmentRecord, + ): Array | undefined { + const reconcilableResults = deferredFragmentRecord.reconcilableResults; + if ( + deferredFragmentRecord.expectedReconcilableResults !== + reconcilableResults.length + ) { + return; + } + this._pending.delete(deferredFragmentRecord); + for (const child of deferredFragmentRecord.children) { + this._newPending.add(child); + this._completedResultQueue.push(...child.results); + } + return reconcilableResults; + } + + removeSubsequentResultRecord( + subsequentResultRecord: SubsequentResultRecord, + ): void { + this._pending.delete(subsequentResultRecord); + } + + private _addDeferredFragmentRecord( + deferredFragmentRecord: DeferredFragmentRecord, + ): void { + const parent = deferredFragmentRecord.parent; + if (parent === undefined) { + // Below is equivalent and slightly faster version of: + // if (this._pending.has(deferredFragmentRecord)) { ... } + // as all released deferredFragmentRecords have ids. + if (deferredFragmentRecord.id !== undefined) { + return; + } + + this._newPending.add(deferredFragmentRecord); + return; + } + + if (parent.children.has(deferredFragmentRecord)) { + return; + } + + parent.children.add(deferredFragmentRecord); + + this._addDeferredFragmentRecord(parent); + } + + private _addNonEmptyNewPending( + deferredFragmentRecord: DeferredFragmentRecord, + newPending: Array, + ): void { + if (deferredFragmentRecord.expectedReconcilableResults) { + this._pending.add(deferredFragmentRecord); + newPending.push(deferredFragmentRecord); + return; + } + /* c8 ignore next 5 */ + // TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children. + for (const child of deferredFragmentRecord.children) { + this._addNonEmptyNewPending(child, newPending); + } + } + + private _enqueueCompletedDeferredGroupedFieldSet( + result: DeferredGroupedFieldSetResult, + ): void { + let hasPendingParent = false; + for (const deferredFragmentRecord of result.deferredFragmentRecords) { + if (deferredFragmentRecord.id !== undefined) { + hasPendingParent = true; + } + deferredFragmentRecord.results.push(result); + } + if (hasPendingParent) { + this._completedResultQueue.push(result); + this._trigger(); + } + } + + private _enqueueCompletedStreamItems(result: StreamItemsResult): void { + this._completedResultQueue.push(result); + this._trigger(); + } + + private _trigger() { + this._resolve(); + this._reset(); + } + + private _reset() { + const { promise: newCompletedResultAvailable, resolve } = + // promiseWithResolvers uses void only as a generic type parameter + // see: https://typescript-eslint.io/rules/no-invalid-void-type/ + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + promiseWithResolvers(); + this._resolve = resolve; + this.newCompletedResultAvailable = newCompletedResultAvailable; + } +} diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 0504238eae..92a3f199c3 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,11 +1,10 @@ import { invariant } from '../jsutils/invariant.js'; -import { isPromise } from '../jsutils/isPromise.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import { pathToArray } from '../jsutils/Path.js'; -import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { GraphQLError } from '../error/GraphQLError.js'; +import { IncrementalGraph } from './IncrementalGraph.js'; import type { CancellableStreamRecord, CompletedResult, @@ -13,7 +12,6 @@ import type { DeferredGroupedFieldSetResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, - IncrementalDataRecordResult, IncrementalDeferResult, IncrementalResult, IncrementalStreamResult, @@ -25,8 +23,6 @@ import type { } from './types.js'; import { isCancellableStreamRecord, - isDeferredFragmentRecord, - isDeferredGroupedFieldSetRecord, isDeferredGroupedFieldSetResult, isNonReconcilableDeferredGroupedFieldSetResult, } from './types.js'; @@ -58,24 +54,16 @@ interface IncrementalPublisherContext { class IncrementalPublisher { private _context: IncrementalPublisherContext; private _nextId: number; - private _pending: Set; - private _completedResultQueue: Array; - private _newPending: Set; + private _incrementalGraph: IncrementalGraph; private _incremental: Array; private _completed: Array; - // these are assigned within the Promise executor called synchronously within the constructor - private _signalled!: Promise; - private _resolve!: () => void; constructor(context: IncrementalPublisherContext) { this._context = context; this._nextId = 0; - this._pending = new Set(); - this._completedResultQueue = []; - this._newPending = new Set(); + this._incrementalGraph = new IncrementalGraph(); this._incremental = []; this._completed = []; - this._reset(); } buildResponse( @@ -83,10 +71,10 @@ class IncrementalPublisher { errors: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { - this._addIncrementalDataRecords(incrementalDataRecords); - this._pruneEmpty(); + this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); + const newPending = this._incrementalGraph.getNewPending(); - const pending = this._pendingSourcesToResults(); + const pending = this._pendingSourcesToResults(newPending); const initialResult: InitialIncrementalExecutionResult = errors === undefined @@ -99,130 +87,12 @@ class IncrementalPublisher { }; } - private _addIncrementalDataRecords( - incrementalDataRecords: ReadonlyArray, - ): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { - deferredFragmentRecord.expectedReconcilableResults++; - - this._addDeferredFragmentRecord(deferredFragmentRecord); - } - - const result = incrementalDataRecord.result; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => { - this._enqueueCompletedDeferredGroupedFieldSet(resolved); - }); - } else { - this._enqueueCompletedDeferredGroupedFieldSet(result); - } - - continue; - } - - const streamRecord = incrementalDataRecord.streamRecord; - if (streamRecord.id === undefined) { - this._newPending.add(streamRecord); - } - - const result = incrementalDataRecord.result; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => { - this._enqueueCompletedStreamItems(resolved); - }); - } else { - this._enqueueCompletedStreamItems(result); - } - } - } - - private _addDeferredFragmentRecord( - deferredFragmentRecord: DeferredFragmentRecord, - ): void { - const parent = deferredFragmentRecord.parent; - if (parent === undefined) { - // Below is equivalent and slightly faster version of: - // if (this._pending.has(deferredFragmentRecord)) { ... } - // as all released deferredFragmentRecords have ids. - if (deferredFragmentRecord.id !== undefined) { - return; - } - - this._newPending.add(deferredFragmentRecord); - return; - } - - if (parent.children.has(deferredFragmentRecord)) { - return; - } - - parent.children.add(deferredFragmentRecord); - - this._addDeferredFragmentRecord(parent); - } - - private _pruneEmpty() { - const maybeEmptyNewPending = this._newPending; - this._newPending = new Set(); - for (const node of maybeEmptyNewPending) { - if (isDeferredFragmentRecord(node)) { - if (node.expectedReconcilableResults) { - this._newPending.add(node); - continue; - } - for (const child of node.children) { - this._addNonEmptyNewPending(child); - } - } else { - this._newPending.add(node); - } - } - } - - private _addNonEmptyNewPending( - deferredFragmentRecord: DeferredFragmentRecord, - ): void { - if (deferredFragmentRecord.expectedReconcilableResults) { - this._newPending.add(deferredFragmentRecord); - return; - } - /* c8 ignore next 5 */ - // TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children. - for (const child of deferredFragmentRecord.children) { - this._addNonEmptyNewPending(child); - } - } - - private _enqueueCompletedDeferredGroupedFieldSet( - result: DeferredGroupedFieldSetResult, - ): void { - let hasPendingParent = false; - for (const deferredFragmentRecord of result.deferredFragmentRecords) { - if (deferredFragmentRecord.id !== undefined) { - hasPendingParent = true; - } - deferredFragmentRecord.results.push(result); - } - if (hasPendingParent) { - this._completedResultQueue.push(result); - this._trigger(); - } - } - - private _enqueueCompletedStreamItems(result: StreamItemsResult): void { - this._completedResultQueue.push(result); - this._trigger(); - } - - private _pendingSourcesToResults(): Array { + private _pendingSourcesToResults( + newPending: ReadonlyArray, + ): Array { const pendingResults: Array = []; - for (const pendingSource of this._newPending) { + for (const pendingSource of newPending) { const id = String(this._getNextId()); - this._pending.add(pendingSource); pendingSource.id = id; const pendingResult: PendingResult = { id, @@ -233,7 +103,6 @@ class IncrementalPublisher { } pendingResults.push(pendingResult); } - this._newPending.clear(); return pendingResults; } @@ -254,21 +123,19 @@ class IncrementalPublisher { while (!isDone) { let pending: Array = []; - let completedResult: IncrementalDataRecordResult | undefined; - while ( - (completedResult = this._completedResultQueue.shift()) !== undefined - ) { + for (const completedResult of this._incrementalGraph.completedResults()) { if (isDeferredGroupedFieldSetResult(completedResult)) { this._handleCompletedDeferredGroupedFieldSet(completedResult); } else { this._handleCompletedStreamItems(completedResult); } - pending = [...pending, ...this._pendingSourcesToResults()]; + const newPending = this._incrementalGraph.getNewPending(); + pending = [...pending, ...this._pendingSourcesToResults(newPending)]; } if (this._incremental.length > 0 || this._completed.length > 0) { - const hasNext = this._pending.size > 0; + const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { isDone = true; @@ -295,7 +162,7 @@ class IncrementalPublisher { } // eslint-disable-next-line no-await-in-loop - await this._signalled; + await this._incrementalGraph.newCompletedResultAvailable; } await returnStreamIterators().catch(() => { @@ -345,20 +212,6 @@ class IncrementalPublisher { }; } - private _trigger() { - this._resolve(); - this._reset(); - } - - private _reset() { - // promiseWithResolvers uses void only as a generic type parameter - // see: https://typescript-eslint.io/rules/no-invalid-void-type/ - // eslint-disable-next-line @typescript-eslint/no-invalid-void-type - const { promise: signalled, resolve } = promiseWithResolvers(); - this._resolve = resolve; - this._signalled = signalled; - } - private _handleCompletedDeferredGroupedFieldSet( deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, ): void { @@ -374,7 +227,9 @@ class IncrementalPublisher { id, errors: deferredGroupedFieldSetResult.errors, }); - this._pending.delete(deferredFragmentRecord); + this._incrementalGraph.removeSubsequentResultRecord( + deferredFragmentRecord, + ); } } return; @@ -388,7 +243,7 @@ class IncrementalPublisher { const incrementalDataRecords = deferredGroupedFieldSetResult.incrementalDataRecords; if (incrementalDataRecords !== undefined) { - this._addIncrementalDataRecords(incrementalDataRecords); + this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); } for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { @@ -400,11 +255,9 @@ class IncrementalPublisher { if (id === undefined) { continue; } - const reconcilableResults = deferredFragmentRecord.reconcilableResults; - if ( - deferredFragmentRecord.expectedReconcilableResults !== - reconcilableResults.length - ) { + const reconcilableResults = + this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); + if (reconcilableResults === undefined) { continue; } for (const reconcilableResult of reconcilableResults) { @@ -427,14 +280,7 @@ class IncrementalPublisher { this._incremental.push(incrementalEntry); } this._completed.push({ id }); - this._pending.delete(deferredFragmentRecord); - for (const child of deferredFragmentRecord.children) { - this._newPending.add(child); - this._completedResultQueue.push(...child.results); - } } - - this._pruneEmpty(); } private _handleCompletedStreamItems( @@ -453,7 +299,7 @@ class IncrementalPublisher { id, errors: streamItemsResult.errors, }); - this._pending.delete(streamRecord); + this._incrementalGraph.removeSubsequentResultRecord(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); this._context.cancellableStreams.delete(streamRecord); @@ -464,7 +310,7 @@ class IncrementalPublisher { } } else if (streamItemsResult.result === undefined) { this._completed.push({ id }); - this._pending.delete(streamRecord); + this._incrementalGraph.removeSubsequentResultRecord(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); this._context.cancellableStreams.delete(streamRecord); @@ -478,10 +324,9 @@ class IncrementalPublisher { this._incremental.push(incrementalEntry); if (streamItemsResult.incrementalDataRecords !== undefined) { - this._addIncrementalDataRecords( + this._incrementalGraph.addIncrementalDataRecords( streamItemsResult.incrementalDataRecords, ); - this._pruneEmpty(); } } } diff --git a/src/execution/types.ts b/src/execution/types.ts index d2fd84827b..519d08ea46 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -188,7 +188,7 @@ export function isDeferredGroupedFieldSetResult( return 'deferredFragmentRecords' in subsequentResult; } -interface ReconcilableDeferredGroupedFieldSetResult { +export interface ReconcilableDeferredGroupedFieldSetResult { deferredFragmentRecords: ReadonlyArray; path: Array; result: BareDeferredGroupedFieldSetResult; From 44da0380a4a5ac54bc789c47f6e803ff554197cd Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 14:40:05 +0300 Subject: [PATCH 02/13] refactor: move returnStreamIterators to method --- src/execution/IncrementalPublisher.ts | 34 +++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 92a3f199c3..8257bb2a8a 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -165,32 +165,18 @@ class IncrementalPublisher { await this._incrementalGraph.newCompletedResultAvailable; } - await returnStreamIterators().catch(() => { + await this._returnStreamIterators().catch(() => { // ignore errors }); return { value: undefined, done: true }; }; - const returnStreamIterators = async (): Promise => { - const cancellableStreams = this._context.cancellableStreams; - if (cancellableStreams === undefined) { - return; - } - const promises: Array> = []; - for (const streamRecord of cancellableStreams) { - if (streamRecord.earlyReturn !== undefined) { - promises.push(streamRecord.earlyReturn()); - } - } - await Promise.all(promises); - }; - const _return = async (): Promise< IteratorResult > => { isDone = true; - await returnStreamIterators(); + await this._returnStreamIterators(); return { value: undefined, done: true }; }; @@ -198,7 +184,7 @@ class IncrementalPublisher { error?: unknown, ): Promise> => { isDone = true; - await returnStreamIterators(); + await this._returnStreamIterators(); return Promise.reject(error); }; @@ -362,4 +348,18 @@ class IncrementalPublisher { subPath: subPath.length > 0 ? subPath : undefined, }; } + + private async _returnStreamIterators(): Promise { + const cancellableStreams = this._context.cancellableStreams; + if (cancellableStreams === undefined) { + return; + } + const promises: Array> = []; + for (const streamRecord of cancellableStreams) { + if (streamRecord.earlyReturn !== undefined) { + promises.push(streamRecord.earlyReturn()); + } + } + await Promise.all(promises); + } } From 8bd2cfae1de678b933a12d5c84a931c20f940532 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 15:00:10 +0300 Subject: [PATCH 03/13] refactor: use asyncIterator instead of extra promise --- src/execution/IncrementalGraph.ts | 104 +++++++++++++++++--------- src/execution/IncrementalPublisher.ts | 37 ++++++--- 2 files changed, 94 insertions(+), 47 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index b998af5dda..0d403ea5e7 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -7,7 +7,6 @@ import type { IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, - StreamItemsResult, SubsequentResultRecord, } from './types.js'; import { @@ -19,19 +18,18 @@ import { * @internal */ export class IncrementalGraph { - // these are assigned within the Promise executor called synchronously within the constructor - newCompletedResultAvailable!: Promise; - private _resolve!: () => void; - private _pending: Set; private _newPending: Set; - private _completedResultQueue: Array; + private _completedQueue: Array; + private _nextQueue: Array< + (iterable: IteratorResult>) => void + >; constructor() { this._pending = new Set(); this._newPending = new Set(); - this._completedResultQueue = []; - this._reset(); + this._completedQueue = []; + this._nextQueue = []; } addIncrementalDataRecords( @@ -67,10 +65,10 @@ export class IncrementalGraph { if (isPromise(result)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises result.then((resolved) => { - this._enqueueCompletedStreamItems(resolved); + this._enqueue(resolved); }); } else { - this._enqueueCompletedStreamItems(result); + this._enqueue(result); } } } @@ -97,13 +95,37 @@ export class IncrementalGraph { return newPending; } - *completedResults(): Generator { - let completedResult: IncrementalDataRecordResult | undefined; - while ( - (completedResult = this._completedResultQueue.shift()) !== undefined - ) { - yield completedResult; - } + completedIncrementalData() { + return { + [Symbol.asyncIterator]() { + return this; + }, + next: (): Promise< + IteratorResult> + > => { + const firstResult = this._completedQueue.shift(); + if (firstResult !== undefined) { + return Promise.resolve({ + value: this._yieldCurrentCompletedIncrementalData(firstResult), + done: false, + }); + } + const { promise, resolve } = + promiseWithResolvers< + IteratorResult> + >(); + this._nextQueue.push(resolve); + return promise; + }, + return: (): Promise< + IteratorResult> + > => { + for (const resolve of this._nextQueue) { + resolve({ value: undefined, done: true }); + } + return Promise.resolve({ value: undefined, done: true }); + }, + }; } hasNext(): boolean { @@ -120,10 +142,12 @@ export class IncrementalGraph { ) { return; } - this._pending.delete(deferredFragmentRecord); + this.removeSubsequentResultRecord(deferredFragmentRecord); for (const child of deferredFragmentRecord.children) { this._newPending.add(child); - this._completedResultQueue.push(...child.results); + for (const result of child.results) { + this._enqueue(result); + } } return reconcilableResults; } @@ -132,6 +156,11 @@ export class IncrementalGraph { subsequentResultRecord: SubsequentResultRecord, ): void { this._pending.delete(subsequentResultRecord); + if (this._pending.size === 0) { + for (const resolve of this._nextQueue) { + resolve({ value: undefined, done: true }); + } + } } private _addDeferredFragmentRecord( @@ -186,28 +215,29 @@ export class IncrementalGraph { deferredFragmentRecord.results.push(result); } if (hasPendingParent) { - this._completedResultQueue.push(result); - this._trigger(); + this._enqueue(result); } } - private _enqueueCompletedStreamItems(result: StreamItemsResult): void { - this._completedResultQueue.push(result); - this._trigger(); - } - - private _trigger() { - this._resolve(); - this._reset(); + private *_yieldCurrentCompletedIncrementalData( + first: IncrementalDataRecordResult, + ): Generator { + yield first; + let completed; + while ((completed = this._completedQueue.shift()) !== undefined) { + yield completed; + } } - private _reset() { - const { promise: newCompletedResultAvailable, resolve } = - // promiseWithResolvers uses void only as a generic type parameter - // see: https://typescript-eslint.io/rules/no-invalid-void-type/ - // eslint-disable-next-line @typescript-eslint/no-invalid-void-type - promiseWithResolvers(); - this._resolve = resolve; - this.newCompletedResultAvailable = newCompletedResultAvailable; + private _enqueue(completed: IncrementalDataRecordResult): void { + const next = this._nextQueue.shift(); + if (next !== undefined) { + next({ + value: this._yieldCurrentCompletedIncrementalData(completed), + done: false, + }); + return; + } + this._completedQueue.push(completed); } } diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 8257bb2a8a..89f0c3833e 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -120,10 +120,21 @@ class IncrementalPublisher { const _next = async (): Promise< IteratorResult > => { - while (!isDone) { + if (isDone) { + await this._returnAsyncIteratorsIgnoringErrors(); + return { value: undefined, done: true }; + } + + const completedIncrementalData = + this._incrementalGraph.completedIncrementalData(); + // use the raw iterator rather than 'for await ... of' so as not to trigger the + // '.return()' method on the iterator when exiting the loop with the next value + const asyncIterator = completedIncrementalData[Symbol.asyncIterator](); + let iteration = await asyncIterator.next(); + while (!iteration.done) { let pending: Array = []; - for (const completedResult of this._incrementalGraph.completedResults()) { + for (const completedResult of iteration.value) { if (isDeferredGroupedFieldSetResult(completedResult)) { this._handleCompletedDeferredGroupedFieldSet(completedResult); } else { @@ -138,6 +149,7 @@ class IncrementalPublisher { const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { + // eslint-disable-next-line require-atomic-updates isDone = true; } @@ -162,13 +174,10 @@ class IncrementalPublisher { } // eslint-disable-next-line no-await-in-loop - await this._incrementalGraph.newCompletedResultAvailable; + iteration = await asyncIterator.next(); } - await this._returnStreamIterators().catch(() => { - // ignore errors - }); - + await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; @@ -176,7 +185,7 @@ class IncrementalPublisher { IteratorResult > => { isDone = true; - await this._returnStreamIterators(); + await this._returnAsyncIterators(); return { value: undefined, done: true }; }; @@ -184,7 +193,7 @@ class IncrementalPublisher { error?: unknown, ): Promise> => { isDone = true; - await this._returnStreamIterators(); + await this._returnAsyncIterators(); return Promise.reject(error); }; @@ -349,7 +358,9 @@ class IncrementalPublisher { }; } - private async _returnStreamIterators(): Promise { + private async _returnAsyncIterators(): Promise { + await this._incrementalGraph.completedIncrementalData().return(); + const cancellableStreams = this._context.cancellableStreams; if (cancellableStreams === undefined) { return; @@ -362,4 +373,10 @@ class IncrementalPublisher { } await Promise.all(promises); } + + private async _returnAsyncIteratorsIgnoringErrors(): Promise { + await this._returnAsyncIterators().catch(() => { + // Ignore errors + }); + } } From ffeeb916ea62518f2cdf8e66956d065772daaf5b Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 15:49:23 +0300 Subject: [PATCH 04/13] refactor: convert IncrementalPublisher class members to method args --- src/execution/IncrementalPublisher.ts | 75 ++++++++++++++++----------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 89f0c3833e..5934f3a00b 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -12,6 +12,7 @@ import type { DeferredGroupedFieldSetResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, + IncrementalDataRecordResult, IncrementalDeferResult, IncrementalResult, IncrementalStreamResult, @@ -45,6 +46,12 @@ interface IncrementalPublisherContext { cancellableStreams: Set | undefined; } +interface SubsequentIncrementalExecutionResultContext { + pending: Array; + incremental: Array; + completed: Array; +} + /** * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. @@ -55,15 +62,11 @@ class IncrementalPublisher { private _context: IncrementalPublisherContext; private _nextId: number; private _incrementalGraph: IncrementalGraph; - private _incremental: Array; - private _completed: Array; constructor(context: IncrementalPublisherContext) { this._context = context; this._nextId = 0; this._incrementalGraph = new IncrementalGraph(); - this._incremental = []; - this._completed = []; } buildResponse( @@ -125,6 +128,12 @@ class IncrementalPublisher { return { value: undefined, done: true }; } + const context: SubsequentIncrementalExecutionResultContext = { + pending: [], + incremental: [], + completed: [], + }; + const completedIncrementalData = this._incrementalGraph.completedIncrementalData(); // use the raw iterator rather than 'for await ... of' so as not to trigger the @@ -132,20 +141,12 @@ class IncrementalPublisher { const asyncIterator = completedIncrementalData[Symbol.asyncIterator](); let iteration = await asyncIterator.next(); while (!iteration.done) { - let pending: Array = []; - for (const completedResult of iteration.value) { - if (isDeferredGroupedFieldSetResult(completedResult)) { - this._handleCompletedDeferredGroupedFieldSet(completedResult); - } else { - this._handleCompletedStreamItems(completedResult); - } - - const newPending = this._incrementalGraph.getNewPending(); - pending = [...pending, ...this._pendingSourcesToResults(newPending)]; + this._handleCompletedIncrementalData(completedResult, context); } - if (this._incremental.length > 0 || this._completed.length > 0) { + const { incremental, completed } = context; + if (incremental.length > 0 || completed.length > 0) { const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { @@ -156,20 +157,17 @@ class IncrementalPublisher { const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = { hasNext }; + const pending = context.pending; if (pending.length > 0) { subsequentIncrementalExecutionResult.pending = pending; } - if (this._incremental.length > 0) { - subsequentIncrementalExecutionResult.incremental = - this._incremental; + if (incremental.length > 0) { + subsequentIncrementalExecutionResult.incremental = incremental; } - if (this._completed.length > 0) { - subsequentIncrementalExecutionResult.completed = this._completed; + if (completed.length > 0) { + subsequentIncrementalExecutionResult.completed = completed; } - this._incremental = []; - this._completed = []; - return { value: subsequentIncrementalExecutionResult, done: false }; } @@ -207,8 +205,25 @@ class IncrementalPublisher { }; } + private _handleCompletedIncrementalData( + completedIncrementalData: IncrementalDataRecordResult, + context: SubsequentIncrementalExecutionResultContext, + ): void { + if (isDeferredGroupedFieldSetResult(completedIncrementalData)) { + this._handleCompletedDeferredGroupedFieldSet( + completedIncrementalData, + context, + ); + } else { + this._handleCompletedStreamItems(completedIncrementalData, context); + } + const newPending = this._incrementalGraph.getNewPending(); + context.pending.push(...this._pendingSourcesToResults(newPending)); + } + private _handleCompletedDeferredGroupedFieldSet( deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, + context: SubsequentIncrementalExecutionResultContext, ): void { if ( isNonReconcilableDeferredGroupedFieldSetResult( @@ -218,7 +233,7 @@ class IncrementalPublisher { for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { const id = deferredFragmentRecord.id; if (id !== undefined) { - this._completed.push({ + context.completed.push({ id, errors: deferredGroupedFieldSetResult.errors, }); @@ -255,6 +270,7 @@ class IncrementalPublisher { if (reconcilableResults === undefined) { continue; } + const incremental = context.incremental; for (const reconcilableResult of reconcilableResults) { if (reconcilableResult.sent) { continue; @@ -272,14 +288,15 @@ class IncrementalPublisher { if (subPath !== undefined) { incrementalEntry.subPath = subPath; } - this._incremental.push(incrementalEntry); + incremental.push(incrementalEntry); } - this._completed.push({ id }); + context.completed.push({ id }); } } private _handleCompletedStreamItems( streamItemsResult: StreamItemsResult, + context: SubsequentIncrementalExecutionResultContext, ): void { const streamRecord = streamItemsResult.streamRecord; const id = streamRecord.id; @@ -290,7 +307,7 @@ class IncrementalPublisher { return; } if (streamItemsResult.errors !== undefined) { - this._completed.push({ + context.completed.push({ id, errors: streamItemsResult.errors, }); @@ -304,7 +321,7 @@ class IncrementalPublisher { }); } } else if (streamItemsResult.result === undefined) { - this._completed.push({ id }); + context.completed.push({ id }); this._incrementalGraph.removeSubsequentResultRecord(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); @@ -316,7 +333,7 @@ class IncrementalPublisher { ...streamItemsResult.result, }; - this._incremental.push(incrementalEntry); + context.incremental.push(incrementalEntry); if (streamItemsResult.incrementalDataRecords !== undefined) { this._incrementalGraph.addIncrementalDataRecords( From fb8dea2655e3f17ba0d817b419d2af7e41b3aa12 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 15:55:30 +0300 Subject: [PATCH 05/13] incremental: use invariant for checking id --- src/execution/IncrementalPublisher.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 5934f3a00b..9f233a8cb1 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -300,12 +300,7 @@ class IncrementalPublisher { ): void { const streamRecord = streamItemsResult.streamRecord; const id = streamRecord.id; - // TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list - // for ordering purposes, if an entry errors, additional entries will not be processed. - /* c8 ignore next 3 */ - if (id === undefined) { - return; - } + invariant(id !== undefined); if (streamItemsResult.errors !== undefined) { context.completed.push({ id, From 3c4aea04195fe889a4aad843cae2322b9026113e Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 17:07:11 +0300 Subject: [PATCH 06/13] refactor(IncrementalGraph): remove unnecessary method --- src/execution/IncrementalGraph.ts | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 0d403ea5e7..49ebf4acd6 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -74,9 +74,8 @@ export class IncrementalGraph { } getNewPending(): ReadonlyArray { - const maybeEmptyNewPending = this._newPending; const newPending = []; - for (const node of maybeEmptyNewPending) { + for (const node of this._newPending) { if (isDeferredFragmentRecord(node)) { if (node.expectedReconcilableResults) { this._pending.add(node); @@ -84,7 +83,7 @@ export class IncrementalGraph { continue; } for (const child of node.children) { - this._addNonEmptyNewPending(child, newPending); + this._newPending.add(child); } } else { this._pending.add(node); @@ -188,22 +187,6 @@ export class IncrementalGraph { this._addDeferredFragmentRecord(parent); } - private _addNonEmptyNewPending( - deferredFragmentRecord: DeferredFragmentRecord, - newPending: Array, - ): void { - if (deferredFragmentRecord.expectedReconcilableResults) { - this._pending.add(deferredFragmentRecord); - newPending.push(deferredFragmentRecord); - return; - } - /* c8 ignore next 5 */ - // TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children. - for (const child of deferredFragmentRecord.children) { - this._addNonEmptyNewPending(child, newPending); - } - } - private _enqueueCompletedDeferredGroupedFieldSet( result: DeferredGroupedFieldSetResult, ): void { From 00edd3a69eae9dce882874770ac1ea19c23263de Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 17:37:03 +0300 Subject: [PATCH 07/13] refactor(IncrementalGraph): use Subsequent Result nodes to reduce mutation --- src/execution/IncrementalGraph.ts | 181 ++++++++++++++++++-------- src/execution/IncrementalPublisher.ts | 17 +-- src/execution/execute.ts | 10 +- src/execution/types.ts | 27 +--- 4 files changed, 142 insertions(+), 93 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 49ebf4acd6..7cf722c29d 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -9,17 +9,41 @@ import type { ReconcilableDeferredGroupedFieldSetResult, SubsequentResultRecord, } from './types.js'; -import { - isDeferredFragmentRecord, - isDeferredGroupedFieldSetRecord, -} from './types.js'; +import { isDeferredGroupedFieldSetRecord } from './types.js'; + +interface DeferredFragmentNode { + deferredFragmentRecord: DeferredFragmentRecord; + expectedReconcilableResults: number; + results: Array; + reconcilableResults: Array; + children: Array; +} + +function isDeferredFragmentNode( + node: DeferredFragmentNode | undefined, +): node is DeferredFragmentNode { + return node !== undefined; +} + +function isStreamNode( + subsequentResultNode: SubsequentResultNode, +): subsequentResultNode is SubsequentResultRecord { + return 'path' in subsequentResultNode; +} + +type SubsequentResultNode = DeferredFragmentNode | SubsequentResultRecord; /** * @internal */ export class IncrementalGraph { - private _pending: Set; - private _newPending: Set; + private _pending: Set; + private _deferredFragmentNodes: Map< + DeferredFragmentRecord, + DeferredFragmentNode + >; + + private _newPending: Set; private _completedQueue: Array; private _nextQueue: Array< (iterable: IteratorResult>) => void @@ -27,6 +51,7 @@ export class IncrementalGraph { constructor() { this._pending = new Set(); + this._deferredFragmentNodes = new Map(); this._newPending = new Set(); this._completedQueue = []; this._nextQueue = []; @@ -38,9 +63,10 @@ export class IncrementalGraph { for (const incrementalDataRecord of incrementalDataRecords) { if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { - deferredFragmentRecord.expectedReconcilableResults++; - - this._addDeferredFragmentRecord(deferredFragmentRecord); + const deferredFragmentNode = this._addDeferredFragmentNode( + deferredFragmentRecord, + ); + deferredFragmentNode.expectedReconcilableResults++; } const result = incrementalDataRecord.result; @@ -73,21 +99,33 @@ export class IncrementalGraph { } } + addCompletedReconcilableDeferredGroupedFieldSet( + reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, + ): void { + const deferredFragmentNodes: Array = + reconcilableResult.deferredFragmentRecords + .map((deferredFragmentRecord) => + this._deferredFragmentNodes.get(deferredFragmentRecord), + ) + .filter(isDeferredFragmentNode); + for (const deferredFragmentNode of deferredFragmentNodes) { + deferredFragmentNode.reconcilableResults.push(reconcilableResult); + } + } + getNewPending(): ReadonlyArray { - const newPending = []; + const newPending: Array = []; for (const node of this._newPending) { - if (isDeferredFragmentRecord(node)) { - if (node.expectedReconcilableResults) { - this._pending.add(node); - newPending.push(node); - continue; - } + if (isStreamNode(node)) { + this._pending.add(node); + newPending.push(node); + } else if (node.expectedReconcilableResults) { + this._pending.add(node); + newPending.push(node.deferredFragmentRecord); + } else { for (const child of node.children) { this._newPending.add(child); } - } else { - this._pending.add(node); - newPending.push(node); } } this._newPending.clear(); @@ -134,15 +172,23 @@ export class IncrementalGraph { completeDeferredFragment( deferredFragmentRecord: DeferredFragmentRecord, ): Array | undefined { - const reconcilableResults = deferredFragmentRecord.reconcilableResults; + const deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + // TODO: add test case? + /* c8 ignore next 3 */ + if (deferredFragmentNode === undefined) { + return undefined; + } + const reconcilableResults = deferredFragmentNode.reconcilableResults; if ( - deferredFragmentRecord.expectedReconcilableResults !== + deferredFragmentNode.expectedReconcilableResults !== reconcilableResults.length ) { return; } - this.removeSubsequentResultRecord(deferredFragmentRecord); - for (const child of deferredFragmentRecord.children) { + this._removePending(deferredFragmentNode); + for (const child of deferredFragmentNode.children) { this._newPending.add(child); for (const result of child.results) { this._enqueue(result); @@ -151,10 +197,30 @@ export class IncrementalGraph { return reconcilableResults; } - removeSubsequentResultRecord( - subsequentResultRecord: SubsequentResultRecord, - ): void { - this._pending.delete(subsequentResultRecord); + removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): void { + const deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + // TODO: add test case? + /* c8 ignore next 3 */ + if (deferredFragmentNode === undefined) { + return; + } + this._removePending(deferredFragmentNode); + this._deferredFragmentNodes.delete(deferredFragmentRecord); + // TODO: add test case for an erroring deferred fragment with child defers + /* c8 ignore next 3 */ + for (const child of deferredFragmentNode.children) { + this.removeDeferredFragment(child.deferredFragmentRecord); + } + } + + removeStream(streamRecord: SubsequentResultRecord): void { + this._removePending(streamRecord); + } + + private _removePending(subsequentResultNode: SubsequentResultNode): void { + this._pending.delete(subsequentResultNode); if (this._pending.size === 0) { for (const resolve of this._nextQueue) { resolve({ value: undefined, done: true }); @@ -162,42 +228,55 @@ export class IncrementalGraph { } } - private _addDeferredFragmentRecord( + private _addDeferredFragmentNode( deferredFragmentRecord: DeferredFragmentRecord, - ): void { + ): DeferredFragmentNode { + let deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + if (deferredFragmentNode !== undefined) { + return deferredFragmentNode; + } + deferredFragmentNode = { + deferredFragmentRecord, + expectedReconcilableResults: 0, + results: [], + reconcilableResults: [], + children: [], + }; + this._deferredFragmentNodes.set( + deferredFragmentRecord, + deferredFragmentNode, + ); const parent = deferredFragmentRecord.parent; if (parent === undefined) { - // Below is equivalent and slightly faster version of: - // if (this._pending.has(deferredFragmentRecord)) { ... } - // as all released deferredFragmentRecords have ids. - if (deferredFragmentRecord.id !== undefined) { - return; - } - - this._newPending.add(deferredFragmentRecord); - return; + this._newPending.add(deferredFragmentNode); + return deferredFragmentNode; } - - if (parent.children.has(deferredFragmentRecord)) { - return; - } - - parent.children.add(deferredFragmentRecord); - - this._addDeferredFragmentRecord(parent); + const parentNode = this._addDeferredFragmentNode(parent); + parentNode.children.push(deferredFragmentNode); + return deferredFragmentNode; } private _enqueueCompletedDeferredGroupedFieldSet( result: DeferredGroupedFieldSetResult, ): void { - let hasPendingParent = false; + let isPending = false; for (const deferredFragmentRecord of result.deferredFragmentRecords) { - if (deferredFragmentRecord.id !== undefined) { - hasPendingParent = true; + const deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + // TODO: add test case? + /* c8 ignore next 3 */ + if (deferredFragmentNode === undefined) { + continue; + } + if (this._pending.has(deferredFragmentNode)) { + isPending = true; } - deferredFragmentRecord.results.push(result); + deferredFragmentNode.results.push(result); } - if (hasPendingParent) { + if (isPending) { this._enqueue(result); } } diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 9f233a8cb1..9499dfced4 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -237,18 +237,15 @@ class IncrementalPublisher { id, errors: deferredGroupedFieldSetResult.errors, }); - this._incrementalGraph.removeSubsequentResultRecord( - deferredFragmentRecord, - ); + this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord); } } return; } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { - deferredFragmentRecord.reconcilableResults.push( - deferredGroupedFieldSetResult, - ); - } + + this._incrementalGraph.addCompletedReconcilableDeferredGroupedFieldSet( + deferredGroupedFieldSetResult, + ); const incrementalDataRecords = deferredGroupedFieldSetResult.incrementalDataRecords; @@ -306,7 +303,7 @@ class IncrementalPublisher { id, errors: streamItemsResult.errors, }); - this._incrementalGraph.removeSubsequentResultRecord(streamRecord); + this._incrementalGraph.removeStream(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); this._context.cancellableStreams.delete(streamRecord); @@ -317,7 +314,7 @@ class IncrementalPublisher { } } else if (streamItemsResult.result === undefined) { context.completed.push({ id }); - this._incrementalGraph.removeSubsequentResultRecord(streamRecord); + this._incrementalGraph.removeStream(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); this._context.cancellableStreams.delete(streamRecord); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index dfb7f36074..90e8ec5454 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -62,6 +62,7 @@ import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import type { CancellableStreamRecord, + DeferredFragmentRecord, DeferredGroupedFieldSetRecord, DeferredGroupedFieldSetResult, ExecutionResult, @@ -71,10 +72,7 @@ import type { StreamItemsResult, SubsequentResultRecord, } from './types.js'; -import { - DeferredFragmentRecord, - isReconcilableStreamItemsResult, -} from './types.js'; +import { isReconcilableStreamItemsResult } from './types.js'; import { getArgumentValues, getDirectiveValues, @@ -1674,11 +1672,11 @@ function addNewDeferredFragments( : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. - const deferredFragmentRecord = new DeferredFragmentRecord({ + const deferredFragmentRecord: DeferredFragmentRecord = { path, label: newDeferUsage.label, parent, - }); + }; // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); diff --git a/src/execution/types.ts b/src/execution/types.ts index 519d08ea46..c8fc64ec07 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -166,12 +166,6 @@ export interface FormattedCompletedResult { errors?: ReadonlyArray; } -export function isDeferredFragmentRecord( - subsequentResultRecord: SubsequentResultRecord, -): subsequentResultRecord is DeferredFragmentRecord { - return 'parent' in subsequentResultRecord; -} - export function isDeferredGroupedFieldSetRecord( incrementalDataRecord: IncrementalDataRecord, ): incrementalDataRecord is DeferredGroupedFieldSetRecord { @@ -221,30 +215,11 @@ export interface SubsequentResultRecord { id?: string | undefined; } -/** @internal */ -export class DeferredFragmentRecord implements SubsequentResultRecord { +export interface DeferredFragmentRecord extends SubsequentResultRecord { path: Path | undefined; label: string | undefined; id?: string | undefined; parent: DeferredFragmentRecord | undefined; - expectedReconcilableResults: number; - results: Array; - reconcilableResults: Array; - children: Set; - - constructor(opts: { - path: Path | undefined; - label: string | undefined; - parent: DeferredFragmentRecord | undefined; - }) { - this.path = opts.path; - this.label = opts.label; - this.parent = opts.parent; - this.expectedReconcilableResults = 0; - this.results = []; - this.reconcilableResults = []; - this.children = new Set(); - } } export interface CancellableStreamRecord extends SubsequentResultRecord { From f5b1c33ec5d2a46f84ce8be814ad5768bc717eb3 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 3 Jun 2024 12:58:42 +0300 Subject: [PATCH 08/13] refactor(incremental): introduce StreamRecord type --- src/execution/IncrementalGraph.ts | 7 ++++--- src/execution/execute.ts | 18 +++++++++--------- src/execution/types.ts | 20 +++++++++++--------- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 7cf722c29d..5a4de46ff9 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -7,6 +7,7 @@ import type { IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, + StreamRecord, SubsequentResultRecord, } from './types.js'; import { isDeferredGroupedFieldSetRecord } from './types.js'; @@ -27,11 +28,11 @@ function isDeferredFragmentNode( function isStreamNode( subsequentResultNode: SubsequentResultNode, -): subsequentResultNode is SubsequentResultRecord { +): subsequentResultNode is StreamRecord { return 'path' in subsequentResultNode; } -type SubsequentResultNode = DeferredFragmentNode | SubsequentResultRecord; +type SubsequentResultNode = DeferredFragmentNode | StreamRecord; /** * @internal @@ -215,7 +216,7 @@ export class IncrementalGraph { } } - removeStream(streamRecord: SubsequentResultRecord): void { + removeStream(streamRecord: StreamRecord): void { this._removePending(streamRecord); } diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 90e8ec5454..faac330f08 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -70,7 +70,7 @@ import type { IncrementalDataRecord, StreamItemsRecord, StreamItemsResult, - SubsequentResultRecord, + StreamRecord, } from './types.js'; import { isReconcilableStreamItemsResult } from './types.js'; import { @@ -1094,12 +1094,12 @@ async function completeAsyncIteratorValue( while (true) { if (streamUsage && index >= streamUsage.initialCount) { const returnFn = asyncIterator.return; - let streamRecord: SubsequentResultRecord | CancellableStreamRecord; + let streamRecord: StreamRecord | CancellableStreamRecord; if (returnFn === undefined) { streamRecord = { label: streamUsage.label, path, - } as SubsequentResultRecord; + } as StreamRecord; } else { streamRecord = { label: streamUsage.label, @@ -1266,7 +1266,7 @@ function completeIterableValue( const item = iteration.value; if (streamUsage && index >= streamUsage.initialCount) { - const streamRecord: SubsequentResultRecord = { + const streamRecord: StreamRecord = { label: streamUsage.label, path, }; @@ -2212,7 +2212,7 @@ function getDeferredFragmentRecords( } function firstSyncStreamItems( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, initialItem: PromiseOrValue, initialIndex: number, iterator: Iterator, @@ -2315,7 +2315,7 @@ function prependNextResolvedStreamItems( } function firstAsyncStreamItems( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, path: Path, initialIndex: number, asyncIterator: AsyncIterator, @@ -2341,7 +2341,7 @@ function firstAsyncStreamItems( } async function getNextAsyncStreamItemsResult( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, path: Path, index: number, asyncIterator: AsyncIterator, @@ -2395,7 +2395,7 @@ async function getNextAsyncStreamItemsResult( } function completeStreamItems( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, itemPath: Path, item: unknown, exeContext: ExecutionContext, @@ -2495,7 +2495,7 @@ function completeStreamItems( function buildStreamItemsResult( errors: ReadonlyArray | undefined, - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, result: GraphQLWrappedResult, ): StreamItemsResult { return { diff --git a/src/execution/types.ts b/src/execution/types.ts index c8fc64ec07..e136388ced 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -209,20 +209,22 @@ export interface DeferredGroupedFieldSetRecord { result: PromiseOrValue; } -export interface SubsequentResultRecord { +export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; + +export interface DeferredFragmentRecord { path: Path | undefined; label: string | undefined; id?: string | undefined; + parent: DeferredFragmentRecord | undefined; } -export interface DeferredFragmentRecord extends SubsequentResultRecord { - path: Path | undefined; +export interface StreamRecord { + path: Path; label: string | undefined; id?: string | undefined; - parent: DeferredFragmentRecord | undefined; } -export interface CancellableStreamRecord extends SubsequentResultRecord { +export interface CancellableStreamRecord extends StreamRecord { earlyReturn: () => Promise; } @@ -233,7 +235,7 @@ export function isCancellableStreamRecord( } interface ReconcilableStreamItemsResult { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; result: BareStreamItemsResult; incrementalDataRecords: ReadonlyArray | undefined; errors?: never; @@ -246,14 +248,14 @@ export function isReconcilableStreamItemsResult( } interface TerminatingStreamItemsResult { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; result?: never; incrementalDataRecords?: never; errors?: never; } interface NonReconcilableStreamItemsResult { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; errors: ReadonlyArray; result?: never; } @@ -264,7 +266,7 @@ export type StreamItemsResult = | NonReconcilableStreamItemsResult; export interface StreamItemsRecord { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; result: PromiseOrValue; } From bf62bd5a16ea1faea530094403424cc6cff19d07 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 17:51:32 +0300 Subject: [PATCH 09/13] refactor(IncrementalGraph): use set of pending deferred grouped field set results to reduce mutation --- src/execution/IncrementalGraph.ts | 48 +++++++++++++++++++-------- src/execution/IncrementalPublisher.ts | 13 ++++---- src/execution/execute.ts | 33 ++++++++++-------- src/execution/types.ts | 9 +++-- 4 files changed, 63 insertions(+), 40 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 5a4de46ff9..bf89d0f7e0 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -3,6 +3,7 @@ import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { DeferredFragmentRecord, + DeferredGroupedFieldSetRecord, DeferredGroupedFieldSetResult, IncrementalDataRecord, IncrementalDataRecordResult, @@ -14,9 +15,9 @@ import { isDeferredGroupedFieldSetRecord } from './types.js'; interface DeferredFragmentNode { deferredFragmentRecord: DeferredFragmentRecord; - expectedReconcilableResults: number; + deferredGroupedFieldSetRecords: Set; results: Array; - reconcilableResults: Array; + reconcilableResults: Set; children: Array; } @@ -67,7 +68,9 @@ export class IncrementalGraph { const deferredFragmentNode = this._addDeferredFragmentNode( deferredFragmentRecord, ); - deferredFragmentNode.expectedReconcilableResults++; + deferredFragmentNode.deferredGroupedFieldSetRecords.add( + incrementalDataRecord, + ); } const result = incrementalDataRecord.result; @@ -104,13 +107,16 @@ export class IncrementalGraph { reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, ): void { const deferredFragmentNodes: Array = - reconcilableResult.deferredFragmentRecords + reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords .map((deferredFragmentRecord) => this._deferredFragmentNodes.get(deferredFragmentRecord), ) .filter(isDeferredFragmentNode); for (const deferredFragmentNode of deferredFragmentNodes) { - deferredFragmentNode.reconcilableResults.push(reconcilableResult); + deferredFragmentNode.deferredGroupedFieldSetRecords.delete( + reconcilableResult.deferredGroupedFieldSetRecord, + ); + deferredFragmentNode.reconcilableResults.add(reconcilableResult); } } @@ -120,7 +126,7 @@ export class IncrementalGraph { if (isStreamNode(node)) { this._pending.add(node); newPending.push(node); - } else if (node.expectedReconcilableResults) { + } else if (node.deferredGroupedFieldSetRecords.size > 0) { this._pending.add(node); newPending.push(node.deferredFragmentRecord); } else { @@ -181,13 +187,26 @@ export class IncrementalGraph { if (deferredFragmentNode === undefined) { return undefined; } - const reconcilableResults = deferredFragmentNode.reconcilableResults; - if ( - deferredFragmentNode.expectedReconcilableResults !== - reconcilableResults.length - ) { + if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) { return; } + const reconcilableResults = Array.from( + deferredFragmentNode.reconcilableResults, + ); + for (const reconcilableResult of reconcilableResults) { + for (const otherDeferredFragmentRecord of reconcilableResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { + const otherDeferredFragmentNode = this._deferredFragmentNodes.get( + otherDeferredFragmentRecord, + ); + if (otherDeferredFragmentNode === undefined) { + continue; + } + otherDeferredFragmentNode.reconcilableResults.delete( + reconcilableResult, + ); + } + } this._removePending(deferredFragmentNode); for (const child of deferredFragmentNode.children) { this._newPending.add(child); @@ -240,9 +259,9 @@ export class IncrementalGraph { } deferredFragmentNode = { deferredFragmentRecord, - expectedReconcilableResults: 0, + deferredGroupedFieldSetRecords: new Set(), results: [], - reconcilableResults: [], + reconcilableResults: new Set(), children: [], }; this._deferredFragmentNodes.set( @@ -263,7 +282,8 @@ export class IncrementalGraph { result: DeferredGroupedFieldSetResult, ): void { let isPending = false; - for (const deferredFragmentRecord of result.deferredFragmentRecords) { + for (const deferredFragmentRecord of result.deferredGroupedFieldSetRecord + .deferredFragmentRecords) { const deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, ); diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 9499dfced4..caf167d114 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -230,7 +230,8 @@ class IncrementalPublisher { deferredGroupedFieldSetResult, ) ) { - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { const id = deferredFragmentRecord.id; if (id !== undefined) { context.completed.push({ @@ -253,7 +254,8 @@ class IncrementalPublisher { this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { const id = deferredFragmentRecord.id; // TODO: add test case for this. // Presumably, this can occur if an error causes a fragment to be completed early, @@ -269,10 +271,6 @@ class IncrementalPublisher { } const incremental = context.incremental; for (const reconcilableResult of reconcilableResults) { - if (reconcilableResult.sent) { - continue; - } - reconcilableResult.sent = true; const { bestId, subPath } = this._getBestIdAndSubPath( id, deferredFragmentRecord, @@ -343,7 +341,8 @@ class IncrementalPublisher { let maxLength = pathToArray(initialDeferredFragmentRecord.path).length; let bestId = initialId; - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { if (deferredFragmentRecord === initialDeferredFragmentRecord) { continue; } diff --git a/src/execution/execute.ts b/src/execution/execute.ts index faac330f08..a60fb0d7da 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2089,9 +2089,14 @@ function executeDeferredGroupedFieldSets( deferMap, ); + const deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord = { + deferredFragmentRecords, + result: undefined as unknown as DeferredGroupedFieldSetResult, + }; + const executor = () => executeDeferredGroupedFieldSet( - deferredFragmentRecords, + deferredGroupedFieldSetRecord, exeContext, parentType, sourceValue, @@ -2104,12 +2109,12 @@ function executeDeferredGroupedFieldSets( deferMap, ); - const deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord = { - deferredFragmentRecords, - result: shouldDefer(parentDeferUsages, deferUsageSet) - ? Promise.resolve().then(executor) - : executor(), - }; + deferredGroupedFieldSetRecord.result = shouldDefer( + parentDeferUsages, + deferUsageSet, + ) + ? Promise.resolve().then(executor) + : executor(); newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } @@ -2134,7 +2139,7 @@ function shouldDefer( } function executeDeferredGroupedFieldSet( - deferredFragmentRecords: ReadonlyArray, + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, exeContext: ExecutionContext, parentType: GraphQLObjectType, sourceValue: unknown, @@ -2156,7 +2161,7 @@ function executeDeferredGroupedFieldSet( ); } catch (error) { return { - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path: pathToArray(path), errors: withError(incrementalContext.errors, error), }; @@ -2167,12 +2172,12 @@ function executeDeferredGroupedFieldSet( (resolved) => buildDeferredGroupedFieldSetResult( incrementalContext.errors, - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path, resolved, ), (error) => ({ - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path: pathToArray(path), errors: withError(incrementalContext.errors, error), }), @@ -2181,7 +2186,7 @@ function executeDeferredGroupedFieldSet( return buildDeferredGroupedFieldSetResult( incrementalContext.errors, - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path, result, ); @@ -2189,12 +2194,12 @@ function executeDeferredGroupedFieldSet( function buildDeferredGroupedFieldSetResult( errors: ReadonlyArray | undefined, - deferredFragmentRecords: ReadonlyArray, + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, path: Path | undefined, result: GraphQLWrappedResult>, ): DeferredGroupedFieldSetResult { return { - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path: pathToArray(path), result: errors === undefined ? { data: result[0] } : { data: result[0], errors }, diff --git a/src/execution/types.ts b/src/execution/types.ts index e136388ced..343e3edf9b 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -179,22 +179,21 @@ export type DeferredGroupedFieldSetResult = export function isDeferredGroupedFieldSetResult( subsequentResult: DeferredGroupedFieldSetResult | StreamItemsResult, ): subsequentResult is DeferredGroupedFieldSetResult { - return 'deferredFragmentRecords' in subsequentResult; + return 'deferredGroupedFieldSetRecord' in subsequentResult; } export interface ReconcilableDeferredGroupedFieldSetResult { - deferredFragmentRecords: ReadonlyArray; + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord; path: Array; result: BareDeferredGroupedFieldSetResult; incrementalDataRecords: ReadonlyArray | undefined; - sent?: true | undefined; errors?: never; } interface NonReconcilableDeferredGroupedFieldSetResult { - errors: ReadonlyArray; - deferredFragmentRecords: ReadonlyArray; + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord; path: Array; + errors: ReadonlyArray; result?: never; } From a86b207b8cf9cdd7ff586afb4d490391f3cc5546 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 3 Jun 2024 12:43:50 +0300 Subject: [PATCH 10/13] fix(incremental): emit only single completion when multiple deferred grouped field sets error --- src/execution/IncrementalGraph.ts | 7 +- src/execution/IncrementalPublisher.ts | 17 +-- src/execution/__tests__/defer-test.ts | 146 ++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 8 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index bf89d0f7e0..8e93b3eda4 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -217,14 +217,16 @@ export class IncrementalGraph { return reconcilableResults; } - removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): void { + removeDeferredFragment( + deferredFragmentRecord: DeferredFragmentRecord, + ): boolean { const deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, ); // TODO: add test case? /* c8 ignore next 3 */ if (deferredFragmentNode === undefined) { - return; + return false; } this._removePending(deferredFragmentNode); this._deferredFragmentNodes.delete(deferredFragmentRecord); @@ -233,6 +235,7 @@ export class IncrementalGraph { for (const child of deferredFragmentNode.children) { this.removeDeferredFragment(child.deferredFragmentRecord); } + return true; } removeStream(streamRecord: StreamRecord): void { diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index caf167d114..87fe548628 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -233,13 +233,18 @@ class IncrementalPublisher { for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { const id = deferredFragmentRecord.id; - if (id !== undefined) { - context.completed.push({ - id, - errors: deferredGroupedFieldSetResult.errors, - }); - this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord); + if ( + !this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord) + ) { + // This can occur if multiple deferred grouped field sets error for a fragment. + continue; } + invariant(id !== undefined); + context.completed.push({ + id, + errors: deferredGroupedFieldSetResult.errors, + }); + this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord); } return; } diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index 71d86862f4..dec0982452 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -1334,6 +1334,152 @@ describe('Execute: defer directive', () => { ]); }); + it('Handles multiple erroring deferred grouped field sets', async () => { + const document = parse(` + query { + ... @defer { + a { + b { + c { + someError: nonNullErrorField + } + } + } + } + ... @defer { + a { + b { + c { + anotherError: nonNullErrorField + } + } + } + } + } + `); + const result = await complete(document, { + a: { + b: { c: { nonNullErrorField: null } }, + }, + }); + expectJSON(result).toDeepEqual([ + { + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }, + { + completed: [ + { + id: '0', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 7, column: 17 }], + path: ['a', 'b', 'c', 'someError'], + }, + ], + }, + { + id: '1', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 16, column: 17 }], + path: ['a', 'b', 'c', 'anotherError'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + + it('Handles multiple erroring deferred grouped field sets for the same fragment', async () => { + const document = parse(` + query { + ... @defer { + a { + b { + someC: c { + d: d + } + anotherC: c { + d: d + } + } + } + } + ... @defer { + a { + b { + someC: c { + someError: nonNullErrorField + } + anotherC: c { + anotherError: nonNullErrorField + } + } + } + } + } + `); + const result = await complete(document, { + a: { + b: { c: { d: 'd', nonNullErrorField: null } }, + }, + }); + expectJSON(result).toDeepEqual([ + { + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }, + { + incremental: [ + { + data: { a: { b: { someC: {}, anotherC: {} } } }, + id: '0', + }, + { + data: { d: 'd' }, + id: '0', + subPath: ['a', 'b', 'someC'], + }, + { + data: { d: 'd' }, + id: '0', + subPath: ['a', 'b', 'anotherC'], + }, + ], + completed: [ + { + id: '1', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 19, column: 17 }], + path: ['a', 'b', 'someC', 'someError'], + }, + ], + }, + { id: '0' }, + ], + hasNext: false, + }, + ]); + }); + it('filters a payload with a null that cannot be merged', async () => { const document = parse(` query { From 27669221947fab0a9c5626b3110d725116b49ad4 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 2 Jun 2024 22:02:05 +0300 Subject: [PATCH 11/13] refactor(incremental): enqueue only released records --- src/execution/IncrementalGraph.ts | 108 ++++++++++--------------- src/execution/__tests__/defer-test.ts | 28 +++++-- src/execution/__tests__/stream-test.ts | 24 +++--- 3 files changed, 78 insertions(+), 82 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 8e93b3eda4..a706a8755b 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -4,10 +4,10 @@ import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { DeferredFragmentRecord, DeferredGroupedFieldSetRecord, - DeferredGroupedFieldSetResult, IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, + StreamItemsRecord, StreamRecord, SubsequentResultRecord, } from './types.js'; @@ -16,7 +16,6 @@ import { isDeferredGroupedFieldSetRecord } from './types.js'; interface DeferredFragmentNode { deferredFragmentRecord: DeferredFragmentRecord; deferredGroupedFieldSetRecords: Set; - results: Array; reconcilableResults: Set; children: Array; } @@ -46,6 +45,7 @@ export class IncrementalGraph { >; private _newPending: Set; + private _newIncrementalDataRecords: Set; private _completedQueue: Array; private _nextQueue: Array< (iterable: IteratorResult>) => void @@ -54,6 +54,7 @@ export class IncrementalGraph { constructor() { this._pending = new Set(); this._deferredFragmentNodes = new Map(); + this._newIncrementalDataRecords = new Set(); this._newPending = new Set(); this._completedQueue = []; this._nextQueue = []; @@ -64,41 +65,9 @@ export class IncrementalGraph { ): void { for (const incrementalDataRecord of incrementalDataRecords) { if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { - const deferredFragmentNode = this._addDeferredFragmentNode( - deferredFragmentRecord, - ); - deferredFragmentNode.deferredGroupedFieldSetRecords.add( - incrementalDataRecord, - ); - } - - const result = incrementalDataRecord.result; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => { - this._enqueueCompletedDeferredGroupedFieldSet(resolved); - }); - } else { - this._enqueueCompletedDeferredGroupedFieldSet(result); - } - - continue; - } - - const streamRecord = incrementalDataRecord.streamRecord; - if (streamRecord.id === undefined) { - this._newPending.add(streamRecord); - } - - const result = incrementalDataRecord.result; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => { - this._enqueue(resolved); - }); + this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); } else { - this._enqueue(result); + this._addStreamItemsRecord(incrementalDataRecord); } } } @@ -127,6 +96,9 @@ export class IncrementalGraph { this._pending.add(node); newPending.push(node); } else if (node.deferredGroupedFieldSetRecords.size > 0) { + for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { + this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); + } this._pending.add(node); newPending.push(node.deferredFragmentRecord); } else { @@ -136,6 +108,18 @@ export class IncrementalGraph { } } this._newPending.clear(); + + for (const incrementalDataRecord of this._newIncrementalDataRecords) { + const result = incrementalDataRecord.result; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => this._enqueue(resolved)); + } else { + this._enqueue(result); + } + } + this._newIncrementalDataRecords.clear(); + return newPending; } @@ -210,9 +194,6 @@ export class IncrementalGraph { this._removePending(deferredFragmentNode); for (const child of deferredFragmentNode.children) { this._newPending.add(child); - for (const result of child.results) { - this._enqueue(result); - } } return reconcilableResults; } @@ -251,6 +232,30 @@ export class IncrementalGraph { } } + private _addDeferredGroupedFieldSetRecord( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): void { + for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { + const deferredFragmentNode = this._addDeferredFragmentNode( + deferredFragmentRecord, + ); + if (this._pending.has(deferredFragmentNode)) { + this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord); + } + deferredFragmentNode.deferredGroupedFieldSetRecords.add( + deferredGroupedFieldSetRecord, + ); + } + } + + private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void { + const streamRecord = streamItemsRecord.streamRecord; + if (!this._pending.has(streamRecord)) { + this._newPending.add(streamRecord); + } + this._newIncrementalDataRecords.add(streamItemsRecord); + } + private _addDeferredFragmentNode( deferredFragmentRecord: DeferredFragmentRecord, ): DeferredFragmentNode { @@ -263,7 +268,6 @@ export class IncrementalGraph { deferredFragmentNode = { deferredFragmentRecord, deferredGroupedFieldSetRecords: new Set(), - results: [], reconcilableResults: new Set(), children: [], }; @@ -281,30 +285,6 @@ export class IncrementalGraph { return deferredFragmentNode; } - private _enqueueCompletedDeferredGroupedFieldSet( - result: DeferredGroupedFieldSetResult, - ): void { - let isPending = false; - for (const deferredFragmentRecord of result.deferredGroupedFieldSetRecord - .deferredFragmentRecords) { - const deferredFragmentNode = this._deferredFragmentNodes.get( - deferredFragmentRecord, - ); - // TODO: add test case? - /* c8 ignore next 3 */ - if (deferredFragmentNode === undefined) { - continue; - } - if (this._pending.has(deferredFragmentNode)) { - isPending = true; - } - deferredFragmentNode.results.push(result); - } - if (isPending) { - this._enqueue(result); - } - } - private *_yieldCurrentCompletedIncrementalData( first: IncrementalDataRecordResult, ): Generator { diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index dec0982452..a79f211545 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -367,6 +367,12 @@ describe('Execute: defer directive', () => { }, id: '0', }, + ], + completed: [{ id: '0' }], + hasNext: true, + }, + { + incremental: [ { data: { friends: [{ name: 'Han' }, { name: 'Leia' }, { name: 'C-3PO' }], @@ -374,7 +380,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '0' }, { id: '1' }], + completed: [{ id: '1' }], hasNext: false, }, ]); @@ -977,27 +983,37 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [ - { id: '1', path: ['hero', 'nestedObject'] }, - { id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }, - ], + pending: [{ id: '1', path: ['hero', 'nestedObject'] }], incremental: [ { data: { bar: 'bar' }, id: '0', subPath: ['nestedObject', 'deeperObject'], }, + ], + completed: [{ id: '0' }], + hasNext: true, + }, + { + pending: [{ id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }], + incremental: [ { data: { baz: 'baz' }, id: '1', subPath: ['deeperObject'], }, + ], + completed: [{ id: '1' }], + hasNext: true, + }, + { + incremental: [ { data: { bak: 'bak' }, id: '2', }, ], - completed: [{ id: '0' }, { id: '1' }, { id: '2' }], + completed: [{ id: '2' }], hasNext: false, }, ]); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index f0a103b935..332db58e11 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1444,6 +1444,10 @@ describe('Execute: stream directive', () => { }, { incremental: [ + { + items: [{ name: 'Luke' }], + id: '1', + }, { data: { scalarField: null }, id: '0', @@ -1455,10 +1459,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ name: 'Luke' }], - id: '1', - }, ], completed: [{ id: '0' }], hasNext: true, @@ -1946,14 +1946,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - data: { name: 'Luke' }, - id: '0', - }, { items: [{ id: '2' }], id: '1', }, + { + data: { name: 'Luke' }, + id: '0', + }, ], completed: [{ id: '0' }], hasNext: true, @@ -2047,14 +2047,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - data: { name: 'Luke' }, - id: '0', - }, { items: [{ id: '2' }], id: '1', }, + { + data: { name: 'Luke' }, + id: '0', + }, ], completed: [{ id: '0' }], hasNext: true, From 64894d9a7ca66197bc4323e176bc4747d372010c Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 3 Jun 2024 05:28:19 +0300 Subject: [PATCH 12/13] refactor(incremental): introduce BoxedPromiseOrValue to save resolved promise results --- src/execution/IncrementalGraph.ts | 2 +- src/execution/__tests__/defer-test.ts | 60 +----- src/execution/__tests__/stream-test.ts | 96 +--------- src/execution/execute.ts | 172 ++++++++++-------- src/execution/types.ts | 6 +- src/jsutils/BoxedPromiseOrValue.ts | 26 +++ .../__tests__/BoxedPromiseOrValue-test.ts | 30 +++ 7 files changed, 168 insertions(+), 224 deletions(-) create mode 100644 src/jsutils/BoxedPromiseOrValue.ts create mode 100644 src/jsutils/__tests__/BoxedPromiseOrValue-test.ts diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index a706a8755b..d47f64c049 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -110,7 +110,7 @@ export class IncrementalGraph { this._newPending.clear(); for (const incrementalDataRecord of this._newIncrementalDataRecords) { - const result = incrementalDataRecord.result; + const result = incrementalDataRecord.result.value; if (isPromise(result)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises result.then((resolved) => this._enqueue(resolved)); diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index a79f211545..537f875d37 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -367,12 +367,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { friends: [{ name: 'Han' }, { name: 'Leia' }, { name: 'C-3PO' }], @@ -380,7 +374,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -732,12 +726,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { id: '1', @@ -745,7 +733,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -909,12 +897,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { bar: 'bar', @@ -922,7 +904,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -983,37 +965,27 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ id: '1', path: ['hero', 'nestedObject'] }], + pending: [ + { id: '1', path: ['hero', 'nestedObject'] }, + { id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }, + ], incremental: [ { data: { bar: 'bar' }, id: '0', subPath: ['nestedObject', 'deeperObject'], }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - pending: [{ id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }], - incremental: [ { data: { baz: 'baz' }, id: '1', subPath: ['deeperObject'], }, - ], - completed: [{ id: '1' }], - hasNext: true, - }, - { - incremental: [ { data: { bak: 'bak' }, id: '2', }, ], - completed: [{ id: '2' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }], hasNext: false, }, ]); @@ -2080,17 +2052,11 @@ describe('Execute: defer directive', () => { data: { name: 'slow', friends: [{}, {}, {}] }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, id: '1' }, { data: { name: 'Leia' }, id: '2' }, { data: { name: 'C-3PO' }, id: '3' }, ], - completed: [{ id: '1' }, { id: '2' }, { id: '3' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }, { id: '3' }], hasNext: false, }, ]); @@ -2136,17 +2102,11 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, id: '1' }, { data: { name: 'Leia' }, id: '2' }, { data: { name: 'C-3PO' }, id: '3' }, ], - completed: [{ id: '1' }, { id: '2' }, { id: '3' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }, { id: '3' }], hasNext: false, }, ]); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 332db58e11..461eeb4f93 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -369,20 +369,10 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Han', id: '2' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], id: '0', @@ -527,11 +517,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], id: '0', @@ -572,11 +557,6 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Han', id: '2' }], id: '0', @@ -591,9 +571,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -633,9 +610,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -946,11 +920,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], id: '0', @@ -997,11 +966,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], id: '0', @@ -1132,19 +1096,11 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1460,11 +1416,7 @@ describe('Execute: stream directive', () => { ], }, ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -1570,9 +1522,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1724,9 +1673,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1774,19 +1720,11 @@ describe('Execute: stream directive', () => { items: [{ id: '1', name: 'Luke' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ id: '2', name: 'Han' }], id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1844,48 +1782,22 @@ describe('Execute: stream directive', () => { data: { scalarField: 'slow', nestedFriendList: [] }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - done: false, - }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ - value: { - incremental: [ { items: [{ name: 'Luke' }], id: '1', }, - ], - hasNext: true, - }, - done: false, - }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ - value: { - incremental: [ { items: [{ name: 'Han' }], id: '1', }, ], - hasNext: true, - }, - done: false, - }); - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ - value: { - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, done: false, }); - const result6 = await iterator.next(); - expectJSON(result6).toDeepEqual({ + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ value: undefined, done: true, }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index a60fb0d7da..1c9a9024e2 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1,3 +1,4 @@ +import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import { inspect } from '../jsutils/inspect.js'; import { invariant } from '../jsutils/invariant.js'; import { isAsyncIterable } from '../jsutils/isAsyncIterable.js'; @@ -2091,7 +2092,8 @@ function executeDeferredGroupedFieldSets( const deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord = { deferredFragmentRecords, - result: undefined as unknown as DeferredGroupedFieldSetResult, + result: + undefined as unknown as BoxedPromiseOrValue, }; const executor = () => @@ -2109,12 +2111,11 @@ function executeDeferredGroupedFieldSets( deferMap, ); - deferredGroupedFieldSetRecord.result = shouldDefer( - parentDeferUsages, - deferUsageSet, - ) - ? Promise.resolve().then(executor) - : executor(); + deferredGroupedFieldSetRecord.result = new BoxedPromiseOrValue( + shouldDefer(parentDeferUsages, deferUsageSet) + ? Promise.resolve().then(executor) + : executor(), + ); newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } @@ -2228,65 +2229,76 @@ function firstSyncStreamItems( ): StreamItemsRecord { return { streamRecord, - result: Promise.resolve().then(() => { - const path = streamRecord.path; - const initialPath = addPath(path, initialIndex, undefined); + result: new BoxedPromiseOrValue( + Promise.resolve().then(() => { + const path = streamRecord.path; + const initialPath = addPath(path, initialIndex, undefined); - let result = completeStreamItems( - streamRecord, - initialPath, - initialItem, - exeContext, - { errors: undefined }, - fieldGroup, - info, - itemType, - ); - const firstStreamItems = { result }; - let currentStreamItems = firstStreamItems; - let currentIndex = initialIndex; - let iteration = iterator.next(); - let erroredSynchronously = false; - while (!iteration.done) { - if (!isPromise(result) && !isReconcilableStreamItemsResult(result)) { - erroredSynchronously = true; - break; - } - const item = iteration.value; - currentIndex++; - const currentPath = addPath(path, currentIndex, undefined); - result = completeStreamItems( - streamRecord, - currentPath, - item, - exeContext, - { errors: undefined }, - fieldGroup, - info, - itemType, + let result = new BoxedPromiseOrValue( + completeStreamItems( + streamRecord, + initialPath, + initialItem, + exeContext, + { errors: undefined }, + fieldGroup, + info, + itemType, + ), ); + const firstStreamItems = { result }; + let currentStreamItems = firstStreamItems; + let currentIndex = initialIndex; + let iteration = iterator.next(); + let erroredSynchronously = false; + while (!iteration.done) { + const value = result.value; + if (!isPromise(value) && !isReconcilableStreamItemsResult(value)) { + erroredSynchronously = true; + break; + } + const item = iteration.value; + currentIndex++; + const currentPath = addPath(path, currentIndex, undefined); + result = new BoxedPromiseOrValue( + completeStreamItems( + streamRecord, + currentPath, + item, + exeContext, + { errors: undefined }, + fieldGroup, + info, + itemType, + ), + ); - const nextStreamItems: StreamItemsRecord = { streamRecord, result }; - currentStreamItems.result = prependNextStreamItems( - currentStreamItems.result, - nextStreamItems, - ); - currentStreamItems = nextStreamItems; + const nextStreamItems: StreamItemsRecord = { streamRecord, result }; + currentStreamItems.result = new BoxedPromiseOrValue( + prependNextStreamItems( + currentStreamItems.result.value, + nextStreamItems, + ), + ); + currentStreamItems = nextStreamItems; - iteration = iterator.next(); - } + iteration = iterator.next(); + } - // If a non-reconcilable stream items result was encountered, then the stream terminates in error. - // Otherwise, add a stream terminator. - if (!erroredSynchronously) { - currentStreamItems.result = prependNextStreamItems( - currentStreamItems.result, - { streamRecord, result: { streamRecord } }, - ); - } + // If a non-reconcilable stream items result was encountered, then the stream terminates in error. + // Otherwise, add a stream terminator. + if (!erroredSynchronously) { + currentStreamItems.result = new BoxedPromiseOrValue( + prependNextStreamItems(currentStreamItems.result.value, { + streamRecord, + result: new BoxedPromiseOrValue({ streamRecord }), + }), + ); + } - return firstStreamItems.result; - }), + return firstStreamItems.result.value; + }), + ), }; } @@ -2331,15 +2343,17 @@ function firstAsyncStreamItems( ): StreamItemsRecord { const firstStreamItems: StreamItemsRecord = { streamRecord, - result: getNextAsyncStreamItemsResult( - streamRecord, - path, - initialIndex, - asyncIterator, - exeContext, - fieldGroup, - info, - itemType, + result: new BoxedPromiseOrValue( + getNextAsyncStreamItemsResult( + streamRecord, + path, + initialIndex, + asyncIterator, + exeContext, + fieldGroup, + info, + itemType, + ), ), }; return firstStreamItems; @@ -2384,15 +2398,17 @@ async function getNextAsyncStreamItemsResult( const nextStreamItems: StreamItemsRecord = { streamRecord, - result: getNextAsyncStreamItemsResult( - streamRecord, - path, - index, - asyncIterator, - exeContext, - fieldGroup, - info, - itemType, + result: new BoxedPromiseOrValue( + getNextAsyncStreamItemsResult( + streamRecord, + path, + index, + asyncIterator, + exeContext, + fieldGroup, + info, + itemType, + ), ), }; diff --git a/src/execution/types.ts b/src/execution/types.ts index 343e3edf9b..5c44e6dea8 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -1,6 +1,6 @@ +import type { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; -import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import type { GraphQLError, @@ -205,7 +205,7 @@ export function isNonReconcilableDeferredGroupedFieldSetResult( export interface DeferredGroupedFieldSetRecord { deferredFragmentRecords: ReadonlyArray; - result: PromiseOrValue; + result: BoxedPromiseOrValue; } export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; @@ -266,7 +266,7 @@ export type StreamItemsResult = export interface StreamItemsRecord { streamRecord: StreamRecord; - result: PromiseOrValue; + result: BoxedPromiseOrValue; } export type IncrementalDataRecord = diff --git a/src/jsutils/BoxedPromiseOrValue.ts b/src/jsutils/BoxedPromiseOrValue.ts new file mode 100644 index 0000000000..7f6f758270 --- /dev/null +++ b/src/jsutils/BoxedPromiseOrValue.ts @@ -0,0 +1,26 @@ +import { isPromise } from './isPromise.js'; +import type { PromiseOrValue } from './PromiseOrValue.js'; + +/** + * A BoxedPromiseOrValue is a container for a value or promise where the value + * will be updated when the promise resolves. + * + * A BoxedPromiseOrValue may only be used with promises whose possible + * rejection has already been handled, otherwise this will lead to unhandled + * promise rejections. + * + * @internal + * */ +export class BoxedPromiseOrValue { + value: PromiseOrValue; + + constructor(value: PromiseOrValue) { + this.value = value; + if (isPromise(value)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + value.then((resolved) => { + this.value = resolved; + }); + } + } +} diff --git a/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts b/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts new file mode 100644 index 0000000000..19bc79a4bb --- /dev/null +++ b/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts @@ -0,0 +1,30 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; + +import { BoxedPromiseOrValue } from '../BoxedPromiseOrValue.js'; + +describe('BoxedPromiseOrValue', () => { + it('can box a value', () => { + const boxed = new BoxedPromiseOrValue(42); + + expect(boxed.value).to.equal(42); + }); + + it('can box a promise', () => { + const promise = Promise.resolve(42); + const boxed = new BoxedPromiseOrValue(promise); + + expect(boxed.value).to.equal(promise); + }); + + it('resets the boxed value when the passed promise resolves', async () => { + const promise = Promise.resolve(42); + const boxed = new BoxedPromiseOrValue(promise); + + await resolveOnNextTick(); + + expect(boxed.value).to.equal(42); + }); +}); From 09ae45a269e53cef56fc09e3270f345c753bf9a6 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 4 Jun 2024 06:16:55 +0300 Subject: [PATCH 13/13] polish: remove outdated coverage ignore statement --- src/execution/IncrementalGraph.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index d47f64c049..e95da2f4af 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -204,8 +204,6 @@ export class IncrementalGraph { const deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, ); - // TODO: add test case? - /* c8 ignore next 3 */ if (deferredFragmentNode === undefined) { return false; }