From 32b97d90252a62ea534020625aa0c3fbf7a4227f Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 14 Jun 2024 13:42:42 +0300 Subject: [PATCH 1/5] polish(IncrementalPublisher): remove unnecessary check and method call = completeDeferredFragment will always return undefined if the deferredFragmentRecord has already been removed from the graph, so this does not need to be checked separately. = removeDeferredFragment need not be called twice. --- src/execution/IncrementalPublisher.ts | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 87fe548628..b453bde0d3 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -244,7 +244,6 @@ class IncrementalPublisher { id, errors: deferredGroupedFieldSetResult.errors, }); - this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord); } return; } @@ -261,19 +260,13 @@ class IncrementalPublisher { for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const id = deferredFragmentRecord.id; - // TODO: add test case for this. - // Presumably, this can occur if an error causes a fragment to be completed early, - // while an asynchronous deferred grouped field set result is enqueued. - /* c8 ignore next 3 */ - if (id === undefined) { - continue; - } const reconcilableResults = this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); if (reconcilableResults === undefined) { continue; } + const id = deferredFragmentRecord.id; + invariant(id !== undefined); const incremental = context.incremental; for (const reconcilableResult of reconcilableResults) { const { bestId, subPath } = this._getBestIdAndSubPath( From 7b6319f12da419a65961eaebc9df8ca4c1fcb4fd Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 14 Jun 2024 13:41:06 +0300 Subject: [PATCH 2/5] use map instead of mutating --- src/execution/IncrementalPublisher.ts | 16 +++++++++++----- src/execution/types.ts | 2 -- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index b453bde0d3..3b7d92ab1a 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -60,11 +60,13 @@ interface SubsequentIncrementalExecutionResultContext { */ class IncrementalPublisher { private _context: IncrementalPublisherContext; + private _ids: Map; private _nextId: number; private _incrementalGraph: IncrementalGraph; constructor(context: IncrementalPublisherContext) { this._context = context; + this._ids = new Map(); this._nextId = 0; this._incrementalGraph = new IncrementalGraph(); } @@ -96,7 +98,7 @@ class IncrementalPublisher { const pendingResults: Array = []; for (const pendingSource of newPending) { const id = String(this._getNextId()); - pendingSource.id = id; + this._ids.set(pendingSource, id); const pendingResult: PendingResult = { id, path: pathToArray(pendingSource.path), @@ -232,14 +234,15 @@ class IncrementalPublisher { ) { for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const id = deferredFragmentRecord.id; if ( !this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord) ) { // This can occur if multiple deferred grouped field sets error for a fragment. continue; } + const id = this._ids.get(deferredFragmentRecord); invariant(id !== undefined); + this._ids.delete(deferredFragmentRecord); context.completed.push({ id, errors: deferredGroupedFieldSetResult.errors, @@ -265,7 +268,7 @@ class IncrementalPublisher { if (reconcilableResults === undefined) { continue; } - const id = deferredFragmentRecord.id; + const id = this._ids.get(deferredFragmentRecord); invariant(id !== undefined); const incremental = context.incremental; for (const reconcilableResult of reconcilableResults) { @@ -283,6 +286,7 @@ class IncrementalPublisher { } incremental.push(incrementalEntry); } + this._ids.delete(deferredFragmentRecord); context.completed.push({ id }); } } @@ -292,9 +296,10 @@ class IncrementalPublisher { context: SubsequentIncrementalExecutionResultContext, ): void { const streamRecord = streamItemsResult.streamRecord; - const id = streamRecord.id; + const id = this._ids.get(streamRecord); invariant(id !== undefined); if (streamItemsResult.errors !== undefined) { + this._ids.delete(streamRecord); context.completed.push({ id, errors: streamItemsResult.errors, @@ -309,6 +314,7 @@ class IncrementalPublisher { }); } } else if (streamItemsResult.result === undefined) { + this._ids.delete(streamRecord); context.completed.push({ id }); this._incrementalGraph.removeStream(streamRecord); if (isCancellableStreamRecord(streamRecord)) { @@ -344,7 +350,7 @@ class IncrementalPublisher { if (deferredFragmentRecord === initialDeferredFragmentRecord) { continue; } - const id = deferredFragmentRecord.id; + const id = this._ids.get(deferredFragmentRecord); // TODO: add test case for when an fragment has not been released, but might be processed for the shortest path. /* c8 ignore next 3 */ if (id === undefined) { diff --git a/src/execution/types.ts b/src/execution/types.ts index 50f9a083f8..4deb841a03 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -217,7 +217,6 @@ export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; export interface DeferredFragmentRecord { path: Path | undefined; label: string | undefined; - id?: string | undefined; parent: DeferredFragmentRecord | undefined; } @@ -232,7 +231,6 @@ export type StreamItemRecord = ThunkIncrementalResult; export interface StreamRecord { path: Path; label: string | undefined; - id?: string | undefined; streamItemQueue: Array; } From 605bd1fd970f3ba7db6dc126863002cfcdae2949 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 16 Jun 2024 17:34:44 +0300 Subject: [PATCH 3/5] polish(incremental): refactor getNewPending return newPending alongside incremental results --- src/execution/IncrementalGraph.ts | 257 ++++++++++++++------------ src/execution/IncrementalPublisher.ts | 30 ++- 2 files changed, 157 insertions(+), 130 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index cf5e95c285..e6f7df24f7 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,4 +1,5 @@ import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; +import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; @@ -20,19 +21,13 @@ interface DeferredFragmentNode { deferredFragmentRecord: DeferredFragmentRecord; deferredGroupedFieldSetRecords: Set; reconcilableResults: Set; - children: Array; + children: Set; } function isDeferredFragmentNode( - node: DeferredFragmentNode | undefined, + node: SubsequentResultNode | undefined, ): node is DeferredFragmentNode { - return node !== undefined; -} - -function isStreamNode( - record: SubsequentResultNode | IncrementalDataRecord, -): record is StreamRecord { - return 'streamItemQueue' in record; + return node !== undefined && 'deferredFragmentRecord' in node; } type SubsequentResultNode = DeferredFragmentNode | StreamRecord; @@ -47,8 +42,6 @@ export class IncrementalGraph { DeferredFragmentNode >; - private _newPending: Set; - private _newIncrementalDataRecords: Set; private _completedQueue: Array; private _nextQueue: Array< (iterable: IteratorResult>) => void @@ -57,87 +50,42 @@ export class IncrementalGraph { constructor() { this._pending = new Set(); this._deferredFragmentNodes = new Map(); - this._newIncrementalDataRecords = new Set(); - this._newPending = new Set(); this._completedQueue = []; this._nextQueue = []; } - addIncrementalDataRecords( + getNewPending( incrementalDataRecords: ReadonlyArray, - ): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); - } else { - this._addStreamRecord(incrementalDataRecord); - } - } + ): ReadonlyArray { + const newPending = new Set(); + this._addIncrementalDataRecords( + incrementalDataRecords, + undefined, + newPending, + ); + return this._pendingNodesToResults(newPending); } addCompletedReconcilableDeferredGroupedFieldSet( reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, ): void { - const deferredFragmentNodes: Array = - reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords - .map((deferredFragmentRecord) => - this._deferredFragmentNodes.get(deferredFragmentRecord), - ) - .filter(isDeferredFragmentNode); - for (const deferredFragmentNode of deferredFragmentNodes) { + for (const deferredFragmentNode of this._fragmentsToNodes( + reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords, + )) { deferredFragmentNode.deferredGroupedFieldSetRecords.delete( reconcilableResult.deferredGroupedFieldSetRecord, ); deferredFragmentNode.reconcilableResults.add(reconcilableResult); } - } - - getNewPending(): ReadonlyArray { - const newPending: Array = []; - for (const node of this._newPending) { - if (isStreamNode(node)) { - this._pending.add(node); - newPending.push(node); - this._newIncrementalDataRecords.add(node); - } else if (node.deferredGroupedFieldSetRecords.size > 0) { - for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); - } - this._pending.add(node); - newPending.push(node.deferredFragmentRecord); - } else { - for (const child of node.children) { - this._newPending.add(child); - } - } - } - this._newPending.clear(); - - for (const incrementalDataRecord of this._newIncrementalDataRecords) { - if (isStreamNode(incrementalDataRecord)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._onStreamItems( - incrementalDataRecord, - incrementalDataRecord.streamItemQueue, - ); - } else { - const deferredGroupedFieldSetResult = incrementalDataRecord.result; - const result = - deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue - ? deferredGroupedFieldSetResult.value - : deferredGroupedFieldSetResult().value; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => this._enqueue(resolved)); - } else { - this._enqueue(result); - } - } + const incrementalDataRecords = reconcilableResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + this._addIncrementalDataRecords( + incrementalDataRecords, + reconcilableResult.deferredGroupedFieldSetRecord + .deferredFragmentRecords, + ); } - this._newIncrementalDataRecords.clear(); - - return newPending; } completedIncrementalData() { @@ -177,9 +125,12 @@ export class IncrementalGraph { return this._pending.size > 0; } - completeDeferredFragment( - deferredFragmentRecord: DeferredFragmentRecord, - ): Array | undefined { + completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): + | { + newPending: ReadonlyArray; + reconcilableResults: ReadonlyArray; + } + | undefined { const deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, ); @@ -194,25 +145,21 @@ export class IncrementalGraph { const reconcilableResults = Array.from( deferredFragmentNode.reconcilableResults, ); + this._removePending(deferredFragmentNode); for (const reconcilableResult of reconcilableResults) { - for (const otherDeferredFragmentRecord of reconcilableResult - .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const otherDeferredFragmentNode = this._deferredFragmentNodes.get( - otherDeferredFragmentRecord, - ); - if (otherDeferredFragmentNode === undefined) { - continue; - } + for (const otherDeferredFragmentNode of this._fragmentsToNodes( + reconcilableResult.deferredGroupedFieldSetRecord + .deferredFragmentRecords, + )) { otherDeferredFragmentNode.reconcilableResults.delete( reconcilableResult, ); } } - this._removePending(deferredFragmentNode); - for (const child of deferredFragmentNode.children) { - this._newPending.add(child); - } - return reconcilableResults; + const newPending = this._pendingNodesToResults( + deferredFragmentNode.children, + ); + return { newPending, reconcilableResults }; } removeDeferredFragment( @@ -227,9 +174,11 @@ export class IncrementalGraph { this._removePending(deferredFragmentNode); this._deferredFragmentNodes.delete(deferredFragmentRecord); // TODO: add test case for an erroring deferred fragment with child defers - /* c8 ignore next 3 */ + /* c8 ignore next 5 */ for (const child of deferredFragmentNode.children) { - this.removeDeferredFragment(child.deferredFragmentRecord); + if (isDeferredFragmentNode(child)) { + this.removeDeferredFragment(child.deferredFragmentRecord); + } } return true; } @@ -247,28 +196,92 @@ export class IncrementalGraph { } } - private _addDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + private _addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + parents: ReadonlyArray | undefined, + newPending?: Set | undefined, ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const deferredFragmentNode = this._addDeferredFragmentNode( - deferredFragmentRecord, - ); - if (this._pending.has(deferredFragmentNode)) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord); + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { + const deferredFragmentNode = this._addDeferredFragmentNode( + deferredFragmentRecord, + newPending, + ); + deferredFragmentNode.deferredGroupedFieldSetRecords.add( + incrementalDataRecord, + ); + } + if (this._hasPendingFragment(incrementalDataRecord)) { + this._onDeferredGroupedFieldSet(incrementalDataRecord); + } + } else if (parents === undefined) { + invariant(newPending !== undefined); + newPending.add(incrementalDataRecord); + } else { + for (const parent of parents) { + const deferredFragmentNode = this._addDeferredFragmentNode( + parent, + newPending, + ); + deferredFragmentNode.children.add(incrementalDataRecord); + } } - deferredFragmentNode.deferredGroupedFieldSetRecords.add( - deferredGroupedFieldSetRecord, - ); } } - private _addStreamRecord(streamRecord: StreamRecord): void { - this._newPending.add(streamRecord); + private _pendingNodesToResults( + newPendingNodes: Set, + ): ReadonlyArray { + const newPendingResults: Array = []; + for (const node of newPendingNodes) { + if (isDeferredFragmentNode(node)) { + if (node.deferredGroupedFieldSetRecords.size > 0) { + for (const deferredGroupedFieldSetRecord of node.deferredGroupedFieldSetRecords) { + if (!this._hasPendingFragment(deferredGroupedFieldSetRecord)) { + this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord); + } + } + this._pending.add(node); + newPendingResults.push(node.deferredFragmentRecord); + continue; + } + this._deferredFragmentNodes.delete(node.deferredFragmentRecord); + for (const child of node.children) { + newPendingNodes.add(child); + } + } else { + this._pending.add(node); + newPendingResults.push(node); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._onStreamItems(node); + } + } + return newPendingResults; + } + + private _hasPendingFragment( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): boolean { + return this._fragmentsToNodes( + deferredGroupedFieldSetRecord.deferredFragmentRecords, + ).some((node) => this._pending.has(node)); + } + + private _fragmentsToNodes( + deferredFragmentRecords: ReadonlyArray, + ): Array { + return deferredFragmentRecords + .map((deferredFragmentRecord) => + this._deferredFragmentNodes.get(deferredFragmentRecord), + ) + .filter(isDeferredFragmentNode); } private _addDeferredFragmentNode( deferredFragmentRecord: DeferredFragmentRecord, + newPending: Set | undefined, ): DeferredFragmentNode { let deferredFragmentNode = this._deferredFragmentNodes.get( deferredFragmentRecord, @@ -280,7 +293,7 @@ export class IncrementalGraph { deferredFragmentRecord, deferredGroupedFieldSetRecords: new Set(), reconcilableResults: new Set(), - children: [], + children: new Set(), }; this._deferredFragmentNodes.set( deferredFragmentRecord, @@ -288,21 +301,37 @@ export class IncrementalGraph { ); const parent = deferredFragmentRecord.parent; if (parent === undefined) { - this._newPending.add(deferredFragmentNode); + invariant(newPending !== undefined); + newPending.add(deferredFragmentNode); return deferredFragmentNode; } - const parentNode = this._addDeferredFragmentNode(parent); - parentNode.children.push(deferredFragmentNode); + const parentNode = this._addDeferredFragmentNode(parent, newPending); + parentNode.children.add(deferredFragmentNode); return deferredFragmentNode; } - private async _onStreamItems( - streamRecord: StreamRecord, - streamItemQueue: Array, - ): Promise { + private _onDeferredGroupedFieldSet( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): void { + const deferredGroupedFieldSetResult = deferredGroupedFieldSetRecord.result; + const result = + deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue + ? deferredGroupedFieldSetResult.value + : deferredGroupedFieldSetResult().value; + + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => this._enqueue(resolved)); + } else { + this._enqueue(result); + } + } + + private async _onStreamItems(streamRecord: StreamRecord): Promise { let items: Array = []; let errors: Array = []; let incrementalDataRecords: Array = []; + const streamItemQueue = streamRecord.streamItemQueue; let streamItemRecord: StreamItemRecord | undefined; while ((streamItemRecord = streamItemQueue.shift()) !== undefined) { let result = diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 3b7d92ab1a..1d93fdabc1 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -76,8 +76,9 @@ class IncrementalPublisher { errors: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - const newPending = this._incrementalGraph.getNewPending(); + const newPending = this._incrementalGraph.getNewPending( + incrementalDataRecords, + ); const pending = this._pendingSourcesToResults(newPending); @@ -219,8 +220,6 @@ class IncrementalPublisher { } else { this._handleCompletedStreamItems(completedIncrementalData, context); } - const newPending = this._incrementalGraph.getNewPending(); - context.pending.push(...this._pendingSourcesToResults(newPending)); } private _handleCompletedDeferredGroupedFieldSet( @@ -255,22 +254,19 @@ class IncrementalPublisher { deferredGroupedFieldSetResult, ); - const incrementalDataRecords = - deferredGroupedFieldSetResult.incrementalDataRecords; - if (incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const reconcilableResults = - this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); - if (reconcilableResults === undefined) { + const completion = this._incrementalGraph.completeDeferredFragment( + deferredFragmentRecord, + ); + if (completion === undefined) { continue; } const id = this._ids.get(deferredFragmentRecord); invariant(id !== undefined); const incremental = context.incremental; + const { newPending, reconcilableResults } = completion; + context.pending.push(...this._pendingSourcesToResults(newPending)); for (const reconcilableResult of reconcilableResults) { const { bestId, subPath } = this._getBestIdAndSubPath( id, @@ -329,10 +325,12 @@ class IncrementalPublisher { context.incremental.push(incrementalEntry); - if (streamItemsResult.incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords( - streamItemsResult.incrementalDataRecords, + const incrementalDataRecords = streamItemsResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + const newPending = this._incrementalGraph.getNewPending( + incrementalDataRecords, ); + context.pending.push(...this._pendingSourcesToResults(newPending)); } } } From 74f2af71305c29ab9f2955cef7ba8fc7b94f4573 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 17 Jun 2024 22:25:07 +0300 Subject: [PATCH 4/5] fix(incremental): properly initiate nested deferred grouped field sets when early execution is disabled, deferred grouped field sets should start immediately if and only if one of their deferred fragments is released as pending see: graphql/defer-stream-wg#84 (reply in thread) --- src/execution/IncrementalGraph.ts | 11 +-- src/execution/__tests__/defer-test.ts | 132 +++++++++++++++++++++++++- src/execution/execute.ts | 45 ++++++--- src/execution/types.ts | 28 +++++- 4 files changed, 194 insertions(+), 22 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index e6f7df24f7..e6897e4c5d 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -8,6 +8,7 @@ import type { GraphQLError } from '../error/GraphQLError.js'; import type { DeferredFragmentRecord, DeferredGroupedFieldSetRecord, + DeferredGroupedFieldSetResult, IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, @@ -237,6 +238,7 @@ export class IncrementalGraph { for (const node of newPendingNodes) { if (isDeferredFragmentNode(node)) { if (node.deferredGroupedFieldSetRecords.size > 0) { + node.deferredFragmentRecord.setAsPending(); for (const deferredGroupedFieldSetRecord of node.deferredGroupedFieldSetRecords) { if (!this._hasPendingFragment(deferredGroupedFieldSetRecord)) { this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord); @@ -313,12 +315,9 @@ export class IncrementalGraph { private _onDeferredGroupedFieldSet( deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, ): void { - const deferredGroupedFieldSetResult = deferredGroupedFieldSetRecord.result; - const result = - deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue - ? deferredGroupedFieldSetResult.value - : deferredGroupedFieldSetResult().value; - + const result = ( + deferredGroupedFieldSetRecord.result as BoxedPromiseOrValue + ).value; if (isPromise(result)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises result.then((resolved) => this._enqueue(resolved)); diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index e74aebb9ae..2853415bd0 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -1,10 +1,12 @@ -import { expect } from 'chai'; +import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; +import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; + import type { DocumentNode } from '../../language/ast.js'; import { parse } from '../../language/parser.js'; @@ -856,6 +858,134 @@ describe('Execute: defer directive', () => { ]); }); + it('Initiates all deferred grouped field sets immediately if and only if they have been released as pending', async () => { + const document = parse(` + query { + ... @defer { + a { + ... @defer { + b { + c { d } + } + } + } + } + ... @defer { + a { + someField + ... @defer { + b { + e { f } + } + } + } + } + } + `); + + const { promise: slowFieldPromise, resolve: resolveSlowField } = + promiseWithResolvers(); + let cResolverCalled = false; + let eResolverCalled = false; + const executeResult = experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + a: { + someField: slowFieldPromise, + b: { + c: () => { + cResolverCalled = true; + return { d: 'd' }; + }, + e: () => { + eResolverCalled = true; + return { f: 'f' }; + }, + }, + }, + }, + enableEarlyExecution: false, + }); + + assert('initialResult' in executeResult); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }); + + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + expect(cResolverCalled).to.equal(false); + expect(eResolverCalled).to.equal(false); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + value: { + pending: [{ id: '2', path: ['a'] }], + incremental: [ + { + data: { a: {} }, + id: '0', + }, + { + data: { b: {} }, + id: '2', + }, + { + data: { c: { d: 'd' } }, + id: '2', + subPath: ['b'], + }, + ], + completed: [{ id: '0' }, { id: '2' }], + hasNext: true, + }, + done: false, + }); + + expect(cResolverCalled).to.equal(true); + expect(eResolverCalled).to.equal(false); + + resolveSlowField('someField'); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + pending: [{ id: '3', path: ['a'] }], + incremental: [ + { + data: { someField: 'someField' }, + id: '1', + subPath: ['a'], + }, + { + data: { e: { f: 'f' } }, + id: '3', + subPath: ['b'], + }, + ], + completed: [{ id: '1' }, { id: '3' }], + hasNext: false, + }, + done: false, + }); + + expect(eResolverCalled).to.equal(true); + + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('Can deduplicate multiple defers on the same object', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index d01b6ee768..ba7137b4c4 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -63,7 +63,6 @@ import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import type { CancellableStreamRecord, - DeferredFragmentRecord, DeferredGroupedFieldSetRecord, DeferredGroupedFieldSetResult, ExecutionResult, @@ -73,6 +72,7 @@ import type { StreamItemResult, StreamRecord, } from './types.js'; +import { DeferredFragmentRecord } from './types.js'; import { getArgumentValues, getDirectiveValues, @@ -1676,11 +1676,11 @@ function addNewDeferredFragments( : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. - const deferredFragmentRecord: DeferredFragmentRecord = { + const deferredFragmentRecord = new DeferredFragmentRecord( path, - label: newDeferUsage.label, + newDeferUsage.label, parent, - }; + ); // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); @@ -2114,16 +2114,33 @@ function executeDeferredGroupedFieldSets( deferMap, ); - const shouldDeferThisDeferUsageSet = shouldDefer( - parentDeferUsages, - deferUsageSet, - ); - - deferredGroupedFieldSetRecord.result = shouldDeferThisDeferUsageSet - ? exeContext.enableEarlyExecution - ? new BoxedPromiseOrValue(Promise.resolve().then(executor)) - : () => new BoxedPromiseOrValue(executor()) - : new BoxedPromiseOrValue(executor()); + if (exeContext.enableEarlyExecution) { + deferredGroupedFieldSetRecord.result = new BoxedPromiseOrValue( + shouldDefer(parentDeferUsages, deferUsageSet) + ? Promise.resolve().then(executor) + : executor(), + ); + } else if ( + deferredFragmentRecords.some( + (deferredFragmentRecord) => deferredFragmentRecord.pending, + ) + ) { + deferredGroupedFieldSetRecord.result = new BoxedPromiseOrValue( + executor(), + ); + } else { + deferredGroupedFieldSetRecord.result = () => + new BoxedPromiseOrValue(executor()); + const resolveThunk = () => { + const maybeThunk = deferredGroupedFieldSetRecord.result; + if (!(maybeThunk instanceof BoxedPromiseOrValue)) { + deferredGroupedFieldSetRecord.result = maybeThunk(); + } + }; + for (const deferredFragmentRecord of deferredFragmentRecords) { + deferredFragmentRecord.onPending(resolveThunk); + } + } newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } diff --git a/src/execution/types.ts b/src/execution/types.ts index 4deb841a03..078ea155ef 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -214,10 +214,36 @@ export interface DeferredGroupedFieldSetRecord { export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; -export interface DeferredFragmentRecord { +/** @internal */ +export class DeferredFragmentRecord { path: Path | undefined; label: string | undefined; parent: DeferredFragmentRecord | undefined; + fns: Array<() => void>; + pending: boolean; + + constructor( + path: Path | undefined, + label: string | undefined, + parent: DeferredFragmentRecord | undefined, + ) { + this.path = path; + this.label = label; + this.parent = parent; + this.pending = false; + this.fns = []; + } + + onPending(fn: () => void): void { + this.fns.push(fn); + } + + setAsPending(): void { + this.pending = true; + for (const fn of this.fns) { + fn(); + } + } } export interface StreamItemResult { From 75194bd566d88eb256bd3e0cac710a71f7cbaca3 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 18 Jun 2024 10:04:24 +0300 Subject: [PATCH 5/5] experiment(IncrementalGraph): track pending incremental data instead of results --- src/execution/IncrementalGraph.ts | 96 +++++++++++++++++++-------- src/execution/IncrementalPublisher.ts | 13 ++-- src/execution/types.ts | 2 +- 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index e6897e4c5d..c213d65678 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -11,6 +11,7 @@ import type { DeferredGroupedFieldSetResult, IncrementalDataRecord, IncrementalDataRecordResult, + NonReconcilableDeferredGroupedFieldSetResult, ReconcilableDeferredGroupedFieldSetResult, StreamItemRecord, StreamRecord, @@ -37,7 +38,7 @@ type SubsequentResultNode = DeferredFragmentNode | StreamRecord; * @internal */ export class IncrementalGraph { - private _pending: Set; + private _pendingIncrementalData: Set; private _deferredFragmentNodes: Map< DeferredFragmentRecord, DeferredFragmentNode @@ -49,7 +50,7 @@ export class IncrementalGraph { >; constructor() { - this._pending = new Set(); + this._pendingIncrementalData = new Set(); this._deferredFragmentNodes = new Map(); this._completedQueue = []; this._nextQueue = []; @@ -123,7 +124,7 @@ export class IncrementalGraph { } hasNext(): boolean { - return this._pending.size > 0; + return this._pendingIncrementalData.size > 0; } completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): @@ -146,24 +147,47 @@ export class IncrementalGraph { const reconcilableResults = Array.from( deferredFragmentNode.reconcilableResults, ); - this._removePending(deferredFragmentNode); for (const reconcilableResult of reconcilableResults) { + const deferredGroupedFieldSetRecord = + reconcilableResult.deferredGroupedFieldSetRecord; + this._removePendingIncrementalData(deferredGroupedFieldSetRecord); for (const otherDeferredFragmentNode of this._fragmentsToNodes( - reconcilableResult.deferredGroupedFieldSetRecord - .deferredFragmentRecords, + deferredGroupedFieldSetRecord.deferredFragmentRecords, )) { otherDeferredFragmentNode.reconcilableResults.delete( reconcilableResult, ); } } + this._deferredFragmentNodes.delete( + deferredFragmentNode.deferredFragmentRecord, + ); const newPending = this._pendingNodesToResults( deferredFragmentNode.children, ); return { newPending, reconcilableResults }; } - removeDeferredFragment( + removeNonReconcilableDeferredGroupedFieldSet( + nonReconcilableResult: NonReconcilableDeferredGroupedFieldSetResult, + ): ReadonlyArray { + const deferredFragmentRecords: Array = []; + const deferredGroupedFieldSetRecord = + nonReconcilableResult.deferredGroupedFieldSetRecord; + this._removePendingIncrementalData(deferredGroupedFieldSetRecord); + for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { + if (this._removeDeferredFragment(deferredFragmentRecord)) { + deferredFragmentRecords.push(deferredFragmentRecord); + } + } + return deferredFragmentRecords; + } + + removeStream(streamRecord: StreamRecord): void { + this._removePendingIncrementalData(streamRecord); + } + + private _removeDeferredFragment( deferredFragmentRecord: DeferredFragmentRecord, ): boolean { const deferredFragmentNode = this._deferredFragmentNodes.get( @@ -172,25 +196,40 @@ export class IncrementalGraph { if (deferredFragmentNode === undefined) { return false; } - this._removePending(deferredFragmentNode); - this._deferredFragmentNodes.delete(deferredFragmentRecord); + this._deferredFragmentNodes.delete( + deferredFragmentNode.deferredFragmentRecord, + ); + const deferredGroupedFieldSetRecords = [ + ...deferredFragmentNode.deferredGroupedFieldSetRecords, + ...Array.from(deferredFragmentNode.reconcilableResults).map( + (result) => result.deferredGroupedFieldSetRecord, + ), + ]; + for (const deferredGroupedFieldSetRecord of deferredGroupedFieldSetRecords) { + if ( + !deferredGroupedFieldSetRecord.deferredFragmentRecords.some( + (otherDeferredFragmentRecord) => + this._deferredFragmentNodes.has(otherDeferredFragmentRecord), + ) + ) { + this._pendingIncrementalData.delete(deferredGroupedFieldSetRecord); + } + } // TODO: add test case for an erroring deferred fragment with child defers /* c8 ignore next 5 */ for (const child of deferredFragmentNode.children) { if (isDeferredFragmentNode(child)) { - this.removeDeferredFragment(child.deferredFragmentRecord); + this._removeDeferredFragment(child.deferredFragmentRecord); } } return true; } - removeStream(streamRecord: StreamRecord): void { - this._removePending(streamRecord); - } - - private _removePending(subsequentResultNode: SubsequentResultNode): void { - this._pending.delete(subsequentResultNode); - if (this._pending.size === 0) { + private _removePendingIncrementalData( + incrementalDataRecord: IncrementalDataRecord, + ): void { + this._pendingIncrementalData.delete(incrementalDataRecord); + if (this._pendingIncrementalData.size === 0) { for (const resolve of this._nextQueue) { resolve({ value: undefined, done: true }); } @@ -213,13 +252,19 @@ export class IncrementalGraph { incrementalDataRecord, ); } - if (this._hasPendingFragment(incrementalDataRecord)) { + if ( + incrementalDataRecord.deferredFragmentRecords.some( + (deferredFragmentRecord) => deferredFragmentRecord.pending, + ) + ) { this._onDeferredGroupedFieldSet(incrementalDataRecord); } } else if (parents === undefined) { + this._pendingIncrementalData.add(incrementalDataRecord); invariant(newPending !== undefined); newPending.add(incrementalDataRecord); } else { + this._pendingIncrementalData.add(incrementalDataRecord); for (const parent of parents) { const deferredFragmentNode = this._addDeferredFragmentNode( parent, @@ -240,11 +285,12 @@ export class IncrementalGraph { if (node.deferredGroupedFieldSetRecords.size > 0) { node.deferredFragmentRecord.setAsPending(); for (const deferredGroupedFieldSetRecord of node.deferredGroupedFieldSetRecords) { - if (!this._hasPendingFragment(deferredGroupedFieldSetRecord)) { + if ( + !this._pendingIncrementalData.has(deferredGroupedFieldSetRecord) + ) { this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord); } } - this._pending.add(node); newPendingResults.push(node.deferredFragmentRecord); continue; } @@ -253,7 +299,6 @@ export class IncrementalGraph { newPendingNodes.add(child); } } else { - this._pending.add(node); newPendingResults.push(node); // eslint-disable-next-line @typescript-eslint/no-floating-promises @@ -263,14 +308,6 @@ export class IncrementalGraph { return newPendingResults; } - private _hasPendingFragment( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - ): boolean { - return this._fragmentsToNodes( - deferredGroupedFieldSetRecord.deferredFragmentRecords, - ).some((node) => this._pending.has(node)); - } - private _fragmentsToNodes( deferredFragmentRecords: ReadonlyArray, ): Array { @@ -315,6 +352,7 @@ export class IncrementalGraph { private _onDeferredGroupedFieldSet( deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, ): void { + this._pendingIncrementalData.add(deferredGroupedFieldSetRecord); const result = ( deferredGroupedFieldSetRecord.result as BoxedPromiseOrValue ).value; diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 1d93fdabc1..86a095d18f 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -231,14 +231,11 @@ class IncrementalPublisher { deferredGroupedFieldSetResult, ) ) { - for (const deferredFragmentRecord of deferredGroupedFieldSetResult - .deferredGroupedFieldSetRecord.deferredFragmentRecords) { - if ( - !this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord) - ) { - // This can occur if multiple deferred grouped field sets error for a fragment. - continue; - } + const deferredFragmentRecords = + this._incrementalGraph.removeNonReconcilableDeferredGroupedFieldSet( + deferredGroupedFieldSetResult, + ); + for (const deferredFragmentRecord of deferredFragmentRecords) { const id = this._ids.get(deferredFragmentRecord); invariant(id !== undefined); this._ids.delete(deferredFragmentRecord); diff --git a/src/execution/types.ts b/src/execution/types.ts index 078ea155ef..c723d17dcb 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -190,7 +190,7 @@ export interface ReconcilableDeferredGroupedFieldSetResult { errors?: never; } -interface NonReconcilableDeferredGroupedFieldSetResult { +export interface NonReconcilableDeferredGroupedFieldSetResult { deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord; path: Array; errors: ReadonlyArray;