diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index cf5e95c285..6520f90cdb 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,4 +1,5 @@ import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; +import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; @@ -7,6 +8,7 @@ import type { GraphQLError } from '../error/GraphQLError.js'; import type { DeferredFragmentRecord, DeferredGroupedFieldSetRecord, + DeferredGroupedFieldSetResult, IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, @@ -14,130 +16,59 @@ import type { StreamRecord, SubsequentResultRecord, } from './types.js'; -import { isDeferredGroupedFieldSetRecord } from './types.js'; - -interface DeferredFragmentNode { - deferredFragmentRecord: DeferredFragmentRecord; - deferredGroupedFieldSetRecords: Set; - reconcilableResults: Set; - children: Array; -} - -function isDeferredFragmentNode( - node: DeferredFragmentNode | undefined, -): node is DeferredFragmentNode { - return node !== undefined; -} - -function isStreamNode( - record: SubsequentResultNode | IncrementalDataRecord, -): record is StreamRecord { - return 'streamItemQueue' in record; -} - -type SubsequentResultNode = DeferredFragmentNode | StreamRecord; +import { + isDeferredFragmentRecord, + isDeferredGroupedFieldSetRecord, +} from './types.js'; /** * @internal */ export class IncrementalGraph { - private _pending: Set; - private _deferredFragmentNodes: Map< - DeferredFragmentRecord, - DeferredFragmentNode - >; + private _rootNodes: Set; - private _newPending: Set; - private _newIncrementalDataRecords: Set; private _completedQueue: Array; private _nextQueue: Array< (iterable: IteratorResult>) => void >; constructor() { - this._pending = new Set(); - this._deferredFragmentNodes = new Map(); - this._newIncrementalDataRecords = new Set(); - this._newPending = new Set(); + this._rootNodes = new Set(); this._completedQueue = []; this._nextQueue = []; } - addIncrementalDataRecords( + getNewRootNodes( incrementalDataRecords: ReadonlyArray, - ): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); - } else { - this._addStreamRecord(incrementalDataRecord); - } - } + ): ReadonlyArray { + const initialResultChildren = new Set(); + this._addIncrementalDataRecords( + incrementalDataRecords, + undefined, + initialResultChildren, + ); + return this._promoteNonEmptyToRoot(initialResultChildren); } addCompletedReconcilableDeferredGroupedFieldSet( reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, ): void { - const deferredFragmentNodes: Array = - reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords - .map((deferredFragmentRecord) => - this._deferredFragmentNodes.get(deferredFragmentRecord), - ) - .filter(isDeferredFragmentNode); - for (const deferredFragmentNode of deferredFragmentNodes) { - deferredFragmentNode.deferredGroupedFieldSetRecords.delete( + for (const deferredFragmentRecord of reconcilableResult + .deferredGroupedFieldSetRecord.deferredFragmentRecords) { + deferredFragmentRecord.deferredGroupedFieldSetRecords.delete( reconcilableResult.deferredGroupedFieldSetRecord, ); - deferredFragmentNode.reconcilableResults.add(reconcilableResult); - } - } - - getNewPending(): ReadonlyArray { - const newPending: Array = []; - for (const node of this._newPending) { - if (isStreamNode(node)) { - this._pending.add(node); - newPending.push(node); - this._newIncrementalDataRecords.add(node); - } else if (node.deferredGroupedFieldSetRecords.size > 0) { - for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); - } - this._pending.add(node); - newPending.push(node.deferredFragmentRecord); - } else { - for (const child of node.children) { - this._newPending.add(child); - } - } + deferredFragmentRecord.reconcilableResults.add(reconcilableResult); } - this._newPending.clear(); - - for (const incrementalDataRecord of this._newIncrementalDataRecords) { - if (isStreamNode(incrementalDataRecord)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._onStreamItems( - incrementalDataRecord, - incrementalDataRecord.streamItemQueue, - ); - } else { - const deferredGroupedFieldSetResult = incrementalDataRecord.result; - const result = - deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue - ? deferredGroupedFieldSetResult.value - : deferredGroupedFieldSetResult().value; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => this._enqueue(resolved)); - } else { - this._enqueue(result); - } - } + const incrementalDataRecords = reconcilableResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + this._addIncrementalDataRecords( + incrementalDataRecords, + reconcilableResult.deferredGroupedFieldSetRecord + .deferredFragmentRecords, + ); } - this._newIncrementalDataRecords.clear(); - - return newPending; } completedIncrementalData() { @@ -174,135 +105,170 @@ export class IncrementalGraph { } hasNext(): boolean { - return this._pending.size > 0; + return this._rootNodes.size > 0; } - completeDeferredFragment( - deferredFragmentRecord: DeferredFragmentRecord, - ): Array | undefined { - const deferredFragmentNode = this._deferredFragmentNodes.get( - deferredFragmentRecord, - ); - // TODO: add test case? - /* c8 ignore next 3 */ - if (deferredFragmentNode === undefined) { - return undefined; - } - if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) { + completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): + | { + newRootNodes: ReadonlyArray; + reconcilableResults: ReadonlyArray; + } + | undefined { + if ( + !this._rootNodes.has(deferredFragmentRecord) || + deferredFragmentRecord.deferredGroupedFieldSetRecords.size > 0 + ) { return; } const reconcilableResults = Array.from( - deferredFragmentNode.reconcilableResults, + deferredFragmentRecord.reconcilableResults, ); + this._removeRootNode(deferredFragmentRecord); for (const reconcilableResult of reconcilableResults) { for (const otherDeferredFragmentRecord of reconcilableResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const otherDeferredFragmentNode = this._deferredFragmentNodes.get( - otherDeferredFragmentRecord, - ); - if (otherDeferredFragmentNode === undefined) { - continue; - } - otherDeferredFragmentNode.reconcilableResults.delete( + otherDeferredFragmentRecord.reconcilableResults.delete( reconcilableResult, ); } } - this._removePending(deferredFragmentNode); - for (const child of deferredFragmentNode.children) { - this._newPending.add(child); - } - return reconcilableResults; + const newRootNodes = this._promoteNonEmptyToRoot( + deferredFragmentRecord.children, + ); + return { newRootNodes, reconcilableResults }; } removeDeferredFragment( deferredFragmentRecord: DeferredFragmentRecord, ): boolean { - const deferredFragmentNode = this._deferredFragmentNodes.get( - deferredFragmentRecord, - ); - if (deferredFragmentNode === undefined) { + if (!this._rootNodes.has(deferredFragmentRecord)) { return false; } - this._removePending(deferredFragmentNode); - this._deferredFragmentNodes.delete(deferredFragmentRecord); - // TODO: add test case for an erroring deferred fragment with child defers - /* c8 ignore next 3 */ - for (const child of deferredFragmentNode.children) { - this.removeDeferredFragment(child.deferredFragmentRecord); - } + this._removeRootNode(deferredFragmentRecord); return true; } removeStream(streamRecord: StreamRecord): void { - this._removePending(streamRecord); + this._removeRootNode(streamRecord); } - private _removePending(subsequentResultNode: SubsequentResultNode): void { - this._pending.delete(subsequentResultNode); - if (this._pending.size === 0) { + private _removeRootNode( + subsequentResultRecord: SubsequentResultRecord, + ): void { + this._rootNodes.delete(subsequentResultRecord); + if (this._rootNodes.size === 0) { for (const resolve of this._nextQueue) { resolve({ value: undefined, done: true }); } } } - private _addDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + private _addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + parents: ReadonlyArray | undefined, + initialResultChildren?: Set | undefined, ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const deferredFragmentNode = this._addDeferredFragmentNode( - deferredFragmentRecord, - ); - if (this._pending.has(deferredFragmentNode)) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord); + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { + this._addDeferredFragment( + deferredFragmentRecord, + initialResultChildren, + ); + deferredFragmentRecord.deferredGroupedFieldSetRecords.add( + incrementalDataRecord, + ); + } + if (this._completesRootNode(incrementalDataRecord)) { + this._onDeferredGroupedFieldSet(incrementalDataRecord); + } + } else if (parents === undefined) { + invariant(initialResultChildren !== undefined); + initialResultChildren.add(incrementalDataRecord); + } else { + for (const parent of parents) { + this._addDeferredFragment(parent, initialResultChildren); + parent.children.add(incrementalDataRecord); + } } - deferredFragmentNode.deferredGroupedFieldSetRecords.add( - deferredGroupedFieldSetRecord, - ); } } - private _addStreamRecord(streamRecord: StreamRecord): void { - this._newPending.add(streamRecord); + private _promoteNonEmptyToRoot( + maybeEmptyNewRootNodes: Set, + ): ReadonlyArray { + const newRootNodes: Array = []; + for (const node of maybeEmptyNewRootNodes) { + if (isDeferredFragmentRecord(node)) { + if (node.deferredGroupedFieldSetRecords.size > 0) { + node.setAsPending(); + for (const deferredGroupedFieldSetRecord of node.deferredGroupedFieldSetRecords) { + if (!this._completesRootNode(deferredGroupedFieldSetRecord)) { + this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord); + } + } + this._rootNodes.add(node); + newRootNodes.push(node); + continue; + } + for (const child of node.children) { + maybeEmptyNewRootNodes.add(child); + } + } else { + this._rootNodes.add(node); + newRootNodes.push(node); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._onStreamItems(node); + } + } + return newRootNodes; } - private _addDeferredFragmentNode( - deferredFragmentRecord: DeferredFragmentRecord, - ): DeferredFragmentNode { - let deferredFragmentNode = this._deferredFragmentNodes.get( - deferredFragmentRecord, + private _completesRootNode( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): boolean { + return deferredGroupedFieldSetRecord.deferredFragmentRecords.some( + (deferredFragmentRecord) => this._rootNodes.has(deferredFragmentRecord), ); - if (deferredFragmentNode !== undefined) { - return deferredFragmentNode; + } + + private _addDeferredFragment( + deferredFragmentRecord: DeferredFragmentRecord, + subsequentResultRecords: Set | undefined, + ): void { + if (this._rootNodes.has(deferredFragmentRecord)) { + return; } - deferredFragmentNode = { - deferredFragmentRecord, - deferredGroupedFieldSetRecords: new Set(), - reconcilableResults: new Set(), - children: [], - }; - this._deferredFragmentNodes.set( - deferredFragmentRecord, - deferredFragmentNode, - ); const parent = deferredFragmentRecord.parent; if (parent === undefined) { - this._newPending.add(deferredFragmentNode); - return deferredFragmentNode; + invariant(subsequentResultRecords !== undefined); + subsequentResultRecords.add(deferredFragmentRecord); + return; + } + parent.children.add(deferredFragmentRecord); + this._addDeferredFragment(parent, subsequentResultRecords); + } + + private _onDeferredGroupedFieldSet( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): void { + const result = ( + deferredGroupedFieldSetRecord.result as BoxedPromiseOrValue + ).value; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => this._enqueue(resolved)); + } else { + this._enqueue(result); } - const parentNode = this._addDeferredFragmentNode(parent); - parentNode.children.push(deferredFragmentNode); - return deferredFragmentNode; } - private async _onStreamItems( - streamRecord: StreamRecord, - streamItemQueue: Array, - ): Promise { + private async _onStreamItems(streamRecord: StreamRecord): Promise { let items: Array = []; let errors: Array = []; let incrementalDataRecords: Array = []; + const streamItemQueue = streamRecord.streamItemQueue; let streamItemRecord: StreamItemRecord | undefined; while ((streamItemRecord = streamItemQueue.shift()) !== undefined) { let result = diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index b453bde0d3..55d8bb2ef7 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -60,12 +60,10 @@ interface SubsequentIncrementalExecutionResultContext { */ class IncrementalPublisher { private _context: IncrementalPublisherContext; - private _nextId: number; private _incrementalGraph: IncrementalGraph; constructor(context: IncrementalPublisherContext) { this._context = context; - this._nextId = 0; this._incrementalGraph = new IncrementalGraph(); } @@ -74,10 +72,11 @@ class IncrementalPublisher { errors: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - const newPending = this._incrementalGraph.getNewPending(); + const newRootNodes = this._incrementalGraph.getNewRootNodes( + incrementalDataRecords, + ); - const pending = this._pendingSourcesToResults(newPending); + const pending = this._toPendingResults(newRootNodes); const initialResult: InitialIncrementalExecutionResult = errors === undefined @@ -90,29 +89,23 @@ class IncrementalPublisher { }; } - private _pendingSourcesToResults( - newPending: ReadonlyArray, + private _toPendingResults( + newRootNodes: ReadonlyArray, ): Array { const pendingResults: Array = []; - for (const pendingSource of newPending) { - const id = String(this._getNextId()); - pendingSource.id = id; + for (const node of newRootNodes) { const pendingResult: PendingResult = { - id, - path: pathToArray(pendingSource.path), + id: node.id, + path: pathToArray(node.path), }; - if (pendingSource.label !== undefined) { - pendingResult.label = pendingSource.label; + if (node.label !== undefined) { + pendingResult.label = node.label; } pendingResults.push(pendingResult); } return pendingResults; } - private _getNextId(): string { - return String(this._nextId++); - } - private _subscribe(): AsyncGenerator< SubsequentIncrementalExecutionResult, void, @@ -217,8 +210,6 @@ class IncrementalPublisher { } else { this._handleCompletedStreamItems(completedIncrementalData, context); } - const newPending = this._incrementalGraph.getNewPending(); - context.pending.push(...this._pendingSourcesToResults(newPending)); } private _handleCompletedDeferredGroupedFieldSet( @@ -232,16 +223,14 @@ class IncrementalPublisher { ) { for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const id = deferredFragmentRecord.id; if ( !this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord) ) { // This can occur if multiple deferred grouped field sets error for a fragment. continue; } - invariant(id !== undefined); context.completed.push({ - id, + id: deferredFragmentRecord.id, errors: deferredGroupedFieldSetResult.errors, }); } @@ -252,22 +241,18 @@ class IncrementalPublisher { deferredGroupedFieldSetResult, ); - const incrementalDataRecords = - deferredGroupedFieldSetResult.incrementalDataRecords; - if (incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const reconcilableResults = - this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); - if (reconcilableResults === undefined) { + const completion = this._incrementalGraph.completeDeferredFragment( + deferredFragmentRecord, + ); + if (completion === undefined) { continue; } - const id = deferredFragmentRecord.id; - invariant(id !== undefined); const incremental = context.incremental; + const { newRootNodes, reconcilableResults } = completion; + context.pending.push(...this._toPendingResults(newRootNodes)); + const id = deferredFragmentRecord.id; for (const reconcilableResult of reconcilableResults) { const { bestId, subPath } = this._getBestIdAndSubPath( id, @@ -293,7 +278,6 @@ class IncrementalPublisher { ): void { const streamRecord = streamItemsResult.streamRecord; const id = streamRecord.id; - invariant(id !== undefined); if (streamItemsResult.errors !== undefined) { context.completed.push({ id, @@ -323,10 +307,12 @@ class IncrementalPublisher { context.incremental.push(incrementalEntry); - if (streamItemsResult.incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords( - streamItemsResult.incrementalDataRecords, + const incrementalDataRecords = streamItemsResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + const newRootNodes = this._incrementalGraph.getNewRootNodes( + incrementalDataRecords, ); + context.pending.push(...this._toPendingResults(newRootNodes)); } } } diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index e74aebb9ae..b28ce39cbb 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -1,10 +1,12 @@ -import { expect } from 'chai'; +import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; +import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; + import type { DocumentNode } from '../../language/ast.js'; import { parse } from '../../language/parser.js'; @@ -605,12 +607,12 @@ describe('Execute: defer directive', () => { data: { hero: {}, }, - pending: [{ id: '0', path: ['hero'] }], + pending: [{ id: '1', path: ['hero'] }], hasNext: true, }, { - incremental: [{ data: { name: 'Luke' }, id: '0' }], - completed: [{ id: '0' }], + incremental: [{ data: { name: 'Luke' }, id: '1' }], + completed: [{ id: '1' }], hasNext: false, }, ]); @@ -785,8 +787,8 @@ describe('Execute: defer directive', () => { hero: {}, }, pending: [ - { id: '0', path: ['hero'], label: 'DeferID' }, - { id: '1', path: [], label: 'DeferName' }, + { id: '1', path: ['hero'], label: 'DeferID' }, + { id: '0', path: [], label: 'DeferName' }, ], hasNext: true, }, @@ -796,17 +798,17 @@ describe('Execute: defer directive', () => { data: { id: '1', }, - id: '0', + id: '1', }, { data: { name: 'Luke', }, - id: '1', + id: '0', subPath: ['hero'], }, ], - completed: [{ id: '0' }, { id: '1' }], + completed: [{ id: '1' }, { id: '0' }], hasNext: false, }, ]); @@ -856,6 +858,134 @@ describe('Execute: defer directive', () => { ]); }); + it('Initiates all deferred grouped field sets immediately if and only if they have been released as pending', async () => { + const document = parse(` + query { + ... @defer { + a { + ... @defer { + b { + c { d } + } + } + } + } + ... @defer { + a { + someField + ... @defer { + b { + e { f } + } + } + } + } + } + `); + + const { promise: slowFieldPromise, resolve: resolveSlowField } = + promiseWithResolvers(); + let cResolverCalled = false; + let eResolverCalled = false; + const executeResult = experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + a: { + someField: slowFieldPromise, + b: { + c: () => { + cResolverCalled = true; + return { d: 'd' }; + }, + e: () => { + eResolverCalled = true; + return { f: 'f' }; + }, + }, + }, + }, + enableEarlyExecution: false, + }); + + assert('initialResult' in executeResult); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }); + + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + expect(cResolverCalled).to.equal(false); + expect(eResolverCalled).to.equal(false); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + value: { + pending: [{ id: '2', path: ['a'] }], + incremental: [ + { + data: { a: {} }, + id: '0', + }, + { + data: { b: {} }, + id: '2', + }, + { + data: { c: { d: 'd' } }, + id: '2', + subPath: ['b'], + }, + ], + completed: [{ id: '0' }, { id: '2' }], + hasNext: true, + }, + done: false, + }); + + expect(cResolverCalled).to.equal(true); + expect(eResolverCalled).to.equal(false); + + resolveSlowField('someField'); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + pending: [{ id: '3', path: ['a'] }], + incremental: [ + { + data: { someField: 'someField' }, + id: '1', + subPath: ['a'], + }, + { + data: { e: { f: 'f' } }, + id: '3', + subPath: ['b'], + }, + ], + completed: [{ id: '1' }, { id: '3' }], + hasNext: false, + }, + done: false, + }); + + expect(eResolverCalled).to.equal(true); + + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('Can deduplicate multiple defers on the same object', async () => { const document = parse(` query { @@ -889,18 +1019,18 @@ describe('Execute: defer directive', () => { data: { hero: { friends: [{}, {}, {}] } }, pending: [ { id: '0', path: ['hero', 'friends', 0] }, - { id: '1', path: ['hero', 'friends', 1] }, - { id: '2', path: ['hero', 'friends', 2] }, + { id: '4', path: ['hero', 'friends', 1] }, + { id: '8', path: ['hero', 'friends', 2] }, ], hasNext: true, }, { incremental: [ { data: { id: '2', name: 'Han' }, id: '0' }, - { data: { id: '3', name: 'Leia' }, id: '1' }, - { data: { id: '4', name: 'C-3PO' }, id: '2' }, + { data: { id: '3', name: 'Leia' }, id: '4' }, + { data: { id: '4', name: 'C-3PO' }, id: '8' }, ], - completed: [{ id: '0' }, { id: '1' }, { id: '2' }], + completed: [{ id: '0' }, { id: '4' }, { id: '8' }], hasNext: false, }, ]); @@ -1145,8 +1275,8 @@ describe('Execute: defer directive', () => { }, }, pending: [ - { id: '0', path: ['hero', 'nestedObject', 'deeperObject'] }, { id: '1', path: ['hero', 'nestedObject', 'deeperObject'] }, + { id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }, ], hasNext: true, }, @@ -1156,16 +1286,16 @@ describe('Execute: defer directive', () => { data: { foo: 'foo', }, - id: '0', + id: '1', }, { data: { bar: 'bar', }, - id: '1', + id: '2', }, ], - completed: [{ id: '0' }, { id: '1' }], + completed: [{ id: '1' }, { id: '2' }], hasNext: false, }, ]); @@ -1221,8 +1351,8 @@ describe('Execute: defer directive', () => { }, }, pending: [ - { id: '0', path: ['a', 'b'] }, - { id: '1', path: [] }, + { id: '1', path: ['a', 'b'] }, + { id: '0', path: [] }, ], hasNext: true, }, @@ -1230,14 +1360,14 @@ describe('Execute: defer directive', () => { incremental: [ { data: { e: { f: 'f' } }, - id: '0', + id: '1', }, { data: { g: { h: 'h' } }, - id: '1', + id: '0', }, ], - completed: [{ id: '0' }, { id: '1' }], + completed: [{ id: '1' }, { id: '0' }], hasNext: false, }, ]); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 15ad4028a5..b8eaf695d7 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1978,14 +1978,14 @@ describe('Execute: stream directive', () => { nestedFriendList: [], }, }, - pending: [{ id: '0', path: ['nestedObject', 'nestedFriendList'] }], + pending: [{ id: '1', path: ['nestedObject', 'nestedFriendList'] }], hasNext: true, }, { incremental: [ { items: [{ id: '1', name: 'Luke' }], - id: '0', + id: '1', }, ], hasNext: true, @@ -1994,13 +1994,13 @@ describe('Execute: stream directive', () => { incremental: [ { items: [{ id: '2', name: 'Han' }], - id: '0', + id: '1', }, ], hasNext: true, }, { - completed: [{ id: '0' }], + completed: [{ id: '1' }], hasNext: false, }, ]); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index d01b6ee768..4fab9c460f 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -63,7 +63,6 @@ import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import type { CancellableStreamRecord, - DeferredFragmentRecord, DeferredGroupedFieldSetRecord, DeferredGroupedFieldSetResult, ExecutionResult, @@ -73,6 +72,7 @@ import type { StreamItemResult, StreamRecord, } from './types.js'; +import { DeferredFragmentRecord } from './types.js'; import { getArgumentValues, getDirectiveValues, @@ -142,6 +142,7 @@ export interface ExecutionContext { subscribeFieldResolver: GraphQLFieldResolver; enableEarlyExecution: boolean; errors: Array | undefined; + nextId: number; cancellableStreams: Set | undefined; } @@ -299,7 +300,11 @@ function executeOperation( const fieldPLan = buildFieldPlan(groupedFieldSet); groupedFieldSet = fieldPLan.groupedFieldSet; const newGroupedFieldSets = fieldPLan.newGroupedFieldSets; - const newDeferMap = addNewDeferredFragments(newDeferUsages, new Map()); + const newDeferMap = addNewDeferredFragments( + exeContext, + newDeferUsages, + new Map(), + ); graphqlWrappedResult = executeRootGroupedFieldSet( exeContext, @@ -505,6 +510,7 @@ export function buildExecutionContext( subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, enableEarlyExecution: enableEarlyExecution === true, errors: undefined, + nextId: 0, cancellableStreams: undefined, }; } @@ -1113,6 +1119,7 @@ async function completeAsyncIteratorValue( streamRecord = { label: streamUsage.label, path, + id: String(exeContext.nextId++), streamItemQueue, }; } else { @@ -1120,6 +1127,7 @@ async function completeAsyncIteratorValue( label: streamUsage.label, path, streamItemQueue, + id: String(exeContext.nextId++), earlyReturn: returnFn.bind(asyncIterator), }; if (exeContext.cancellableStreams === undefined) { @@ -1274,6 +1282,7 @@ function completeIterableValue( const syncStreamRecord: StreamRecord = { label: streamUsage.label, path, + id: String(exeContext.nextId++), streamItemQueue: buildSyncStreamItemQueue( item, index, @@ -1662,6 +1671,7 @@ function invalidReturnTypeError( * */ function addNewDeferredFragments( + exeContext: ExecutionContext, newDeferUsages: ReadonlyArray, newDeferMap: Map, path?: Path | undefined, @@ -1676,11 +1686,12 @@ function addNewDeferredFragments( : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. - const deferredFragmentRecord: DeferredFragmentRecord = { + const deferredFragmentRecord = new DeferredFragmentRecord( path, - label: newDeferUsage.label, + newDeferUsage.label, + String(exeContext.nextId++), parent, - }; + ); // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); @@ -1733,6 +1744,7 @@ function collectAndExecuteSubfields( groupedFieldSet = subFieldPlan.groupedFieldSet; const newGroupedFieldSets = subFieldPlan.newGroupedFieldSets; const newDeferMap = addNewDeferredFragments( + exeContext, newDeferUsages, new Map(deferMap), path, @@ -2114,16 +2126,25 @@ function executeDeferredGroupedFieldSets( deferMap, ); - const shouldDeferThisDeferUsageSet = shouldDefer( - parentDeferUsages, - deferUsageSet, - ); - - deferredGroupedFieldSetRecord.result = shouldDeferThisDeferUsageSet - ? exeContext.enableEarlyExecution - ? new BoxedPromiseOrValue(Promise.resolve().then(executor)) - : () => new BoxedPromiseOrValue(executor()) - : new BoxedPromiseOrValue(executor()); + if (exeContext.enableEarlyExecution) { + deferredGroupedFieldSetRecord.result = new BoxedPromiseOrValue( + shouldDefer(parentDeferUsages, deferUsageSet) + ? Promise.resolve().then(executor) + : executor(), + ); + } else { + deferredGroupedFieldSetRecord.result = () => + new BoxedPromiseOrValue(executor()); + const resolveThunk = () => { + const maybeThunk = deferredGroupedFieldSetRecord.result; + if (!(maybeThunk instanceof BoxedPromiseOrValue)) { + deferredGroupedFieldSetRecord.result = maybeThunk(); + } + }; + for (const deferredFragmentRecord of deferredFragmentRecords) { + deferredFragmentRecord.onPending(resolveThunk); + } + } newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } diff --git a/src/execution/types.ts b/src/execution/types.ts index 50f9a083f8..8eec6c121a 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -214,11 +214,57 @@ export interface DeferredGroupedFieldSetRecord { export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; -export interface DeferredFragmentRecord { +/** @internal */ +export class DeferredFragmentRecord { path: Path | undefined; label: string | undefined; - id?: string | undefined; + id: string; parent: DeferredFragmentRecord | undefined; + deferredGroupedFieldSetRecords: Set; + reconcilableResults: Set; + children: Set; + + private pending: boolean; + private fns: Array<() => void>; + + constructor( + path: Path | undefined, + label: string | undefined, + id: string, + parent: DeferredFragmentRecord | undefined, + ) { + this.path = path; + this.label = label; + this.id = id; + this.parent = parent; + this.deferredGroupedFieldSetRecords = new Set(); + this.reconcilableResults = new Set(); + this.children = new Set(); + this.pending = false; + this.fns = []; + } + + onPending(fn: () => void): void { + if (this.pending) { + fn(); + } else { + this.fns.push(fn); + } + } + + setAsPending(): void { + this.pending = true; + let fn; + while ((fn = this.fns.shift()) !== undefined) { + fn(); + } + } +} + +export function isDeferredFragmentRecord( + subsequentResultRecord: SubsequentResultRecord, +): subsequentResultRecord is DeferredFragmentRecord { + return subsequentResultRecord instanceof DeferredFragmentRecord; } export interface StreamItemResult { @@ -232,7 +278,7 @@ export type StreamItemRecord = ThunkIncrementalResult; export interface StreamRecord { path: Path; label: string | undefined; - id?: string | undefined; + id: string; streamItemQueue: Array; }