Skip to content

Commit c785f21

Browse files
committed
fix(incremental): abort async iteration when stream errors
Currently: When list item completion fails for a stream with a non-nullable list: 1. the entire list must be nulled within the given AsyncPayloadRecord 2. any other pending AsyncPayloadRecords must be filtered 3. async iteration powering the stream must be cancelled Currently, the third objective is accomplished by way of the second; during AsyncPayloadRecord filtering, if a stream record is filtered and has an associated asyncIterator, its return() method is called, which _should_ end the stream. This can go wrong in a few ways: A: The return() method may not exist; by specification, the return() method exists for the caller to notify the callee that the caller no longer intends to call next(), allowing for early cleanup. The method is optional, however, and so should not be relied on. B: The return method, even if it exists, may not be set up to block any next() calls while it operates. Async generators have next and return methods that always settle in call order, but async iterables do not. This PR adds tests addressing these scenarios and fixes the test failures. Tests are made to pass by adding independent handling for the third objective using a context object global to the stream that can be used to check/set the errored status.
1 parent 7a609a2 commit c785f21

File tree

2 files changed

+164
-3
lines changed

2 files changed

+164
-3
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { assert, expect } from 'chai';
22
import { describe, it } from 'mocha';
33

44
import { expectJSON } from '../../__testUtils__/expectJSON.js';
5+
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
56

67
import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js';
78

@@ -1134,7 +1135,7 @@ describe('Execute: stream directive', () => {
11341135
},
11351136
]);
11361137
});
1137-
it('Handles async errors thrown by completeValue after initialCount is reached from async iterable for a non-nullable list', async () => {
1138+
it('Handles async errors thrown by completeValue after initialCount is reached from async generator for a non-nullable list', async () => {
11381139
const document = parse(`
11391140
query {
11401141
nonNullFriendList @stream(initialCount: 1) {
@@ -1174,9 +1175,152 @@ describe('Execute: stream directive', () => {
11741175
],
11751176
},
11761177
],
1178+
hasNext: false,
1179+
},
1180+
]);
1181+
});
1182+
it('Handles async errors thrown by completeValue after initialCount is reached from async iterable for a non-nullable list when the async iterable does not provide a return method) ', async () => {
1183+
const document = parse(`
1184+
query {
1185+
nonNullFriendList @stream(initialCount: 1) {
1186+
nonNullName
1187+
}
1188+
}
1189+
`);
1190+
let count = 0;
1191+
const result = await complete(document, {
1192+
nonNullFriendList: {
1193+
[Symbol.asyncIterator]: () => ({
1194+
next: async () => {
1195+
switch (count++) {
1196+
case 0:
1197+
return Promise.resolve({
1198+
done: false,
1199+
value: { nonNullName: friends[0].name },
1200+
});
1201+
case 1:
1202+
return Promise.resolve({
1203+
done: false,
1204+
value: {
1205+
nonNullName: () => Promise.reject(new Error('Oops')),
1206+
},
1207+
});
1208+
case 2:
1209+
return Promise.resolve({
1210+
done: false,
1211+
value: { nonNullName: friends[1].name },
1212+
});
1213+
// Not reached
1214+
/* c8 ignore next 5 */
1215+
case 3:
1216+
return Promise.resolve({
1217+
done: false,
1218+
value: { nonNullName: friends[2].name },
1219+
});
1220+
}
1221+
},
1222+
}),
1223+
},
1224+
});
1225+
expectJSON(result).toDeepEqual([
1226+
{
1227+
data: {
1228+
nonNullFriendList: [{ nonNullName: 'Luke' }],
1229+
},
11771230
hasNext: true,
11781231
},
11791232
{
1233+
incremental: [
1234+
{
1235+
items: null,
1236+
path: ['nonNullFriendList', 1],
1237+
errors: [
1238+
{
1239+
message: 'Oops',
1240+
locations: [{ line: 4, column: 11 }],
1241+
path: ['nonNullFriendList', 1, 'nonNullName'],
1242+
},
1243+
],
1244+
},
1245+
],
1246+
hasNext: false,
1247+
},
1248+
]);
1249+
});
1250+
it('Handles async errors thrown by completeValue after initialCount is reached from async iterable for a non-nullable list when the async iterable provides concurrent next/return methods and has a slow return ', async () => {
1251+
const document = parse(`
1252+
query {
1253+
nonNullFriendList @stream(initialCount: 1) {
1254+
nonNullName
1255+
}
1256+
}
1257+
`);
1258+
let count = 0;
1259+
let returned = false;
1260+
const result = await complete(document, {
1261+
nonNullFriendList: {
1262+
[Symbol.asyncIterator]: () => ({
1263+
next: async () => {
1264+
/* c8 ignore next 3 */
1265+
if (returned) {
1266+
return Promise.resolve({ done: true });
1267+
}
1268+
switch (count++) {
1269+
case 0:
1270+
return Promise.resolve({
1271+
done: false,
1272+
value: { nonNullName: friends[0].name },
1273+
});
1274+
case 1:
1275+
return Promise.resolve({
1276+
done: false,
1277+
value: {
1278+
nonNullName: () => Promise.reject(new Error('Oops')),
1279+
},
1280+
});
1281+
case 2:
1282+
return Promise.resolve({
1283+
done: false,
1284+
value: { nonNullName: friends[1].name },
1285+
});
1286+
// Not reached
1287+
/* c8 ignore next 5 */
1288+
case 3:
1289+
return Promise.resolve({
1290+
done: false,
1291+
value: { nonNullName: friends[2].name },
1292+
});
1293+
}
1294+
},
1295+
return: async () => {
1296+
await resolveOnNextTick();
1297+
returned = true;
1298+
return { done: true };
1299+
},
1300+
}),
1301+
},
1302+
});
1303+
expectJSON(result).toDeepEqual([
1304+
{
1305+
data: {
1306+
nonNullFriendList: [{ nonNullName: 'Luke' }],
1307+
},
1308+
hasNext: true,
1309+
},
1310+
{
1311+
incremental: [
1312+
{
1313+
items: null,
1314+
path: ['nonNullFriendList', 1],
1315+
errors: [
1316+
{
1317+
message: 'Oops',
1318+
locations: [{ line: 4, column: 11 }],
1319+
path: ['nonNullFriendList', 1, 'nonNullName'],
1320+
},
1321+
],
1322+
},
1323+
],
11801324
hasNext: false,
11811325
},
11821326
]);

src/execution/execute.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,7 @@ async function completeAsyncIteratorValue(
10361036
typeof stream.initialCount === 'number' &&
10371037
index >= stream.initialCount
10381038
) {
1039+
const streamContext = { errored: false };
10391040
// eslint-disable-next-line @typescript-eslint/no-floating-promises
10401041
executeStreamIterator(
10411042
index,
@@ -1045,6 +1046,7 @@ async function completeAsyncIteratorValue(
10451046
info,
10461047
itemType,
10471048
path,
1049+
streamContext,
10481050
stream.label,
10491051
asyncPayloadRecord,
10501052
);
@@ -1139,6 +1141,7 @@ function completeListValue(
11391141
typeof stream.initialCount === 'number' &&
11401142
index >= stream.initialCount
11411143
) {
1144+
const streamContext = { errored: false };
11421145
previousAsyncPayloadRecord = executeStreamField(
11431146
path,
11441147
itemPath,
@@ -1147,6 +1150,7 @@ function completeListValue(
11471150
fieldNodes,
11481151
info,
11491152
itemType,
1153+
streamContext,
11501154
stream.label,
11511155
previousAsyncPayloadRecord,
11521156
);
@@ -1813,12 +1817,14 @@ function executeStreamField(
18131817
fieldNodes: ReadonlyArray<FieldNode>,
18141818
info: GraphQLResolveInfo,
18151819
itemType: GraphQLOutputType,
1820+
streamContext: StreamContext,
18161821
label?: string,
18171822
parentContext?: AsyncPayloadRecord,
18181823
): AsyncPayloadRecord {
18191824
const asyncPayloadRecord = new StreamRecord({
18201825
label,
18211826
path: itemPath,
1827+
streamContext,
18221828
parentContext,
18231829
exeContext,
18241830
});
@@ -1835,6 +1841,7 @@ function executeStreamField(
18351841
(value) => [value],
18361842
(error) => {
18371843
asyncPayloadRecord.errors.push(error);
1844+
streamContext.errored = true;
18381845
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
18391846
return null;
18401847
},
@@ -1867,6 +1874,7 @@ function executeStreamField(
18671874
}
18681875
} catch (error) {
18691876
asyncPayloadRecord.errors.push(error);
1877+
streamContext.errored = true;
18701878
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
18711879
asyncPayloadRecord.addItems(null);
18721880
return asyncPayloadRecord;
@@ -1888,6 +1896,7 @@ function executeStreamField(
18881896
(value) => [value],
18891897
(error) => {
18901898
asyncPayloadRecord.errors.push(error);
1899+
streamContext.errored = true;
18911900
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
18921901
return null;
18931902
},
@@ -1965,19 +1974,20 @@ async function executeStreamIterator(
19651974
info: GraphQLResolveInfo,
19661975
itemType: GraphQLOutputType,
19671976
path: Path,
1977+
streamContext: StreamContext,
19681978
label?: string,
19691979
parentContext?: AsyncPayloadRecord,
19701980
): Promise<void> {
19711981
let index = initialIndex;
19721982
let previousAsyncPayloadRecord = parentContext ?? undefined;
1973-
// eslint-disable-next-line no-constant-condition
1974-
while (true) {
1983+
while (!streamContext.errored) {
19751984
const itemPath = addPath(path, index, undefined);
19761985
const asyncPayloadRecord = new StreamRecord({
19771986
label,
19781987
path: itemPath,
19791988
parentContext: previousAsyncPayloadRecord,
19801989
iterator,
1990+
streamContext,
19811991
exeContext,
19821992
});
19831993

@@ -2014,6 +2024,7 @@ async function executeStreamIterator(
20142024
(value) => [value],
20152025
(error) => {
20162026
asyncPayloadRecord.errors.push(error);
2027+
streamContext.errored = true;
20172028
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
20182029
return null;
20192030
},
@@ -2211,6 +2222,9 @@ class DeferredFragmentRecord {
22112222
this._resolve?.(data);
22122223
}
22132224
}
2225+
interface StreamContext {
2226+
errored: boolean;
2227+
}
22142228

22152229
class StreamRecord {
22162230
type: 'stream';
@@ -2221,6 +2235,7 @@ class StreamRecord {
22212235
promise: Promise<void>;
22222236
parentContext: AsyncPayloadRecord | undefined;
22232237
iterator: AsyncIterator<unknown> | undefined;
2238+
streamContext: StreamContext;
22242239
isCompletedIterator?: boolean;
22252240
isCompleted: boolean;
22262241
_exeContext: ExecutionContext;
@@ -2229,6 +2244,7 @@ class StreamRecord {
22292244
label: string | undefined;
22302245
path: Path | undefined;
22312246
iterator?: AsyncIterator<unknown>;
2247+
streamContext: StreamContext;
22322248
parentContext: AsyncPayloadRecord | undefined;
22332249
exeContext: ExecutionContext;
22342250
}) {
@@ -2238,6 +2254,7 @@ class StreamRecord {
22382254
this.path = pathToArray(opts.path);
22392255
this.parentContext = opts.parentContext;
22402256
this.iterator = opts.iterator;
2257+
this.streamContext = opts.streamContext;
22412258
this.errors = [];
22422259
this._exeContext = opts.exeContext;
22432260
this._exeContext.subsequentPayloads.add(this);

0 commit comments

Comments
 (0)