From e40e016c8c5ff6b397d15eaaa9621dcf53626b0e Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 4 Aug 2023 02:38:34 +0300 Subject: [PATCH 1/5] avoid additional stack for completing non-nullable types --- src/execution/execute.ts | 47 +++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index af68c286e1..f9dda0f48a 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -31,6 +31,7 @@ import type { GraphQLFieldResolver, GraphQLLeafType, GraphQLList, + GraphQLNullableOutputType, GraphQLObjectType, GraphQLOutputType, GraphQLResolveInfo, @@ -822,36 +823,28 @@ function completeValue( throw result; } - // If field type is NonNull, complete for inner type, and throw field error - // if result is null. + let nullableType: GraphQLNullableOutputType; if (isNonNullType(returnType)) { - const completed = completeValue( - exeContext, - returnType.ofType, - fieldGroup, - info, - path, - result, - incrementalDataRecord, - ); - if (completed === null) { + // If result value is null or undefined then throw an error. + if (result == null) { throw new Error( `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, ); } - return completed; - } - - // If result value is null or undefined then return null. - if (result == null) { - return null; + nullableType = returnType.ofType; + } else { + // If result value is null or undefined then return null. + if (result == null) { + return null; + } + nullableType = returnType; } // If field type is List, complete each item in the list with the inner type - if (isListType(returnType)) { + if (isListType(nullableType)) { return completeListValue( exeContext, - returnType, + nullableType, fieldGroup, info, path, @@ -862,16 +855,16 @@ function completeValue( // If field type is a leaf type, Scalar or Enum, serialize to a valid value, // returning null if serialization is not possible. - if (isLeafType(returnType)) { - return completeLeafValue(returnType, result); + if (isLeafType(nullableType)) { + return completeLeafValue(nullableType, result); } // If field type is an abstract type, Interface or Union, determine the // runtime Object type and complete for that type. - if (isAbstractType(returnType)) { + if (isAbstractType(nullableType)) { return completeAbstractValue( exeContext, - returnType, + nullableType, fieldGroup, info, path, @@ -881,10 +874,10 @@ function completeValue( } // If field type is Object, execute and complete all sub-selections. - if (isObjectType(returnType)) { + if (isObjectType(nullableType)) { return completeObjectValue( exeContext, - returnType, + nullableType, fieldGroup, info, path, @@ -896,7 +889,7 @@ function completeValue( // Not reachable, all possible output types have been considered. invariant( false, - 'Cannot complete value of unexpected output type: ' + inspect(returnType), + 'Cannot complete value of unexpected output type: ' + inspect(nullableType), ); } From 00052271ae0ff8b208ae8dcd38b8652b972ae60d Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 8 Aug 2023 13:58:26 +0300 Subject: [PATCH 2/5] convert completeValue to completeNonLeafValue --- src/execution/__tests__/stream-test.ts | 96 ++++++++++++- src/execution/execute.ts | 184 ++++++++++++++++++------- 2 files changed, 228 insertions(+), 52 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index ce3b920895..3fd7365605 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -151,6 +151,35 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Can stream a list field that returns an async iterable', async () => { + const document = parse('{ scalarList @stream(initialCount: 1) }'); + const result = await complete(document, { + async *scalarList() { + yield await Promise.resolve('apple'); + yield await Promise.resolve('banana'); + yield await Promise.resolve('coconut'); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarList: ['apple'], + }, + hasNext: true, + }, + { + incremental: [{ items: ['banana'], path: ['scalarList', 1] }], + hasNext: true, + }, + { + incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); it('Can use default value of initialCount', async () => { const document = parse('{ scalarList @stream }'); const result = await complete(document, { @@ -536,7 +565,7 @@ describe('Execute: stream directive', () => { }, ]); }); - it('Can stream a field that returns an async iterable', async () => { + it('Can stream an object field that returns an async iterable', async () => { const document = parse(` query { friendList @stream { @@ -770,6 +799,71 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Handles null returned in list items after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + name + } + } + `); + const result = await complete(document, { + friendList: () => [friends[0], null], + }); + + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ name: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles null returned in async iterable list items after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + name + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(null); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ name: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); it('Handles null returned in non-null list items after initialCount is reached', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index f9dda0f48a..1829b26f25 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -641,8 +641,9 @@ function executeFields( /** * Implements the "Executing fields" section of the spec * In particular, this function figures out the value that the field returns by - * calling its resolve function, then calls completeValue to complete promises, - * serialize scalars, or execute the sub-selection-set for objects. + * calling its resolve function, checks for promises, and then serializes leaf + * values or calls completeNonLeafValue to execute the sub-selection-set for + * objects and/or complete lists as necessary. */ function executeField( exeContext: ExecutionContext, @@ -699,9 +700,32 @@ function executeField( ); } - const completed = completeValue( + if (result instanceof Error) { + throw result; + } + + let nullableType: GraphQLNullableOutputType; + if (isNonNullType(returnType)) { + if (result == null) { + throw new Error( + `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, + ); + } + nullableType = returnType.ofType; + } else { + if (result == null) { + return null; + } + nullableType = returnType; + } + + if (isLeafType(nullableType)) { + return completeLeafValue(nullableType, result); + } + + const completed = completeNonLeafValue( exeContext, - returnType, + nullableType, fieldGroup, info, path, @@ -789,57 +813,27 @@ function handleFieldError( } /** - * Implements the instructions for completeValue as defined in the + * Implements the instructions for completing non-leaf values as defined in the * "Value Completion" section of the spec. * - * If the field type is Non-Null, then this recursively completes the value - * for the inner type. It throws a field error if that completion returns null, - * as per the "Nullability" section of the spec. - * * If the field type is a List, then this recursively completes the value * for the inner type on each item in the list. * - * If the field type is a Scalar or Enum, ensures the completed value is a legal - * value of the type by calling the `serialize` method of GraphQL type - * definition. - * * If the field is an abstract type, determine the runtime type of the value * and then complete based on that type * * Otherwise, the field type expects a sub-selection set, and will complete the * value by executing all sub-selections. */ -function completeValue( +function completeNonLeafValue( exeContext: ExecutionContext, - returnType: GraphQLOutputType, + nullableType: GraphQLNullableOutputType, fieldGroup: FieldGroup, info: GraphQLResolveInfo, path: Path, result: unknown, incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue { - // If result is an Error, throw a located error. - if (result instanceof Error) { - throw result; - } - - let nullableType: GraphQLNullableOutputType; - if (isNonNullType(returnType)) { - // If result value is null or undefined then throw an error. - if (result == null) { - throw new Error( - `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, - ); - } - nullableType = returnType.ofType; - } else { - // If result value is null or undefined then return null. - if (result == null) { - return null; - } - nullableType = returnType; - } - // If field type is List, complete each item in the list with the inner type if (isListType(nullableType)) { return completeListValue( @@ -853,12 +847,6 @@ function completeValue( ); } - // If field type is a leaf type, Scalar or Enum, serialize to a valid value, - // returning null if serialization is not possible. - if (isLeafType(nullableType)) { - return completeLeafValue(nullableType, result); - } - // If field type is an abstract type, Interface or Union, determine the // runtime Object type and complete for that type. if (isAbstractType(nullableType)) { @@ -904,9 +892,33 @@ async function completePromisedValue( ): Promise { try { const resolved = await result; - let completed = completeValue( + + if (resolved instanceof Error) { + throw resolved; + } + + let nullableType: GraphQLNullableOutputType; + if (isNonNullType(returnType)) { + if (resolved == null) { + throw new Error( + `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, + ); + } + nullableType = returnType.ofType; + } else { + if (resolved == null) { + return null; + } + nullableType = returnType; + } + + if (isLeafType(nullableType)) { + return completeLeafValue(nullableType, resolved); + } + + let completed = completeNonLeafValue( exeContext, - returnType, + nullableType, fieldGroup, info, path, @@ -1179,9 +1191,34 @@ function completeListItemValue( } try { - const completedItem = completeValue( + if (item instanceof Error) { + throw item; + } + + let nullableType: GraphQLNullableOutputType; + if (isNonNullType(itemType)) { + if (item == null) { + throw new Error( + `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, + ); + } + nullableType = itemType.ofType; + } else { + if (item == null) { + completedResults.push(null); + return false; + } + nullableType = itemType; + } + + if (isLeafType(nullableType)) { + completedResults.push(completeLeafValue(nullableType, item)); + return false; + } + + const completedItem = completeNonLeafValue( exeContext, - itemType, + nullableType, fieldGroup, info, itemPath, @@ -1852,9 +1889,35 @@ function executeStreamField( let completedItem: PromiseOrValue; try { try { - completedItem = completeValue( + let nullableType: GraphQLNullableOutputType; + if (isNonNullType(itemType)) { + if (item == null) { + throw new Error( + `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, + ); + } + nullableType = itemType.ofType; + } else { + if (item == null) { + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [null], + ); + return incrementalDataRecord; + } + nullableType = itemType; + } + + if (isLeafType(nullableType)) { + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completeLeafValue(nullableType, item), + ]); + return incrementalDataRecord; + } + + completedItem = completeNonLeafValue( exeContext, - itemType, + nullableType, fieldGroup, info, itemPath, @@ -1945,9 +2008,28 @@ async function executeStreamAsyncIteratorItem( } let completedItem; try { - completedItem = completeValue( + let nullableType: GraphQLNullableOutputType; + if (isNonNullType(itemType)) { + if (item == null) { + throw new Error( + `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, + ); + } + nullableType = itemType.ofType; + } else { + if (item == null) { + return { done: false, value: null }; + } + nullableType = itemType; + } + + if (isLeafType(nullableType)) { + return { done: false, value: completeLeafValue(nullableType, item) }; + } + + completedItem = completeNonLeafValue( exeContext, - itemType, + nullableType, fieldGroup, info, itemPath, From e6911774399ae68b56a0fddb8f54153a82bf10ec Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 8 Aug 2023 14:48:16 +0300 Subject: [PATCH 3/5] separate error handling for non-leaf values --- src/execution/__tests__/stream-test.ts | 184 +++++++++++++++++++- src/execution/execute.ts | 225 +++++++++++++++++-------- 2 files changed, 336 insertions(+), 73 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 3fd7365605..194ed0d84b 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -799,6 +799,49 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Handles error returned in async iterable after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(new Error('bad')); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ name: 'Luke', id: '1' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'bad', + locations: [{ line: 3, column: 9 }], + path: ['friendList', 1], + }, + ], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); it('Handles null returned in list items after initialCount is reached', async () => { const document = parse(` query { @@ -949,7 +992,7 @@ describe('Execute: stream directive', () => { }, ]); }); - it('Handles errors thrown by completeValue after initialCount is reached', async () => { + it('Handles errors thrown by leaf value completion after initialCount is reached', async () => { const document = parse(` query { scalarList @stream(initialCount: 1) @@ -983,7 +1026,41 @@ describe('Execute: stream directive', () => { }, ]); }); - it('Handles async errors thrown by completeValue after initialCount is reached', async () => { + it('Handles errors returned by leaf value completion after initialCount is reached', async () => { + const document = parse(` + query { + scalarList @stream(initialCount: 1) + } + `); + const result = await complete(document, { + scalarList: () => [friends[0].name, new Error('Oops')], + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarList: ['Luke'], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['scalarList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 3, column: 9 }], + path: ['scalarList', 1], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles async errors thrown by leaf value completion after initialCount is reached', async () => { const document = parse(` query { friendList @stream(initialCount: 1) { @@ -1034,6 +1111,109 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Handles nested errors thrown by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + friendList: () => [ + { nonNullName: friends[0].name }, + { nonNullName: new Error('Oops') }, + { nonNullName: friends[1].name }, + ], + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['friendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ nonNullName: 'Han' }], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles nested errors thrown by completeValue after initialCount is reached from async iterable', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve({ nonNullName: friends[0].name }); + yield await Promise.resolve({ + nonNullName: () => new Error('Oops'), + }); + yield await Promise.resolve({ nonNullName: friends[1].name }); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['friendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ nonNullName: 'Han' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); it('Handles nested async errors thrown by completeValue after initialCount is reached', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1829b26f25..7a6f2ce27d 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -670,6 +670,8 @@ function executeField( path, ); + let result; + let nullableType: GraphQLNullableOutputType; // Get the resolve function, regardless of if its result is normal or abrupt (error). try { // Build a JS object of arguments from the field.arguments AST, using the @@ -686,7 +688,7 @@ function executeField( // used to represent an authenticated user, or request-specific caches. const contextValue = exeContext.contextValue; - const result = resolveFn(source, args, contextValue, info); + result = resolveFn(source, args, contextValue, info); if (isPromise(result)) { return completePromisedValue( @@ -704,7 +706,6 @@ function executeField( throw result; } - let nullableType: GraphQLNullableOutputType; if (isNonNullType(returnType)) { if (result == null) { throw new Error( @@ -722,8 +723,22 @@ function executeField( if (isLeafType(nullableType)) { return completeLeafValue(nullableType, result); } + } catch (rawError) { + handleFieldError( + rawError, + exeContext, + returnType, + fieldGroup, + path, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); + return null; + } - const completed = completeNonLeafValue( + let completed; + try { + completed = completeNonLeafValue( exeContext, nullableType, fieldGroup, @@ -732,24 +747,6 @@ function executeField( result, incrementalDataRecord, ); - - if (isPromise(completed)) { - // Note: we don't rely on a `catch` method, but we do expect "thenable" - // to take a second callback for the error case. - return completed.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - returnType, - fieldGroup, - path, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); - return null; - }); - } - return completed; } catch (rawError) { handleFieldError( rawError, @@ -762,6 +759,24 @@ function executeField( exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } + + if (isPromise(completed)) { + // Note: we don't rely on a `catch` method, but we do expect "thenable" + // to take a second callback for the error case. + return completed.then(undefined, (rawError) => { + handleFieldError( + rawError, + exeContext, + returnType, + fieldGroup, + path, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); + return null; + }); + } + return completed; } /** @@ -890,14 +905,15 @@ async function completePromisedValue( result: Promise, incrementalDataRecord: IncrementalDataRecord, ): Promise { + let resolved; + let nullableType: GraphQLNullableOutputType; try { - const resolved = await result; + resolved = await result; if (resolved instanceof Error) { throw resolved; } - let nullableType: GraphQLNullableOutputType; if (isNonNullType(returnType)) { if (resolved == null) { throw new Error( @@ -915,7 +931,20 @@ async function completePromisedValue( if (isLeafType(nullableType)) { return completeLeafValue(nullableType, resolved); } + } catch (rawError) { + handleFieldError( + rawError, + exeContext, + returnType, + fieldGroup, + path, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); + return null; + } + try { let completed = completeNonLeafValue( exeContext, nullableType, @@ -1190,12 +1219,12 @@ function completeListItemValue( return true; } + let nullableType: GraphQLNullableOutputType; try { if (item instanceof Error) { throw item; } - let nullableType: GraphQLNullableOutputType; if (isNonNullType(itemType)) { if (item == null) { throw new Error( @@ -1215,8 +1244,23 @@ function completeListItemValue( completedResults.push(completeLeafValue(nullableType, item)); return false; } + } catch (rawError) { + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + completedResults.push(null); + return false; + } - const completedItem = completeNonLeafValue( + let completedItem; + try { + completedItem = completeNonLeafValue( exeContext, nullableType, fieldGroup, @@ -1225,32 +1269,6 @@ function completeListItemValue( item, incrementalDataRecord, ); - - if (isPromise(completedItem)) { - // Note: we don't rely on a `catch` method, but we do expect "thenable" - // to take a second callback for the error case. - completedResults.push( - completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter( - itemPath, - incrementalDataRecord, - ); - return null; - }), - ); - - return true; - } - - completedResults.push(completedItem); } catch (rawError) { handleFieldError( rawError, @@ -1262,8 +1280,32 @@ function completeListItemValue( ); exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); completedResults.push(null); + return false; } + if (isPromise(completedItem)) { + // Note: we don't rely on a `catch` method, but we do expect "thenable" + // to take a second callback for the error case. + completedResults.push( + completedItem.then(undefined, (rawError) => { + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + return null; + }), + ); + + return true; + } + + completedResults.push(completedItem); + return false; } @@ -1888,8 +1930,12 @@ function executeStreamField( let completedItem: PromiseOrValue; try { + let nullableType: GraphQLNullableOutputType; try { - let nullableType: GraphQLNullableOutputType; + if (item instanceof Error) { + throw item; + } + if (isNonNullType(itemType)) { if (item == null) { throw new Error( @@ -1914,7 +1960,23 @@ function executeStreamField( ]); return incrementalDataRecord; } + } catch (rawError) { + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + null, + ]); + return incrementalDataRecord; + } + try { completedItem = completeNonLeafValue( exeContext, nullableType, @@ -1933,8 +1995,11 @@ function executeStreamField( itemPath, incrementalDataRecord, ); - completedItem = null; exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + null, + ]); + return incrementalDataRecord; } } catch (error) { incrementalPublisher.addFieldError(incrementalDataRecord, error); @@ -2006,9 +2071,13 @@ async function executeStreamAsyncIteratorItem( } catch (rawError) { throw locatedError(rawError, fieldGroup, pathToArray(path)); } - let completedItem; + + let nullableType: GraphQLNullableOutputType; try { - let nullableType: GraphQLNullableOutputType; + if (item instanceof Error) { + throw item; + } + if (isNonNullType(itemType)) { if (item == null) { throw new Error( @@ -2026,7 +2095,21 @@ async function executeStreamAsyncIteratorItem( if (isLeafType(nullableType)) { return { done: false, value: completeLeafValue(nullableType, item) }; } + } catch (rawError) { + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + return { done: false, value: null }; + } + let completedItem; + try { completedItem = completeNonLeafValue( exeContext, nullableType, @@ -2036,22 +2119,6 @@ async function executeStreamAsyncIteratorItem( item, incrementalDataRecord, ); - - if (isPromise(completedItem)) { - completedItem = completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); - return null; - }); - } - return { done: false, value: completedItem }; } catch (rawError) { handleFieldError( rawError, @@ -2064,6 +2131,22 @@ async function executeStreamAsyncIteratorItem( exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return { done: false, value: null }; } + + if (isPromise(completedItem)) { + completedItem = completedItem.then(undefined, (rawError) => { + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + return null; + }); + } + return { done: false, value: completedItem }; } async function executeStreamAsyncIterator( From f73300dc5f069ff015118e2d58d8c4e769c16cc7 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 11 Aug 2023 20:44:27 +0300 Subject: [PATCH 4/5] add Abort signals --- integrationTests/ts/tsconfig.json | 8 +- src/execution/__tests__/executor-test.ts | 293 +++++++++++++++++++++++ src/execution/__tests__/stream-test.ts | 80 +++++++ src/execution/execute.ts | 263 +++++++++++++++----- src/jsutils/addAbortListener.ts | 64 +++++ src/type/definition.ts | 1 + 6 files changed, 653 insertions(+), 56 deletions(-) create mode 100644 src/jsutils/addAbortListener.ts diff --git a/integrationTests/ts/tsconfig.json b/integrationTests/ts/tsconfig.json index e8505c2bb9..2f3b87af16 100644 --- a/integrationTests/ts/tsconfig.json +++ b/integrationTests/ts/tsconfig.json @@ -1,7 +1,13 @@ { "compilerOptions": { "module": "commonjs", - "lib": ["es2019", "es2020.promise", "es2020.bigint", "es2020.string"], + "lib": [ + "es2019", + "es2020.promise", + "es2020.bigint", + "es2020.string", + "dom" // Workaround for missing web-compatible globals in `@types/node` + ], "noEmit": true, "types": [], "strict": true, diff --git a/src/execution/__tests__/executor-test.ts b/src/execution/__tests__/executor-test.ts index c29b4ae60d..1eba3a8669 100644 --- a/src/execution/__tests__/executor-test.ts +++ b/src/execution/__tests__/executor-test.ts @@ -635,6 +635,299 @@ describe('Execute: Handles basic execution tasks', () => { expect(isAsyncResolverFinished).to.equal(true); }); + it('exits early on early abort', () => { + let isExecuted = false; + + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + field: { + type: GraphQLString, + /* c8 ignore next 3 */ + resolve() { + isExecuted = true; + }, + }, + }, + }), + }); + + const document = parse(` + { + field + } + `); + + const abortController = new AbortController(); + abortController.abort(); + + const result = execute({ + schema, + document, + abortSignal: abortController.signal, + }); + + expect(isExecuted).to.equal(false); + expectJSON(result).toDeepEqual({ + data: { field: null }, + errors: [ + { + message: 'This operation was aborted', + locations: [{ line: 3, column: 9 }], + path: ['field'], + }, + ], + }); + }); + + it('exits early on abort mid-execution', async () => { + let isExecuted = false; + + const asyncObjectType = new GraphQLObjectType({ + name: 'AsyncObject', + fields: { + field: { + type: GraphQLString, + /* c8 ignore next 3 */ + resolve() { + isExecuted = true; + }, + }, + }, + }); + + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + asyncObject: { + type: asyncObjectType, + async resolve() { + await resolveOnNextTick(); + return {}; + }, + }, + }, + }), + }); + + const document = parse(` + { + asyncObject { + field + } + } + `); + + const abortController = new AbortController(); + + const result = execute({ + schema, + document, + abortSignal: abortController.signal, + }); + + abortController.abort(); + + expect(isExecuted).to.equal(false); + expectJSON(await result).toDeepEqual({ + data: { asyncObject: { field: null } }, + errors: [ + { + message: 'This operation was aborted', + locations: [{ line: 4, column: 11 }], + path: ['asyncObject', 'field'], + }, + ], + }); + expect(isExecuted).to.equal(false); + }); + + it('exits early on abort mid-resolver', async () => { + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + asyncField: { + type: GraphQLString, + async resolve(_parent, _args, _context, _info, abortSignal) { + await resolveOnNextTick(); + abortSignal?.throwIfAborted(); + }, + }, + }, + }), + }); + + const document = parse(` + { + asyncField + } + `); + + const abortController = new AbortController(); + + const result = execute({ + schema, + document, + abortSignal: abortController.signal, + }); + + abortController.abort(); + + expectJSON(await result).toDeepEqual({ + data: { asyncField: null }, + errors: [ + { + message: 'This operation was aborted', + locations: [{ line: 3, column: 9 }], + path: ['asyncField'], + }, + ], + }); + }); + + it('exits early on abort mid-nested resolver', async () => { + const syncObjectType = new GraphQLObjectType({ + name: 'SyncObject', + fields: { + asyncField: { + type: GraphQLString, + async resolve(_parent, _args, _context, _info, abortSignal) { + await resolveOnNextTick(); + abortSignal?.throwIfAborted(); + }, + }, + }, + }); + + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + syncObject: { + type: syncObjectType, + resolve() { + return {}; + }, + }, + }, + }), + }); + + const document = parse(` + { + syncObject { + asyncField + } + } + `); + + const abortController = new AbortController(); + + const result = execute({ + schema, + document, + abortSignal: abortController.signal, + }); + + abortController.abort(); + + expectJSON(await result).toDeepEqual({ + data: { syncObject: { asyncField: null } }, + errors: [ + { + message: 'This operation was aborted', + locations: [{ line: 4, column: 11 }], + path: ['syncObject', 'asyncField'], + }, + ], + }); + }); + + it('exits early on error', async () => { + const objectType = new GraphQLObjectType({ + name: 'Object', + fields: { + nonNullNestedAsyncField: { + type: new GraphQLNonNull(GraphQLString), + async resolve() { + await resolveOnNextTick(); + throw new Error('Oops'); + }, + }, + nestedAsyncField: { + type: GraphQLString, + async resolve(_parent, _args, _context, _info, abortSignal) { + await resolveOnNextTick(); + abortSignal?.throwIfAborted(); + }, + }, + }, + }); + + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + object: { + type: objectType, + resolve() { + return {}; + }, + }, + asyncField: { + type: GraphQLString, + async resolve() { + await resolveOnNextTick(); + return 'asyncValue'; + }, + }, + }, + }), + }); + + const document = parse(` + { + object { + nonNullNestedAsyncField + nestedAsyncField + } + asyncField + } + `); + + const abortController = new AbortController(); + + const result = execute({ + schema, + document, + abortSignal: abortController.signal, + }); + + abortController.abort(); + + expectJSON(await result).toDeepEqual({ + data: { + object: null, + asyncField: 'asyncValue', + }, + errors: [ + { + message: 'This operation was aborted', + locations: [{ line: 5, column: 11 }], + path: ['object', 'nestedAsyncField'], + }, + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['object', 'nonNullNestedAsyncField'], + }, + ], + }); + }); + it('Full response path is included for non-nullable fields', () => { const A: GraphQLObjectType = new GraphQLObjectType({ name: 'A', diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 194ed0d84b..f4756af8bc 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1160,6 +1160,45 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Handles nested errors thrown by completeValue after initialCount is reached for a non-nullable list', async () => { + const document = parse(` + query { + nonNullFriendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + nonNullFriendList: () => [ + { nonNullName: friends[0].name }, + { nonNullName: new Error('Oops') }, + ], + }); + expectJSON(result).toDeepEqual([ + { + data: { + nonNullFriendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: null, + path: ['nonNullFriendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['nonNullFriendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); it('Handles nested errors thrown by completeValue after initialCount is reached from async iterable', async () => { const document = parse(` query { @@ -1214,6 +1253,47 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Handles nested errors thrown by completeValue after initialCount is reached from async iterable for a non-nullable list', async () => { + const document = parse(` + query { + nonNullFriendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + async *nonNullFriendList() { + yield await Promise.resolve({ nonNullName: friends[0].name }); + yield await Promise.resolve({ + nonNullName: () => new Error('Oops'), + }); /* c8 ignore start */ + } /* c8 ignore stop */, + }); + expectJSON(result).toDeepEqual([ + { + data: { + nonNullFriendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: null, + path: ['nonNullFriendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['nonNullFriendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); it('Handles nested async errors thrown by completeValue after initialCount is reached', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 7a6f2ce27d..ad500e110b 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1,3 +1,4 @@ +import { addAbortListener } from '../jsutils/addAbortListener.js'; import { inspect } from '../jsutils/inspect.js'; import { invariant } from '../jsutils/invariant.js'; import { isAsyncIterable } from '../jsutils/isAsyncIterable.js'; @@ -132,6 +133,7 @@ export interface ExecutionContext { typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; incrementalPublisher: IncrementalPublisher; + abortSignal: AbortSignal | undefined; } /** @@ -201,6 +203,7 @@ export interface ExecutionArgs { fieldResolver?: Maybe>; typeResolver?: Maybe>; subscribeFieldResolver?: Maybe>; + abortSignal?: AbortSignal; } const UNEXPECTED_EXPERIMENTAL_DIRECTIVES = @@ -389,6 +392,7 @@ export function buildExecutionContext( fieldResolver, typeResolver, subscribeFieldResolver, + abortSignal, } = args; // If the schema used for execution is invalid, throw an error. @@ -453,6 +457,7 @@ export function buildExecutionContext( typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, incrementalPublisher: new IncrementalPublisher(), + abortSignal, }; } @@ -473,8 +478,14 @@ function executeOperation( exeContext: ExecutionContext, initialResultRecord: InitialResultRecord, ): PromiseOrValue> { - const { operation, schema, fragments, variableValues, rootValue } = - exeContext; + const { + operation, + schema, + fragments, + variableValues, + rootValue, + abortSignal, + } = exeContext; const rootType = schema.getRootType(operation.operation); if (rootType == null) { throw new GraphQLError( @@ -502,6 +513,7 @@ function executeOperation( path, groupedFieldSet, initialResultRecord, + abortSignal, ); break; case OperationTypeNode.MUTATION: @@ -512,6 +524,7 @@ function executeOperation( path, groupedFieldSet, initialResultRecord, + abortSignal, ); break; case OperationTypeNode.SUBSCRIPTION: @@ -524,6 +537,7 @@ function executeOperation( path, groupedFieldSet, initialResultRecord, + abortSignal, ); } @@ -535,6 +549,7 @@ function executeOperation( rootValue, patchGroupedFieldSet, initialResultRecord, + abortSignal, label, path, ); @@ -554,6 +569,7 @@ function executeFieldsSerially( path: Path | undefined, groupedFieldSet: GroupedFieldSet, incrementalDataRecord: InitialResultRecord, + abortSignal: AbortSignal | undefined, ): PromiseOrValue> { return promiseReduce( groupedFieldSet, @@ -566,6 +582,7 @@ function executeFieldsSerially( fieldGroup, fieldPath, incrementalDataRecord, + abortSignal, ); if (result === undefined) { return results; @@ -594,6 +611,7 @@ function executeFields( path: Path | undefined, groupedFieldSet: GroupedFieldSet, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal | undefined, ): PromiseOrValue> { const results = Object.create(null); let containsPromise = false; @@ -608,6 +626,7 @@ function executeFields( fieldGroup, fieldPath, incrementalDataRecord, + abortSignal, ); if (result !== undefined) { @@ -652,6 +671,7 @@ function executeField( fieldGroup: FieldGroup, path: Path, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal | undefined, ): PromiseOrValue { const fieldName = fieldGroup[0].name.value; const fieldDef = exeContext.schema.getField(parentType, fieldName); @@ -688,7 +708,11 @@ function executeField( // used to represent an authenticated user, or request-specific caches. const contextValue = exeContext.contextValue; - result = resolveFn(source, args, contextValue, info); + if (abortSignal?.aborted) { + abortSignal.throwIfAborted(); + } + + result = resolveFn(source, args, contextValue, info, abortSignal); if (isPromise(result)) { return completePromisedValue( @@ -699,6 +723,7 @@ function executeField( path, result, incrementalDataRecord, + abortSignal, ); } @@ -736,6 +761,13 @@ function executeField( return null; } + const abortController = new AbortController(); + let removeAbortListener: (() => void) | undefined; + if (abortSignal !== undefined) { + removeAbortListener = addAbortListener(abortSignal, () => + abortController.abort(), + ); + } let completed; try { completed = completeNonLeafValue( @@ -746,8 +778,11 @@ function executeField( path, result, incrementalDataRecord, + abortController.signal, ); } catch (rawError) { + removeAbortListener?.(); + abortController.abort(); handleFieldError( rawError, exeContext, @@ -763,19 +798,29 @@ function executeField( if (isPromise(completed)) { // Note: we don't rely on a `catch` method, but we do expect "thenable" // to take a second callback for the error case. - return completed.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - returnType, - fieldGroup, - path, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); - return null; - }); + return completed.then( + (resolved) => { + removeAbortListener?.(); + return resolved; + }, + (rawError) => { + removeAbortListener?.(); + abortController.abort(); + handleFieldError( + rawError, + exeContext, + returnType, + fieldGroup, + path, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); + return null; + }, + ); } + + removeAbortListener?.(); return completed; } @@ -848,6 +893,7 @@ function completeNonLeafValue( path: Path, result: unknown, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): PromiseOrValue { // If field type is List, complete each item in the list with the inner type if (isListType(nullableType)) { @@ -859,6 +905,7 @@ function completeNonLeafValue( path, result, incrementalDataRecord, + abortSignal, ); } @@ -873,6 +920,7 @@ function completeNonLeafValue( path, result, incrementalDataRecord, + abortSignal, ); } @@ -886,6 +934,7 @@ function completeNonLeafValue( path, result, incrementalDataRecord, + abortSignal, ); } /* c8 ignore next 6 */ @@ -904,6 +953,7 @@ async function completePromisedValue( path: Path, result: Promise, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal | undefined, ): Promise { let resolved; let nullableType: GraphQLNullableOutputType; @@ -944,6 +994,13 @@ async function completePromisedValue( return null; } + const abortController = new AbortController(); + let removeAbortListener: (() => void) | undefined; + if (abortSignal !== undefined) { + removeAbortListener = addAbortListener(abortSignal, () => + abortController.abort(), + ); + } try { let completed = completeNonLeafValue( exeContext, @@ -953,12 +1010,16 @@ async function completePromisedValue( path, resolved, incrementalDataRecord, + abortController.signal, ); if (isPromise(completed)) { completed = await completed; } + removeAbortListener?.(); return completed; } catch (rawError) { + removeAbortListener?.(); + abortController.abort(); handleFieldError( rawError, exeContext, @@ -1041,6 +1102,7 @@ async function completeAsyncIteratorValue( path: Path, asyncIterator: AsyncIterator, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): Promise> { const stream = getStreamValues(exeContext, fieldGroup, path); let containsPromise = false; @@ -1063,6 +1125,7 @@ async function completeAsyncIteratorValue( itemType, path, incrementalDataRecord, + abortSignal, stream.label, ); break; @@ -1090,6 +1153,7 @@ async function completeAsyncIteratorValue( info, itemPath, incrementalDataRecord, + abortSignal, ) ) { containsPromise = true; @@ -1111,6 +1175,7 @@ function completeListValue( path: Path, result: unknown, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): PromiseOrValue> { const itemType = returnType.ofType; @@ -1125,6 +1190,7 @@ function completeListValue( path, asyncIterator, incrementalDataRecord, + abortSignal, ); } @@ -1161,6 +1227,7 @@ function completeListValue( info, itemType, previousIncrementalDataRecord, + abortSignal, stream.label, ); index++; @@ -1177,6 +1244,7 @@ function completeListValue( info, itemPath, incrementalDataRecord, + abortSignal, ) ) { containsPromise = true; @@ -1202,6 +1270,7 @@ function completeListItemValue( info: GraphQLResolveInfo, itemPath: Path, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): boolean { if (isPromise(item)) { completedResults.push( @@ -1213,6 +1282,7 @@ function completeListItemValue( itemPath, item, incrementalDataRecord, + abortSignal, ), ); @@ -1258,6 +1328,10 @@ function completeListItemValue( return false; } + const abortController = new AbortController(); + const removeAbortListener = addAbortListener(abortSignal, () => + abortController.abort(), + ); let completedItem; try { completedItem = completeNonLeafValue( @@ -1268,8 +1342,11 @@ function completeListItemValue( itemPath, item, incrementalDataRecord, + abortController.signal, ); } catch (rawError) { + removeAbortListener(); + abortController.abort(); handleFieldError( rawError, exeContext, @@ -1287,23 +1364,35 @@ function completeListItemValue( // Note: we don't rely on a `catch` method, but we do expect "thenable" // to take a second callback for the error case. completedResults.push( - completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); - return null; - }), + completedItem.then( + (resolved) => { + removeAbortListener(); + return resolved; + }, + (rawError) => { + removeAbortListener(); + abortController.abort(); + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter( + itemPath, + incrementalDataRecord, + ); + return null; + }, + ), ); return true; } + removeAbortListener(); completedResults.push(completedItem); return false; @@ -1339,6 +1428,7 @@ function completeAbstractValue( path: Path, result: unknown, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): PromiseOrValue> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; @@ -1361,6 +1451,7 @@ function completeAbstractValue( path, result, incrementalDataRecord, + abortSignal, ), ); } @@ -1380,6 +1471,7 @@ function completeAbstractValue( path, result, incrementalDataRecord, + abortSignal, ); } @@ -1449,6 +1541,7 @@ function completeObjectValue( path: Path, result: unknown, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): PromiseOrValue> { // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather @@ -1468,6 +1561,7 @@ function completeObjectValue( path, result, incrementalDataRecord, + abortSignal, ); }); } @@ -1484,6 +1578,7 @@ function completeObjectValue( path, result, incrementalDataRecord, + abortSignal, ); } @@ -1505,6 +1600,7 @@ function collectAndExecuteSubfields( path: Path, result: unknown, incrementalDataRecord: IncrementalDataRecord, + abortSignal: AbortSignal, ): PromiseOrValue> { // Collect sub-fields to execute to complete this value. const { groupedFieldSet: subGroupedFieldSet, patches: subPatches } = @@ -1517,6 +1613,7 @@ function collectAndExecuteSubfields( path, subGroupedFieldSet, incrementalDataRecord, + abortSignal, ); for (const subPatch of subPatches) { @@ -1527,6 +1624,7 @@ function collectAndExecuteSubfields( result, subPatchGroupedFieldSet, incrementalDataRecord, + abortSignal, label, path, ); @@ -1737,8 +1835,14 @@ function createSourceEventStreamImpl( function executeSubscription( exeContext: ExecutionContext, ): PromiseOrValue> { - const { schema, fragments, operation, variableValues, rootValue } = - exeContext; + const { + schema, + fragments, + operation, + variableValues, + rootValue, + abortSignal, + } = exeContext; const rootType = schema.getSubscriptionType(); if (rootType == null) { @@ -1793,7 +1897,7 @@ function executeSubscription( // Call the `subscribe()` resolver or the default resolver to produce an // AsyncIterable yielding raw payloads. const resolveFn = fieldDef.subscribe ?? exeContext.subscribeFieldResolver; - const result = resolveFn(rootValue, args, contextValue, info); + const result = resolveFn(rootValue, args, contextValue, info, abortSignal); if (isPromise(result)) { return result.then(assertEventStream).then(undefined, (error) => { @@ -1829,6 +1933,7 @@ function executeDeferredFragment( sourceValue: unknown, fields: GroupedFieldSet, parentContext: IncrementalDataRecord, + abortSignal: AbortSignal | undefined, label?: string, path?: Path, ): void { @@ -1849,6 +1954,7 @@ function executeDeferredFragment( path, fields, incrementalDataRecord, + abortSignal, ); if (isPromise(promiseOrData)) { @@ -1890,6 +1996,7 @@ function executeStreamField( info: GraphQLResolveInfo, itemType: GraphQLOutputType, parentContext: IncrementalDataRecord, + abortSignal: AbortSignal, label?: string, ): SubsequentDataRecord { const incrementalPublisher = exeContext.incrementalPublisher; @@ -1909,6 +2016,7 @@ function executeStreamField( itemPath, item, incrementalDataRecord, + abortSignal, ).then( (value) => incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ @@ -1929,8 +2037,8 @@ function executeStreamField( } let completedItem: PromiseOrValue; + let nullableType: GraphQLNullableOutputType; try { - let nullableType: GraphQLNullableOutputType; try { if (item instanceof Error) { throw item; @@ -1975,7 +2083,18 @@ function executeStreamField( ]); return incrementalDataRecord; } + } catch (error) { + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, null); + return incrementalDataRecord; + } + const abortController = new AbortController(); + const removeAbortListener = addAbortListener(abortSignal, () => + abortController.abort(), + ); + try { try { completedItem = completeNonLeafValue( exeContext, @@ -1985,8 +2104,11 @@ function executeStreamField( itemPath, item, incrementalDataRecord, + abortController.signal, ); } catch (rawError) { + removeAbortListener(); + abortController.abort(); handleFieldError( rawError, exeContext, @@ -2010,18 +2132,29 @@ function executeStreamField( if (isPromise(completedItem)) { completedItem - .then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); - return null; - }) + .then( + (resolvedItem) => { + removeAbortListener(); + return resolvedItem; + }, + (rawError) => { + removeAbortListener(); + abortController.abort(); + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter( + itemPath, + incrementalDataRecord, + ); + return null; + }, + ) .then( (value) => incrementalPublisher.completeStreamItemsRecord( @@ -2041,6 +2174,7 @@ function executeStreamField( return incrementalDataRecord; } + removeAbortListener(); incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ completedItem, ]); @@ -2056,6 +2190,7 @@ async function executeStreamAsyncIteratorItem( incrementalDataRecord: StreamItemsRecord, path: Path, itemPath: Path, + abortSignal: AbortSignal, ): Promise> { let item; try { @@ -2108,6 +2243,10 @@ async function executeStreamAsyncIteratorItem( return { done: false, value: null }; } + const abortController = new AbortController(); + const removeAbortListener = addAbortListener(abortSignal, () => + abortController.abort(), + ); let completedItem; try { completedItem = completeNonLeafValue( @@ -2118,8 +2257,11 @@ async function executeStreamAsyncIteratorItem( itemPath, item, incrementalDataRecord, + abortController.signal, ); } catch (rawError) { + removeAbortListener(); + abortController.abort(); handleFieldError( rawError, exeContext, @@ -2133,19 +2275,28 @@ async function executeStreamAsyncIteratorItem( } if (isPromise(completedItem)) { - completedItem = completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); - return null; - }); + completedItem = completedItem.then( + (resolvedItem) => { + removeAbortListener(); + return resolvedItem; + }, + (rawError) => { + removeAbortListener(); + abortController.abort(); + handleFieldError( + rawError, + exeContext, + itemType, + fieldGroup, + itemPath, + incrementalDataRecord, + ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + return null; + }, + ); } + removeAbortListener(); return { done: false, value: completedItem }; } @@ -2158,6 +2309,7 @@ async function executeStreamAsyncIterator( itemType: GraphQLOutputType, path: Path, parentContext: IncrementalDataRecord, + abortSignal: AbortSignal, label?: string, ): Promise { const incrementalPublisher = exeContext.incrementalPublisher; @@ -2186,6 +2338,7 @@ async function executeStreamAsyncIterator( incrementalDataRecord, path, itemPath, + abortSignal, ); } catch (error) { incrementalPublisher.addFieldError(incrementalDataRecord, error); diff --git a/src/jsutils/addAbortListener.ts b/src/jsutils/addAbortListener.ts new file mode 100644 index 0000000000..cdada90531 --- /dev/null +++ b/src/jsutils/addAbortListener.ts @@ -0,0 +1,64 @@ +type Callback = () => void; +interface AbortInfo { + listeners: Set; + dispose: Callback; +} +type Cache = WeakMap; + +let maybeCache: Cache | undefined; + +/** + * Helper function to add a callback to be triggered when the abort signal fires. + * Returns a function that will remove the callback when called. + * + * This helper function also avoids hitting the max listener limit on AbortSignals, + * which could be a common occurrence when setting up multiple contingent + * abort signals. + */ +export function addAbortListener( + abortSignal: AbortSignal, + callback: Callback, +): Callback { + if (abortSignal.aborted) { + callback(); + return () => { + /* noop */ + }; + } + + const cache = (maybeCache ??= new WeakMap()); + + const abortInfo = cache.get(abortSignal); + + if (abortInfo !== undefined) { + abortInfo.listeners.add(callback); + return () => removeAbortListener(abortInfo, callback); + } + + const listeners = new Set([callback]); + const onAbort = () => triggerCallbacks(listeners); + const dispose = () => { + abortSignal.removeEventListener('abort', onAbort); + }; + const newAbortInfo = { listeners, dispose }; + cache.set(abortSignal, newAbortInfo); + abortSignal.addEventListener('abort', onAbort); + + return () => removeAbortListener(newAbortInfo, callback); +} + +function triggerCallbacks(listeners: Set): void { + for (const listener of listeners) { + listener(); + } +} + +function removeAbortListener(abortInfo: AbortInfo, callback: Callback): void { + const listeners = abortInfo.listeners; + + listeners.delete(callback); + + if (listeners.size === 0) { + abortInfo.dispose(); + } +} diff --git a/src/type/definition.ts b/src/type/definition.ts index 0ca4152bd2..140d891bf3 100644 --- a/src/type/definition.ts +++ b/src/type/definition.ts @@ -883,6 +883,7 @@ export type GraphQLFieldResolver< args: TArgs, context: TContext, info: GraphQLResolveInfo, + abortSignal: AbortSignal | undefined, ) => TResult; export interface GraphQLResolveInfo { From bfba92c9cc35a79ed1015d9a85b25203c56ab74e Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 14 Aug 2023 19:33:40 +0300 Subject: [PATCH 5/5] fix version number for TS v5 --- integrationTests/ts/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrationTests/ts/package.json b/integrationTests/ts/package.json index e003b253fd..f2bea673d8 100644 --- a/integrationTests/ts/package.json +++ b/integrationTests/ts/package.json @@ -14,6 +14,6 @@ "typescript-4.7": "npm:typescript@4.7.x", "typescript-4.8": "npm:typescript@4.8.x", "typescript-4.9": "npm:typescript@4.9.x", - "typescript-4.9": "npm:typescript@5.0.x" + "typescript-5.0": "npm:typescript@5.0.x" } }