diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index cf5e95c285..76f1a32351 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,4 +1,5 @@ import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; +import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; @@ -20,19 +21,13 @@ interface DeferredFragmentNode { deferredFragmentRecord: DeferredFragmentRecord; deferredGroupedFieldSetRecords: Set; reconcilableResults: Set; - children: Array; + children: Set; } function isDeferredFragmentNode( - node: DeferredFragmentNode | undefined, + node: SubsequentResultNode | undefined, ): node is DeferredFragmentNode { - return node !== undefined; -} - -function isStreamNode( - record: SubsequentResultNode | IncrementalDataRecord, -): record is StreamRecord { - return 'streamItemQueue' in record; + return node !== undefined && 'deferredFragmentRecord' in node; } type SubsequentResultNode = DeferredFragmentNode | StreamRecord; @@ -41,103 +36,56 @@ type SubsequentResultNode = DeferredFragmentNode | StreamRecord; * @internal */ export class IncrementalGraph { - private _pending: Set; + private _rootNodes: Set; private _deferredFragmentNodes: Map< DeferredFragmentRecord, DeferredFragmentNode >; - private _newPending: Set; - private _newIncrementalDataRecords: Set; private _completedQueue: Array; private _nextQueue: Array< (iterable: IteratorResult>) => void >; constructor() { - this._pending = new Set(); + this._rootNodes = new Set(); this._deferredFragmentNodes = new Map(); - this._newIncrementalDataRecords = new Set(); - this._newPending = new Set(); this._completedQueue = []; this._nextQueue = []; } - addIncrementalDataRecords( + getNewRootNodes( incrementalDataRecords: ReadonlyArray, - ): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); - } else { - this._addStreamRecord(incrementalDataRecord); - } - } + ): ReadonlyArray { + const initialResultChildren = new Set(); + this._addIncrementalDataRecords( + incrementalDataRecords, + undefined, + initialResultChildren, + ); + return this._promoteNonEmptyToRoot(initialResultChildren); } addCompletedReconcilableDeferredGroupedFieldSet( reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, ): void { - const deferredFragmentNodes: Array = - reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords - .map((deferredFragmentRecord) => - this._deferredFragmentNodes.get(deferredFragmentRecord), - ) - .filter(isDeferredFragmentNode); - for (const deferredFragmentNode of deferredFragmentNodes) { + for (const deferredFragmentNode of this._fragmentsToNodes( + reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords, + )) { deferredFragmentNode.deferredGroupedFieldSetRecords.delete( reconcilableResult.deferredGroupedFieldSetRecord, ); deferredFragmentNode.reconcilableResults.add(reconcilableResult); } - } - - getNewPending(): ReadonlyArray { - const newPending: Array = []; - for (const node of this._newPending) { - if (isStreamNode(node)) { - this._pending.add(node); - newPending.push(node); - this._newIncrementalDataRecords.add(node); - } else if (node.deferredGroupedFieldSetRecords.size > 0) { - for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); - } - this._pending.add(node); - newPending.push(node.deferredFragmentRecord); - } else { - for (const child of node.children) { - this._newPending.add(child); - } - } - } - this._newPending.clear(); - for (const incrementalDataRecord of this._newIncrementalDataRecords) { - if (isStreamNode(incrementalDataRecord)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._onStreamItems( - incrementalDataRecord, - incrementalDataRecord.streamItemQueue, - ); - } else { - const deferredGroupedFieldSetResult = incrementalDataRecord.result; - const result = - deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue - ? deferredGroupedFieldSetResult.value - : deferredGroupedFieldSetResult().value; - - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => this._enqueue(resolved)); - } else { - this._enqueue(result); - } - } + const incrementalDataRecords = reconcilableResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + this._addIncrementalDataRecords( + incrementalDataRecords, + reconcilableResult.deferredGroupedFieldSetRecord + .deferredFragmentRecords, + ); } - this._newIncrementalDataRecords.clear(); - - return newPending; } completedIncrementalData() { @@ -174,19 +122,22 @@ export class IncrementalGraph { } hasNext(): boolean { - return this._pending.size > 0; + return this._rootNodes.size > 0; } - completeDeferredFragment( - deferredFragmentRecord: DeferredFragmentRecord, - ): Array | undefined { + completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): + | { + newRootNodes: ReadonlyArray; + reconcilableResults: ReadonlyArray; + } + | undefined { const deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, ); // TODO: add test case? /* c8 ignore next 3 */ if (deferredFragmentNode === undefined) { - return undefined; + return; } if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) { return; @@ -194,25 +145,21 @@ export class IncrementalGraph { const reconcilableResults = Array.from( deferredFragmentNode.reconcilableResults, ); + this._removeRootNode(deferredFragmentNode); for (const reconcilableResult of reconcilableResults) { - for (const otherDeferredFragmentRecord of reconcilableResult - .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const otherDeferredFragmentNode = this._deferredFragmentNodes.get( - otherDeferredFragmentRecord, - ); - if (otherDeferredFragmentNode === undefined) { - continue; - } + for (const otherDeferredFragmentNode of this._fragmentsToNodes( + reconcilableResult.deferredGroupedFieldSetRecord + .deferredFragmentRecords, + )) { otherDeferredFragmentNode.reconcilableResults.delete( reconcilableResult, ); } } - this._removePending(deferredFragmentNode); - for (const child of deferredFragmentNode.children) { - this._newPending.add(child); - } - return reconcilableResults; + const newRootNodes = this._promoteNonEmptyToRoot( + deferredFragmentNode.children, + ); + return { newRootNodes, reconcilableResults }; } removeDeferredFragment( @@ -224,51 +171,117 @@ export class IncrementalGraph { if (deferredFragmentNode === undefined) { return false; } - this._removePending(deferredFragmentNode); + this._removeRootNode(deferredFragmentNode); this._deferredFragmentNodes.delete(deferredFragmentRecord); // TODO: add test case for an erroring deferred fragment with child defers - /* c8 ignore next 3 */ + /* c8 ignore next 5 */ for (const child of deferredFragmentNode.children) { - this.removeDeferredFragment(child.deferredFragmentRecord); + if (isDeferredFragmentNode(child)) { + this.removeDeferredFragment(child.deferredFragmentRecord); + } } return true; } removeStream(streamRecord: StreamRecord): void { - this._removePending(streamRecord); + this._removeRootNode(streamRecord); } - private _removePending(subsequentResultNode: SubsequentResultNode): void { - this._pending.delete(subsequentResultNode); - if (this._pending.size === 0) { + private _removeRootNode(subsequentResultNode: SubsequentResultNode): void { + this._rootNodes.delete(subsequentResultNode); + if (this._rootNodes.size === 0) { for (const resolve of this._nextQueue) { resolve({ value: undefined, done: true }); } } } - private _addDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + private _addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + parents: ReadonlyArray | undefined, + initialResultChildren?: Set | undefined, ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const deferredFragmentNode = this._addDeferredFragmentNode( - deferredFragmentRecord, - ); - if (this._pending.has(deferredFragmentNode)) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord); + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { + const deferredFragmentNode = this._addDeferredFragmentNode( + deferredFragmentRecord, + initialResultChildren, + ); + deferredFragmentNode.deferredGroupedFieldSetRecords.add( + incrementalDataRecord, + ); + } + if (this._completesRootNode(incrementalDataRecord)) { + this._onDeferredGroupedFieldSet(incrementalDataRecord); + } + } else if (parents === undefined) { + invariant(initialResultChildren !== undefined); + initialResultChildren.add(incrementalDataRecord); + } else { + for (const parent of parents) { + const deferredFragmentNode = this._addDeferredFragmentNode( + parent, + initialResultChildren, + ); + deferredFragmentNode.children.add(incrementalDataRecord); + } + } + } + } + + private _promoteNonEmptyToRoot( + maybeEmptyNewRootNodes: Set, + ): ReadonlyArray { + const newRootNodes: Array = []; + for (const node of maybeEmptyNewRootNodes) { + if (isDeferredFragmentNode(node)) { + if (node.deferredGroupedFieldSetRecords.size > 0) { + for (const deferredGroupedFieldSetRecord of node.deferredGroupedFieldSetRecords) { + if (!this._completesRootNode(deferredGroupedFieldSetRecord)) { + this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord); + } + } + this._rootNodes.add(node); + newRootNodes.push(node.deferredFragmentRecord); + continue; + } + this._deferredFragmentNodes.delete(node.deferredFragmentRecord); + for (const child of node.children) { + maybeEmptyNewRootNodes.add(child); + } + } else { + this._rootNodes.add(node); + newRootNodes.push(node); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._onStreamItems(node); } - deferredFragmentNode.deferredGroupedFieldSetRecords.add( - deferredGroupedFieldSetRecord, - ); } + return newRootNodes; } - private _addStreamRecord(streamRecord: StreamRecord): void { - this._newPending.add(streamRecord); + private _completesRootNode( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): boolean { + return this._fragmentsToNodes( + deferredGroupedFieldSetRecord.deferredFragmentRecords, + ).some((node) => this._rootNodes.has(node)); + } + + private _fragmentsToNodes( + deferredFragmentRecords: ReadonlyArray, + ): Array { + return deferredFragmentRecords + .map((deferredFragmentRecord) => + this._deferredFragmentNodes.get(deferredFragmentRecord), + ) + .filter(isDeferredFragmentNode); } private _addDeferredFragmentNode( deferredFragmentRecord: DeferredFragmentRecord, + initialResultChildren: Set | undefined, ): DeferredFragmentNode { let deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, @@ -280,7 +293,7 @@ export class IncrementalGraph { deferredFragmentRecord, deferredGroupedFieldSetRecords: new Set(), reconcilableResults: new Set(), - children: [], + children: new Set(), }; this._deferredFragmentNodes.set( deferredFragmentRecord, @@ -288,21 +301,40 @@ export class IncrementalGraph { ); const parent = deferredFragmentRecord.parent; if (parent === undefined) { - this._newPending.add(deferredFragmentNode); + invariant(initialResultChildren !== undefined); + initialResultChildren.add(deferredFragmentNode); return deferredFragmentNode; } - const parentNode = this._addDeferredFragmentNode(parent); - parentNode.children.push(deferredFragmentNode); + const parentNode = this._addDeferredFragmentNode( + parent, + initialResultChildren, + ); + parentNode.children.add(deferredFragmentNode); return deferredFragmentNode; } - private async _onStreamItems( - streamRecord: StreamRecord, - streamItemQueue: Array, - ): Promise { + private _onDeferredGroupedFieldSet( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): void { + const deferredGroupedFieldSetResult = deferredGroupedFieldSetRecord.result; + const result = + deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue + ? deferredGroupedFieldSetResult.value + : deferredGroupedFieldSetResult().value; + + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => this._enqueue(resolved)); + } else { + this._enqueue(result); + } + } + + private async _onStreamItems(streamRecord: StreamRecord): Promise { let items: Array = []; let errors: Array = []; let incrementalDataRecords: Array = []; + const streamItemQueue = streamRecord.streamItemQueue; let streamItemRecord: StreamItemRecord | undefined; while ((streamItemRecord = streamItemQueue.shift()) !== undefined) { let result = diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index b453bde0d3..a625b0e098 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -74,10 +74,11 @@ class IncrementalPublisher { errors: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - const newPending = this._incrementalGraph.getNewPending(); + const newRootNodes = this._incrementalGraph.getNewRootNodes( + incrementalDataRecords, + ); - const pending = this._pendingSourcesToResults(newPending); + const pending = this._toPendingResults(newRootNodes); const initialResult: InitialIncrementalExecutionResult = errors === undefined @@ -90,19 +91,19 @@ class IncrementalPublisher { }; } - private _pendingSourcesToResults( - newPending: ReadonlyArray, + private _toPendingResults( + newRootNodes: ReadonlyArray, ): Array { const pendingResults: Array = []; - for (const pendingSource of newPending) { + for (const node of newRootNodes) { const id = String(this._getNextId()); - pendingSource.id = id; + node.id = id; const pendingResult: PendingResult = { id, - path: pathToArray(pendingSource.path), + path: pathToArray(node.path), }; - if (pendingSource.label !== undefined) { - pendingResult.label = pendingSource.label; + if (node.label !== undefined) { + pendingResult.label = node.label; } pendingResults.push(pendingResult); } @@ -217,8 +218,6 @@ class IncrementalPublisher { } else { this._handleCompletedStreamItems(completedIncrementalData, context); } - const newPending = this._incrementalGraph.getNewPending(); - context.pending.push(...this._pendingSourcesToResults(newPending)); } private _handleCompletedDeferredGroupedFieldSet( @@ -252,22 +251,19 @@ class IncrementalPublisher { deferredGroupedFieldSetResult, ); - const incrementalDataRecords = - deferredGroupedFieldSetResult.incrementalDataRecords; - if (incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const reconcilableResults = - this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); - if (reconcilableResults === undefined) { + const completion = this._incrementalGraph.completeDeferredFragment( + deferredFragmentRecord, + ); + if (completion === undefined) { continue; } const id = deferredFragmentRecord.id; invariant(id !== undefined); const incremental = context.incremental; + const { newRootNodes, reconcilableResults } = completion; + context.pending.push(...this._toPendingResults(newRootNodes)); for (const reconcilableResult of reconcilableResults) { const { bestId, subPath } = this._getBestIdAndSubPath( id, @@ -323,10 +319,12 @@ class IncrementalPublisher { context.incremental.push(incrementalEntry); - if (streamItemsResult.incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords( - streamItemsResult.incrementalDataRecords, + const incrementalDataRecords = streamItemsResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + const newRootNodes = this._incrementalGraph.getNewRootNodes( + incrementalDataRecords, ); + context.pending.push(...this._toPendingResults(newRootNodes)); } } }