diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts new file mode 100644 index 0000000000..e95da2f4af --- /dev/null +++ b/src/execution/IncrementalGraph.ts @@ -0,0 +1,307 @@ +import { isPromise } from '../jsutils/isPromise.js'; +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +import type { + DeferredFragmentRecord, + DeferredGroupedFieldSetRecord, + IncrementalDataRecord, + IncrementalDataRecordResult, + ReconcilableDeferredGroupedFieldSetResult, + StreamItemsRecord, + StreamRecord, + SubsequentResultRecord, +} from './types.js'; +import { isDeferredGroupedFieldSetRecord } from './types.js'; + +interface DeferredFragmentNode { + deferredFragmentRecord: DeferredFragmentRecord; + deferredGroupedFieldSetRecords: Set; + reconcilableResults: Set; + children: Array; +} + +function isDeferredFragmentNode( + node: DeferredFragmentNode | undefined, +): node is DeferredFragmentNode { + return node !== undefined; +} + +function isStreamNode( + subsequentResultNode: SubsequentResultNode, +): subsequentResultNode is StreamRecord { + return 'path' in subsequentResultNode; +} + +type SubsequentResultNode = DeferredFragmentNode | StreamRecord; + +/** + * @internal + */ +export class IncrementalGraph { + private _pending: Set; + private _deferredFragmentNodes: Map< + DeferredFragmentRecord, + DeferredFragmentNode + >; + + private _newPending: Set; + private _newIncrementalDataRecords: Set; + private _completedQueue: Array; + private _nextQueue: Array< + (iterable: IteratorResult>) => void + >; + + constructor() { + this._pending = new Set(); + this._deferredFragmentNodes = new Map(); + this._newIncrementalDataRecords = new Set(); + this._newPending = new Set(); + this._completedQueue = []; + this._nextQueue = []; + } + + addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + ): void { + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); + } else { + this._addStreamItemsRecord(incrementalDataRecord); + } + } + } + + addCompletedReconcilableDeferredGroupedFieldSet( + reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, + ): void { + const deferredFragmentNodes: Array = + reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords + .map((deferredFragmentRecord) => + this._deferredFragmentNodes.get(deferredFragmentRecord), + ) + .filter(isDeferredFragmentNode); + for (const deferredFragmentNode of deferredFragmentNodes) { + deferredFragmentNode.deferredGroupedFieldSetRecords.delete( + reconcilableResult.deferredGroupedFieldSetRecord, + ); + deferredFragmentNode.reconcilableResults.add(reconcilableResult); + } + } + + getNewPending(): ReadonlyArray { + const newPending: Array = []; + for (const node of this._newPending) { + if (isStreamNode(node)) { + this._pending.add(node); + newPending.push(node); + } else if (node.deferredGroupedFieldSetRecords.size > 0) { + for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { + this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); + } + this._pending.add(node); + newPending.push(node.deferredFragmentRecord); + } else { + for (const child of node.children) { + this._newPending.add(child); + } + } + } + this._newPending.clear(); + + for (const incrementalDataRecord of this._newIncrementalDataRecords) { + const result = incrementalDataRecord.result.value; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => this._enqueue(resolved)); + } else { + this._enqueue(result); + } + } + this._newIncrementalDataRecords.clear(); + + return newPending; + } + + completedIncrementalData() { + return { + [Symbol.asyncIterator]() { + return this; + }, + next: (): Promise< + IteratorResult> + > => { + const firstResult = this._completedQueue.shift(); + if (firstResult !== undefined) { + return Promise.resolve({ + value: this._yieldCurrentCompletedIncrementalData(firstResult), + done: false, + }); + } + const { promise, resolve } = + promiseWithResolvers< + IteratorResult> + >(); + this._nextQueue.push(resolve); + return promise; + }, + return: (): Promise< + IteratorResult> + > => { + for (const resolve of this._nextQueue) { + resolve({ value: undefined, done: true }); + } + return Promise.resolve({ value: undefined, done: true }); + }, + }; + } + + hasNext(): boolean { + return this._pending.size > 0; + } + + completeDeferredFragment( + deferredFragmentRecord: DeferredFragmentRecord, + ): Array | undefined { + const deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + // TODO: add test case? + /* c8 ignore next 3 */ + if (deferredFragmentNode === undefined) { + return undefined; + } + if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) { + return; + } + const reconcilableResults = Array.from( + deferredFragmentNode.reconcilableResults, + ); + for (const reconcilableResult of reconcilableResults) { + for (const otherDeferredFragmentRecord of reconcilableResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { + const otherDeferredFragmentNode = this._deferredFragmentNodes.get( + otherDeferredFragmentRecord, + ); + if (otherDeferredFragmentNode === undefined) { + continue; + } + otherDeferredFragmentNode.reconcilableResults.delete( + reconcilableResult, + ); + } + } + this._removePending(deferredFragmentNode); + for (const child of deferredFragmentNode.children) { + this._newPending.add(child); + } + return reconcilableResults; + } + + removeDeferredFragment( + deferredFragmentRecord: DeferredFragmentRecord, + ): boolean { + const deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + if (deferredFragmentNode === undefined) { + return false; + } + this._removePending(deferredFragmentNode); + this._deferredFragmentNodes.delete(deferredFragmentRecord); + // TODO: add test case for an erroring deferred fragment with child defers + /* c8 ignore next 3 */ + for (const child of deferredFragmentNode.children) { + this.removeDeferredFragment(child.deferredFragmentRecord); + } + return true; + } + + removeStream(streamRecord: StreamRecord): void { + this._removePending(streamRecord); + } + + private _removePending(subsequentResultNode: SubsequentResultNode): void { + this._pending.delete(subsequentResultNode); + if (this._pending.size === 0) { + for (const resolve of this._nextQueue) { + resolve({ value: undefined, done: true }); + } + } + } + + private _addDeferredGroupedFieldSetRecord( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): void { + for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { + const deferredFragmentNode = this._addDeferredFragmentNode( + deferredFragmentRecord, + ); + if (this._pending.has(deferredFragmentNode)) { + this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord); + } + deferredFragmentNode.deferredGroupedFieldSetRecords.add( + deferredGroupedFieldSetRecord, + ); + } + } + + private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void { + const streamRecord = streamItemsRecord.streamRecord; + if (!this._pending.has(streamRecord)) { + this._newPending.add(streamRecord); + } + this._newIncrementalDataRecords.add(streamItemsRecord); + } + + private _addDeferredFragmentNode( + deferredFragmentRecord: DeferredFragmentRecord, + ): DeferredFragmentNode { + let deferredFragmentNode = this._deferredFragmentNodes.get( + deferredFragmentRecord, + ); + if (deferredFragmentNode !== undefined) { + return deferredFragmentNode; + } + deferredFragmentNode = { + deferredFragmentRecord, + deferredGroupedFieldSetRecords: new Set(), + reconcilableResults: new Set(), + children: [], + }; + this._deferredFragmentNodes.set( + deferredFragmentRecord, + deferredFragmentNode, + ); + const parent = deferredFragmentRecord.parent; + if (parent === undefined) { + this._newPending.add(deferredFragmentNode); + return deferredFragmentNode; + } + const parentNode = this._addDeferredFragmentNode(parent); + parentNode.children.push(deferredFragmentNode); + return deferredFragmentNode; + } + + private *_yieldCurrentCompletedIncrementalData( + first: IncrementalDataRecordResult, + ): Generator { + yield first; + let completed; + while ((completed = this._completedQueue.shift()) !== undefined) { + yield completed; + } + } + + private _enqueue(completed: IncrementalDataRecordResult): void { + const next = this._nextQueue.shift(); + if (next !== undefined) { + next({ + value: this._yieldCurrentCompletedIncrementalData(completed), + done: false, + }); + return; + } + this._completedQueue.push(completed); + } +} diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 0504238eae..87fe548628 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,11 +1,10 @@ import { invariant } from '../jsutils/invariant.js'; -import { isPromise } from '../jsutils/isPromise.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import { pathToArray } from '../jsutils/Path.js'; -import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { GraphQLError } from '../error/GraphQLError.js'; +import { IncrementalGraph } from './IncrementalGraph.js'; import type { CancellableStreamRecord, CompletedResult, @@ -25,8 +24,6 @@ import type { } from './types.js'; import { isCancellableStreamRecord, - isDeferredFragmentRecord, - isDeferredGroupedFieldSetRecord, isDeferredGroupedFieldSetResult, isNonReconcilableDeferredGroupedFieldSetResult, } from './types.js'; @@ -49,6 +46,12 @@ interface IncrementalPublisherContext { cancellableStreams: Set | undefined; } +interface SubsequentIncrementalExecutionResultContext { + pending: Array; + incremental: Array; + completed: Array; +} + /** * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. @@ -58,24 +61,12 @@ interface IncrementalPublisherContext { class IncrementalPublisher { private _context: IncrementalPublisherContext; private _nextId: number; - private _pending: Set; - private _completedResultQueue: Array; - private _newPending: Set; - private _incremental: Array; - private _completed: Array; - // these are assigned within the Promise executor called synchronously within the constructor - private _signalled!: Promise; - private _resolve!: () => void; + private _incrementalGraph: IncrementalGraph; constructor(context: IncrementalPublisherContext) { this._context = context; this._nextId = 0; - this._pending = new Set(); - this._completedResultQueue = []; - this._newPending = new Set(); - this._incremental = []; - this._completed = []; - this._reset(); + this._incrementalGraph = new IncrementalGraph(); } buildResponse( @@ -83,10 +74,10 @@ class IncrementalPublisher { errors: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { - this._addIncrementalDataRecords(incrementalDataRecords); - this._pruneEmpty(); + this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); + const newPending = this._incrementalGraph.getNewPending(); - const pending = this._pendingSourcesToResults(); + const pending = this._pendingSourcesToResults(newPending); const initialResult: InitialIncrementalExecutionResult = errors === undefined @@ -99,130 +90,12 @@ class IncrementalPublisher { }; } - private _addIncrementalDataRecords( - incrementalDataRecords: ReadonlyArray, - ): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { - deferredFragmentRecord.expectedReconcilableResults++; - - this._addDeferredFragmentRecord(deferredFragmentRecord); - } - - const result = incrementalDataRecord.result; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => { - this._enqueueCompletedDeferredGroupedFieldSet(resolved); - }); - } else { - this._enqueueCompletedDeferredGroupedFieldSet(result); - } - - continue; - } - - const streamRecord = incrementalDataRecord.streamRecord; - if (streamRecord.id === undefined) { - this._newPending.add(streamRecord); - } - - const result = incrementalDataRecord.result; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => { - this._enqueueCompletedStreamItems(resolved); - }); - } else { - this._enqueueCompletedStreamItems(result); - } - } - } - - private _addDeferredFragmentRecord( - deferredFragmentRecord: DeferredFragmentRecord, - ): void { - const parent = deferredFragmentRecord.parent; - if (parent === undefined) { - // Below is equivalent and slightly faster version of: - // if (this._pending.has(deferredFragmentRecord)) { ... } - // as all released deferredFragmentRecords have ids. - if (deferredFragmentRecord.id !== undefined) { - return; - } - - this._newPending.add(deferredFragmentRecord); - return; - } - - if (parent.children.has(deferredFragmentRecord)) { - return; - } - - parent.children.add(deferredFragmentRecord); - - this._addDeferredFragmentRecord(parent); - } - - private _pruneEmpty() { - const maybeEmptyNewPending = this._newPending; - this._newPending = new Set(); - for (const node of maybeEmptyNewPending) { - if (isDeferredFragmentRecord(node)) { - if (node.expectedReconcilableResults) { - this._newPending.add(node); - continue; - } - for (const child of node.children) { - this._addNonEmptyNewPending(child); - } - } else { - this._newPending.add(node); - } - } - } - - private _addNonEmptyNewPending( - deferredFragmentRecord: DeferredFragmentRecord, - ): void { - if (deferredFragmentRecord.expectedReconcilableResults) { - this._newPending.add(deferredFragmentRecord); - return; - } - /* c8 ignore next 5 */ - // TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children. - for (const child of deferredFragmentRecord.children) { - this._addNonEmptyNewPending(child); - } - } - - private _enqueueCompletedDeferredGroupedFieldSet( - result: DeferredGroupedFieldSetResult, - ): void { - let hasPendingParent = false; - for (const deferredFragmentRecord of result.deferredFragmentRecords) { - if (deferredFragmentRecord.id !== undefined) { - hasPendingParent = true; - } - deferredFragmentRecord.results.push(result); - } - if (hasPendingParent) { - this._completedResultQueue.push(result); - this._trigger(); - } - } - - private _enqueueCompletedStreamItems(result: StreamItemsResult): void { - this._completedResultQueue.push(result); - this._trigger(); - } - - private _pendingSourcesToResults(): Array { + private _pendingSourcesToResults( + newPending: ReadonlyArray, + ): Array { const pendingResults: Array = []; - for (const pendingSource of this._newPending) { + for (const pendingSource of newPending) { const id = String(this._getNextId()); - this._pending.add(pendingSource); pendingSource.id = id; const pendingResult: PendingResult = { id, @@ -233,7 +106,6 @@ class IncrementalPublisher { } pendingResults.push(pendingResult); } - this._newPending.clear(); return pendingResults; } @@ -251,79 +123,67 @@ class IncrementalPublisher { const _next = async (): Promise< IteratorResult > => { - while (!isDone) { - let pending: Array = []; + if (isDone) { + await this._returnAsyncIteratorsIgnoringErrors(); + return { value: undefined, done: true }; + } - let completedResult: IncrementalDataRecordResult | undefined; - while ( - (completedResult = this._completedResultQueue.shift()) !== undefined - ) { - if (isDeferredGroupedFieldSetResult(completedResult)) { - this._handleCompletedDeferredGroupedFieldSet(completedResult); - } else { - this._handleCompletedStreamItems(completedResult); - } + const context: SubsequentIncrementalExecutionResultContext = { + pending: [], + incremental: [], + completed: [], + }; - pending = [...pending, ...this._pendingSourcesToResults()]; + const completedIncrementalData = + this._incrementalGraph.completedIncrementalData(); + // use the raw iterator rather than 'for await ... of' so as not to trigger the + // '.return()' method on the iterator when exiting the loop with the next value + const asyncIterator = completedIncrementalData[Symbol.asyncIterator](); + let iteration = await asyncIterator.next(); + while (!iteration.done) { + for (const completedResult of iteration.value) { + this._handleCompletedIncrementalData(completedResult, context); } - if (this._incremental.length > 0 || this._completed.length > 0) { - const hasNext = this._pending.size > 0; + const { incremental, completed } = context; + if (incremental.length > 0 || completed.length > 0) { + const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { + // eslint-disable-next-line require-atomic-updates isDone = true; } const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = { hasNext }; + const pending = context.pending; if (pending.length > 0) { subsequentIncrementalExecutionResult.pending = pending; } - if (this._incremental.length > 0) { - subsequentIncrementalExecutionResult.incremental = - this._incremental; + if (incremental.length > 0) { + subsequentIncrementalExecutionResult.incremental = incremental; } - if (this._completed.length > 0) { - subsequentIncrementalExecutionResult.completed = this._completed; + if (completed.length > 0) { + subsequentIncrementalExecutionResult.completed = completed; } - this._incremental = []; - this._completed = []; - return { value: subsequentIncrementalExecutionResult, done: false }; } // eslint-disable-next-line no-await-in-loop - await this._signalled; + iteration = await asyncIterator.next(); } - await returnStreamIterators().catch(() => { - // ignore errors - }); - + await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; - const returnStreamIterators = async (): Promise => { - const cancellableStreams = this._context.cancellableStreams; - if (cancellableStreams === undefined) { - return; - } - const promises: Array> = []; - for (const streamRecord of cancellableStreams) { - if (streamRecord.earlyReturn !== undefined) { - promises.push(streamRecord.earlyReturn()); - } - } - await Promise.all(promises); - }; - const _return = async (): Promise< IteratorResult > => { isDone = true; - await returnStreamIterators(); + await this._returnAsyncIterators(); return { value: undefined, done: true }; }; @@ -331,7 +191,7 @@ class IncrementalPublisher { error?: unknown, ): Promise> => { isDone = true; - await returnStreamIterators(); + await this._returnAsyncIterators(); return Promise.reject(error); }; @@ -345,53 +205,62 @@ class IncrementalPublisher { }; } - private _trigger() { - this._resolve(); - this._reset(); - } - - private _reset() { - // promiseWithResolvers uses void only as a generic type parameter - // see: https://typescript-eslint.io/rules/no-invalid-void-type/ - // eslint-disable-next-line @typescript-eslint/no-invalid-void-type - const { promise: signalled, resolve } = promiseWithResolvers(); - this._resolve = resolve; - this._signalled = signalled; + private _handleCompletedIncrementalData( + completedIncrementalData: IncrementalDataRecordResult, + context: SubsequentIncrementalExecutionResultContext, + ): void { + if (isDeferredGroupedFieldSetResult(completedIncrementalData)) { + this._handleCompletedDeferredGroupedFieldSet( + completedIncrementalData, + context, + ); + } else { + this._handleCompletedStreamItems(completedIncrementalData, context); + } + const newPending = this._incrementalGraph.getNewPending(); + context.pending.push(...this._pendingSourcesToResults(newPending)); } private _handleCompletedDeferredGroupedFieldSet( deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, + context: SubsequentIncrementalExecutionResultContext, ): void { if ( isNonReconcilableDeferredGroupedFieldSetResult( deferredGroupedFieldSetResult, ) ) { - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { const id = deferredFragmentRecord.id; - if (id !== undefined) { - this._completed.push({ - id, - errors: deferredGroupedFieldSetResult.errors, - }); - this._pending.delete(deferredFragmentRecord); + if ( + !this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord) + ) { + // This can occur if multiple deferred grouped field sets error for a fragment. + continue; } + invariant(id !== undefined); + context.completed.push({ + id, + errors: deferredGroupedFieldSetResult.errors, + }); + this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord); } return; } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { - deferredFragmentRecord.reconcilableResults.push( - deferredGroupedFieldSetResult, - ); - } + + this._incrementalGraph.addCompletedReconcilableDeferredGroupedFieldSet( + deferredGroupedFieldSetResult, + ); const incrementalDataRecords = deferredGroupedFieldSetResult.incrementalDataRecords; if (incrementalDataRecords !== undefined) { - this._addIncrementalDataRecords(incrementalDataRecords); + this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { const id = deferredFragmentRecord.id; // TODO: add test case for this. // Presumably, this can occur if an error causes a fragment to be completed early, @@ -400,18 +269,13 @@ class IncrementalPublisher { if (id === undefined) { continue; } - const reconcilableResults = deferredFragmentRecord.reconcilableResults; - if ( - deferredFragmentRecord.expectedReconcilableResults !== - reconcilableResults.length - ) { + const reconcilableResults = + this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); + if (reconcilableResults === undefined) { continue; } + const incremental = context.incremental; for (const reconcilableResult of reconcilableResults) { - if (reconcilableResult.sent) { - continue; - } - reconcilableResult.sent = true; const { bestId, subPath } = this._getBestIdAndSubPath( id, deferredFragmentRecord, @@ -424,36 +288,25 @@ class IncrementalPublisher { if (subPath !== undefined) { incrementalEntry.subPath = subPath; } - this._incremental.push(incrementalEntry); - } - this._completed.push({ id }); - this._pending.delete(deferredFragmentRecord); - for (const child of deferredFragmentRecord.children) { - this._newPending.add(child); - this._completedResultQueue.push(...child.results); + incremental.push(incrementalEntry); } + context.completed.push({ id }); } - - this._pruneEmpty(); } private _handleCompletedStreamItems( streamItemsResult: StreamItemsResult, + context: SubsequentIncrementalExecutionResultContext, ): void { const streamRecord = streamItemsResult.streamRecord; const id = streamRecord.id; - // TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list - // for ordering purposes, if an entry errors, additional entries will not be processed. - /* c8 ignore next 3 */ - if (id === undefined) { - return; - } + invariant(id !== undefined); if (streamItemsResult.errors !== undefined) { - this._completed.push({ + context.completed.push({ id, errors: streamItemsResult.errors, }); - this._pending.delete(streamRecord); + this._incrementalGraph.removeStream(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); this._context.cancellableStreams.delete(streamRecord); @@ -463,8 +316,8 @@ class IncrementalPublisher { }); } } else if (streamItemsResult.result === undefined) { - this._completed.push({ id }); - this._pending.delete(streamRecord); + context.completed.push({ id }); + this._incrementalGraph.removeStream(streamRecord); if (isCancellableStreamRecord(streamRecord)) { invariant(this._context.cancellableStreams !== undefined); this._context.cancellableStreams.delete(streamRecord); @@ -475,13 +328,12 @@ class IncrementalPublisher { ...streamItemsResult.result, }; - this._incremental.push(incrementalEntry); + context.incremental.push(incrementalEntry); if (streamItemsResult.incrementalDataRecords !== undefined) { - this._addIncrementalDataRecords( + this._incrementalGraph.addIncrementalDataRecords( streamItemsResult.incrementalDataRecords, ); - this._pruneEmpty(); } } } @@ -494,7 +346,8 @@ class IncrementalPublisher { let maxLength = pathToArray(initialDeferredFragmentRecord.path).length; let bestId = initialId; - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { if (deferredFragmentRecord === initialDeferredFragmentRecord) { continue; } @@ -517,4 +370,26 @@ class IncrementalPublisher { subPath: subPath.length > 0 ? subPath : undefined, }; } + + private async _returnAsyncIterators(): Promise { + await this._incrementalGraph.completedIncrementalData().return(); + + const cancellableStreams = this._context.cancellableStreams; + if (cancellableStreams === undefined) { + return; + } + const promises: Array> = []; + for (const streamRecord of cancellableStreams) { + if (streamRecord.earlyReturn !== undefined) { + promises.push(streamRecord.earlyReturn()); + } + } + await Promise.all(promises); + } + + private async _returnAsyncIteratorsIgnoringErrors(): Promise { + await this._returnAsyncIterators().catch(() => { + // Ignore errors + }); + } } diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index 71d86862f4..537f875d37 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -726,12 +726,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { id: '1', @@ -739,7 +733,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -903,12 +897,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { bar: 'bar', @@ -916,7 +904,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -1334,6 +1322,152 @@ describe('Execute: defer directive', () => { ]); }); + it('Handles multiple erroring deferred grouped field sets', async () => { + const document = parse(` + query { + ... @defer { + a { + b { + c { + someError: nonNullErrorField + } + } + } + } + ... @defer { + a { + b { + c { + anotherError: nonNullErrorField + } + } + } + } + } + `); + const result = await complete(document, { + a: { + b: { c: { nonNullErrorField: null } }, + }, + }); + expectJSON(result).toDeepEqual([ + { + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }, + { + completed: [ + { + id: '0', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 7, column: 17 }], + path: ['a', 'b', 'c', 'someError'], + }, + ], + }, + { + id: '1', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 16, column: 17 }], + path: ['a', 'b', 'c', 'anotherError'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + + it('Handles multiple erroring deferred grouped field sets for the same fragment', async () => { + const document = parse(` + query { + ... @defer { + a { + b { + someC: c { + d: d + } + anotherC: c { + d: d + } + } + } + } + ... @defer { + a { + b { + someC: c { + someError: nonNullErrorField + } + anotherC: c { + anotherError: nonNullErrorField + } + } + } + } + } + `); + const result = await complete(document, { + a: { + b: { c: { d: 'd', nonNullErrorField: null } }, + }, + }); + expectJSON(result).toDeepEqual([ + { + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }, + { + incremental: [ + { + data: { a: { b: { someC: {}, anotherC: {} } } }, + id: '0', + }, + { + data: { d: 'd' }, + id: '0', + subPath: ['a', 'b', 'someC'], + }, + { + data: { d: 'd' }, + id: '0', + subPath: ['a', 'b', 'anotherC'], + }, + ], + completed: [ + { + id: '1', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 19, column: 17 }], + path: ['a', 'b', 'someC', 'someError'], + }, + ], + }, + { id: '0' }, + ], + hasNext: false, + }, + ]); + }); + it('filters a payload with a null that cannot be merged', async () => { const document = parse(` query { @@ -1918,17 +2052,11 @@ describe('Execute: defer directive', () => { data: { name: 'slow', friends: [{}, {}, {}] }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, id: '1' }, { data: { name: 'Leia' }, id: '2' }, { data: { name: 'C-3PO' }, id: '3' }, ], - completed: [{ id: '1' }, { id: '2' }, { id: '3' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }, { id: '3' }], hasNext: false, }, ]); @@ -1974,17 +2102,11 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, id: '1' }, { data: { name: 'Leia' }, id: '2' }, { data: { name: 'C-3PO' }, id: '3' }, ], - completed: [{ id: '1' }, { id: '2' }, { id: '3' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }, { id: '3' }], hasNext: false, }, ]); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index f0a103b935..461eeb4f93 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -369,20 +369,10 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Han', id: '2' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], id: '0', @@ -527,11 +517,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], id: '0', @@ -572,11 +557,6 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Han', id: '2' }], id: '0', @@ -591,9 +571,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -633,9 +610,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -946,11 +920,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], id: '0', @@ -997,11 +966,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], id: '0', @@ -1132,19 +1096,11 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1444,6 +1400,10 @@ describe('Execute: stream directive', () => { }, { incremental: [ + { + items: [{ name: 'Luke' }], + id: '1', + }, { data: { scalarField: null }, id: '0', @@ -1455,16 +1415,8 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ name: 'Luke' }], - id: '1', - }, ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -1570,9 +1522,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1724,9 +1673,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1774,19 +1720,11 @@ describe('Execute: stream directive', () => { items: [{ id: '1', name: 'Luke' }], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ id: '2', name: 'Han' }], id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1844,48 +1782,22 @@ describe('Execute: stream directive', () => { data: { scalarField: 'slow', nestedFriendList: [] }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - done: false, - }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ - value: { - incremental: [ { items: [{ name: 'Luke' }], id: '1', }, - ], - hasNext: true, - }, - done: false, - }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ - value: { - incremental: [ { items: [{ name: 'Han' }], id: '1', }, ], - hasNext: true, - }, - done: false, - }); - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ - value: { - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, done: false, }); - const result6 = await iterator.next(); - expectJSON(result6).toDeepEqual({ + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ value: undefined, done: true, }); @@ -1946,14 +1858,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - data: { name: 'Luke' }, - id: '0', - }, { items: [{ id: '2' }], id: '1', }, + { + data: { name: 'Luke' }, + id: '0', + }, ], completed: [{ id: '0' }], hasNext: true, @@ -2047,14 +1959,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - data: { name: 'Luke' }, - id: '0', - }, { items: [{ id: '2' }], id: '1', }, + { + data: { name: 'Luke' }, + id: '0', + }, ], completed: [{ id: '0' }], hasNext: true, diff --git a/src/execution/execute.ts b/src/execution/execute.ts index dfb7f36074..1c9a9024e2 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1,3 +1,4 @@ +import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import { inspect } from '../jsutils/inspect.js'; import { invariant } from '../jsutils/invariant.js'; import { isAsyncIterable } from '../jsutils/isAsyncIterable.js'; @@ -62,6 +63,7 @@ import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import type { CancellableStreamRecord, + DeferredFragmentRecord, DeferredGroupedFieldSetRecord, DeferredGroupedFieldSetResult, ExecutionResult, @@ -69,12 +71,9 @@ import type { IncrementalDataRecord, StreamItemsRecord, StreamItemsResult, - SubsequentResultRecord, -} from './types.js'; -import { - DeferredFragmentRecord, - isReconcilableStreamItemsResult, + StreamRecord, } from './types.js'; +import { isReconcilableStreamItemsResult } from './types.js'; import { getArgumentValues, getDirectiveValues, @@ -1096,12 +1095,12 @@ async function completeAsyncIteratorValue( while (true) { if (streamUsage && index >= streamUsage.initialCount) { const returnFn = asyncIterator.return; - let streamRecord: SubsequentResultRecord | CancellableStreamRecord; + let streamRecord: StreamRecord | CancellableStreamRecord; if (returnFn === undefined) { streamRecord = { label: streamUsage.label, path, - } as SubsequentResultRecord; + } as StreamRecord; } else { streamRecord = { label: streamUsage.label, @@ -1268,7 +1267,7 @@ function completeIterableValue( const item = iteration.value; if (streamUsage && index >= streamUsage.initialCount) { - const streamRecord: SubsequentResultRecord = { + const streamRecord: StreamRecord = { label: streamUsage.label, path, }; @@ -1674,11 +1673,11 @@ function addNewDeferredFragments( : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. - const deferredFragmentRecord = new DeferredFragmentRecord({ + const deferredFragmentRecord: DeferredFragmentRecord = { path, label: newDeferUsage.label, parent, - }); + }; // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); @@ -2091,9 +2090,15 @@ function executeDeferredGroupedFieldSets( deferMap, ); + const deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord = { + deferredFragmentRecords, + result: + undefined as unknown as BoxedPromiseOrValue, + }; + const executor = () => executeDeferredGroupedFieldSet( - deferredFragmentRecords, + deferredGroupedFieldSetRecord, exeContext, parentType, sourceValue, @@ -2106,12 +2111,11 @@ function executeDeferredGroupedFieldSets( deferMap, ); - const deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord = { - deferredFragmentRecords, - result: shouldDefer(parentDeferUsages, deferUsageSet) + deferredGroupedFieldSetRecord.result = new BoxedPromiseOrValue( + shouldDefer(parentDeferUsages, deferUsageSet) ? Promise.resolve().then(executor) : executor(), - }; + ); newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } @@ -2136,7 +2140,7 @@ function shouldDefer( } function executeDeferredGroupedFieldSet( - deferredFragmentRecords: ReadonlyArray, + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, exeContext: ExecutionContext, parentType: GraphQLObjectType, sourceValue: unknown, @@ -2158,7 +2162,7 @@ function executeDeferredGroupedFieldSet( ); } catch (error) { return { - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path: pathToArray(path), errors: withError(incrementalContext.errors, error), }; @@ -2169,12 +2173,12 @@ function executeDeferredGroupedFieldSet( (resolved) => buildDeferredGroupedFieldSetResult( incrementalContext.errors, - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path, resolved, ), (error) => ({ - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path: pathToArray(path), errors: withError(incrementalContext.errors, error), }), @@ -2183,7 +2187,7 @@ function executeDeferredGroupedFieldSet( return buildDeferredGroupedFieldSetResult( incrementalContext.errors, - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path, result, ); @@ -2191,12 +2195,12 @@ function executeDeferredGroupedFieldSet( function buildDeferredGroupedFieldSetResult( errors: ReadonlyArray | undefined, - deferredFragmentRecords: ReadonlyArray, + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, path: Path | undefined, result: GraphQLWrappedResult>, ): DeferredGroupedFieldSetResult { return { - deferredFragmentRecords, + deferredGroupedFieldSetRecord, path: pathToArray(path), result: errors === undefined ? { data: result[0] } : { data: result[0], errors }, @@ -2214,7 +2218,7 @@ function getDeferredFragmentRecords( } function firstSyncStreamItems( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, initialItem: PromiseOrValue, initialIndex: number, iterator: Iterator, @@ -2225,65 +2229,76 @@ function firstSyncStreamItems( ): StreamItemsRecord { return { streamRecord, - result: Promise.resolve().then(() => { - const path = streamRecord.path; - const initialPath = addPath(path, initialIndex, undefined); + result: new BoxedPromiseOrValue( + Promise.resolve().then(() => { + const path = streamRecord.path; + const initialPath = addPath(path, initialIndex, undefined); - let result = completeStreamItems( - streamRecord, - initialPath, - initialItem, - exeContext, - { errors: undefined }, - fieldGroup, - info, - itemType, - ); - const firstStreamItems = { result }; - let currentStreamItems = firstStreamItems; - let currentIndex = initialIndex; - let iteration = iterator.next(); - let erroredSynchronously = false; - while (!iteration.done) { - if (!isPromise(result) && !isReconcilableStreamItemsResult(result)) { - erroredSynchronously = true; - break; - } - const item = iteration.value; - currentIndex++; - const currentPath = addPath(path, currentIndex, undefined); - result = completeStreamItems( - streamRecord, - currentPath, - item, - exeContext, - { errors: undefined }, - fieldGroup, - info, - itemType, + let result = new BoxedPromiseOrValue( + completeStreamItems( + streamRecord, + initialPath, + initialItem, + exeContext, + { errors: undefined }, + fieldGroup, + info, + itemType, + ), ); + const firstStreamItems = { result }; + let currentStreamItems = firstStreamItems; + let currentIndex = initialIndex; + let iteration = iterator.next(); + let erroredSynchronously = false; + while (!iteration.done) { + const value = result.value; + if (!isPromise(value) && !isReconcilableStreamItemsResult(value)) { + erroredSynchronously = true; + break; + } + const item = iteration.value; + currentIndex++; + const currentPath = addPath(path, currentIndex, undefined); + result = new BoxedPromiseOrValue( + completeStreamItems( + streamRecord, + currentPath, + item, + exeContext, + { errors: undefined }, + fieldGroup, + info, + itemType, + ), + ); - const nextStreamItems: StreamItemsRecord = { streamRecord, result }; - currentStreamItems.result = prependNextStreamItems( - currentStreamItems.result, - nextStreamItems, - ); - currentStreamItems = nextStreamItems; + const nextStreamItems: StreamItemsRecord = { streamRecord, result }; + currentStreamItems.result = new BoxedPromiseOrValue( + prependNextStreamItems( + currentStreamItems.result.value, + nextStreamItems, + ), + ); + currentStreamItems = nextStreamItems; - iteration = iterator.next(); - } + iteration = iterator.next(); + } - // If a non-reconcilable stream items result was encountered, then the stream terminates in error. - // Otherwise, add a stream terminator. - if (!erroredSynchronously) { - currentStreamItems.result = prependNextStreamItems( - currentStreamItems.result, - { streamRecord, result: { streamRecord } }, - ); - } + // If a non-reconcilable stream items result was encountered, then the stream terminates in error. + // Otherwise, add a stream terminator. + if (!erroredSynchronously) { + currentStreamItems.result = new BoxedPromiseOrValue( + prependNextStreamItems(currentStreamItems.result.value, { + streamRecord, + result: new BoxedPromiseOrValue({ streamRecord }), + }), + ); + } - return firstStreamItems.result; - }), + return firstStreamItems.result.value; + }), + ), }; } @@ -2317,7 +2332,7 @@ function prependNextResolvedStreamItems( } function firstAsyncStreamItems( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, path: Path, initialIndex: number, asyncIterator: AsyncIterator, @@ -2328,22 +2343,24 @@ function firstAsyncStreamItems( ): StreamItemsRecord { const firstStreamItems: StreamItemsRecord = { streamRecord, - result: getNextAsyncStreamItemsResult( - streamRecord, - path, - initialIndex, - asyncIterator, - exeContext, - fieldGroup, - info, - itemType, + result: new BoxedPromiseOrValue( + getNextAsyncStreamItemsResult( + streamRecord, + path, + initialIndex, + asyncIterator, + exeContext, + fieldGroup, + info, + itemType, + ), ), }; return firstStreamItems; } async function getNextAsyncStreamItemsResult( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, path: Path, index: number, asyncIterator: AsyncIterator, @@ -2381,15 +2398,17 @@ async function getNextAsyncStreamItemsResult( const nextStreamItems: StreamItemsRecord = { streamRecord, - result: getNextAsyncStreamItemsResult( - streamRecord, - path, - index, - asyncIterator, - exeContext, - fieldGroup, - info, - itemType, + result: new BoxedPromiseOrValue( + getNextAsyncStreamItemsResult( + streamRecord, + path, + index, + asyncIterator, + exeContext, + fieldGroup, + info, + itemType, + ), ), }; @@ -2397,7 +2416,7 @@ async function getNextAsyncStreamItemsResult( } function completeStreamItems( - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, itemPath: Path, item: unknown, exeContext: ExecutionContext, @@ -2497,7 +2516,7 @@ function completeStreamItems( function buildStreamItemsResult( errors: ReadonlyArray | undefined, - streamRecord: SubsequentResultRecord, + streamRecord: StreamRecord, result: GraphQLWrappedResult, ): StreamItemsResult { return { diff --git a/src/execution/types.ts b/src/execution/types.ts index d2fd84827b..5c44e6dea8 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -1,6 +1,6 @@ +import type { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; -import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import type { GraphQLError, @@ -166,12 +166,6 @@ export interface FormattedCompletedResult { errors?: ReadonlyArray; } -export function isDeferredFragmentRecord( - subsequentResultRecord: SubsequentResultRecord, -): subsequentResultRecord is DeferredFragmentRecord { - return 'parent' in subsequentResultRecord; -} - export function isDeferredGroupedFieldSetRecord( incrementalDataRecord: IncrementalDataRecord, ): incrementalDataRecord is DeferredGroupedFieldSetRecord { @@ -185,22 +179,21 @@ export type DeferredGroupedFieldSetResult = export function isDeferredGroupedFieldSetResult( subsequentResult: DeferredGroupedFieldSetResult | StreamItemsResult, ): subsequentResult is DeferredGroupedFieldSetResult { - return 'deferredFragmentRecords' in subsequentResult; + return 'deferredGroupedFieldSetRecord' in subsequentResult; } -interface ReconcilableDeferredGroupedFieldSetResult { - deferredFragmentRecords: ReadonlyArray; +export interface ReconcilableDeferredGroupedFieldSetResult { + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord; path: Array; result: BareDeferredGroupedFieldSetResult; incrementalDataRecords: ReadonlyArray | undefined; - sent?: true | undefined; errors?: never; } interface NonReconcilableDeferredGroupedFieldSetResult { - errors: ReadonlyArray; - deferredFragmentRecords: ReadonlyArray; + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord; path: Array; + errors: ReadonlyArray; result?: never; } @@ -212,42 +205,25 @@ export function isNonReconcilableDeferredGroupedFieldSetResult( export interface DeferredGroupedFieldSetRecord { deferredFragmentRecords: ReadonlyArray; - result: PromiseOrValue; + result: BoxedPromiseOrValue; } -export interface SubsequentResultRecord { +export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; + +export interface DeferredFragmentRecord { path: Path | undefined; label: string | undefined; id?: string | undefined; + parent: DeferredFragmentRecord | undefined; } -/** @internal */ -export class DeferredFragmentRecord implements SubsequentResultRecord { - path: Path | undefined; +export interface StreamRecord { + path: Path; label: string | undefined; id?: string | undefined; - parent: DeferredFragmentRecord | undefined; - expectedReconcilableResults: number; - results: Array; - reconcilableResults: Array; - children: Set; - - constructor(opts: { - path: Path | undefined; - label: string | undefined; - parent: DeferredFragmentRecord | undefined; - }) { - this.path = opts.path; - this.label = opts.label; - this.parent = opts.parent; - this.expectedReconcilableResults = 0; - this.results = []; - this.reconcilableResults = []; - this.children = new Set(); - } } -export interface CancellableStreamRecord extends SubsequentResultRecord { +export interface CancellableStreamRecord extends StreamRecord { earlyReturn: () => Promise; } @@ -258,7 +234,7 @@ export function isCancellableStreamRecord( } interface ReconcilableStreamItemsResult { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; result: BareStreamItemsResult; incrementalDataRecords: ReadonlyArray | undefined; errors?: never; @@ -271,14 +247,14 @@ export function isReconcilableStreamItemsResult( } interface TerminatingStreamItemsResult { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; result?: never; incrementalDataRecords?: never; errors?: never; } interface NonReconcilableStreamItemsResult { - streamRecord: SubsequentResultRecord; + streamRecord: StreamRecord; errors: ReadonlyArray; result?: never; } @@ -289,8 +265,8 @@ export type StreamItemsResult = | NonReconcilableStreamItemsResult; export interface StreamItemsRecord { - streamRecord: SubsequentResultRecord; - result: PromiseOrValue; + streamRecord: StreamRecord; + result: BoxedPromiseOrValue; } export type IncrementalDataRecord = diff --git a/src/jsutils/BoxedPromiseOrValue.ts b/src/jsutils/BoxedPromiseOrValue.ts new file mode 100644 index 0000000000..7f6f758270 --- /dev/null +++ b/src/jsutils/BoxedPromiseOrValue.ts @@ -0,0 +1,26 @@ +import { isPromise } from './isPromise.js'; +import type { PromiseOrValue } from './PromiseOrValue.js'; + +/** + * A BoxedPromiseOrValue is a container for a value or promise where the value + * will be updated when the promise resolves. + * + * A BoxedPromiseOrValue may only be used with promises whose possible + * rejection has already been handled, otherwise this will lead to unhandled + * promise rejections. + * + * @internal + * */ +export class BoxedPromiseOrValue { + value: PromiseOrValue; + + constructor(value: PromiseOrValue) { + this.value = value; + if (isPromise(value)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + value.then((resolved) => { + this.value = resolved; + }); + } + } +} diff --git a/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts b/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts new file mode 100644 index 0000000000..19bc79a4bb --- /dev/null +++ b/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts @@ -0,0 +1,30 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; + +import { BoxedPromiseOrValue } from '../BoxedPromiseOrValue.js'; + +describe('BoxedPromiseOrValue', () => { + it('can box a value', () => { + const boxed = new BoxedPromiseOrValue(42); + + expect(boxed.value).to.equal(42); + }); + + it('can box a promise', () => { + const promise = Promise.resolve(42); + const boxed = new BoxedPromiseOrValue(promise); + + expect(boxed.value).to.equal(promise); + }); + + it('resets the boxed value when the passed promise resolves', async () => { + const promise = Promise.resolve(42); + const boxed = new BoxedPromiseOrValue(promise); + + await resolveOnNextTick(); + + expect(boxed.value).to.equal(42); + }); +});