Skip to content

Commit deb73ea

Browse files
committed
fixes test failures
1 parent 6c7c3a4 commit deb73ea

File tree

2 files changed

+47
-22
lines changed

2 files changed

+47
-22
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,25 +1344,19 @@ describe('Execute: stream directive', () => {
13441344
} /* c8 ignore stop */,
13451345
},
13461346
});
1347-
expectJSON(result).toDeepEqual([
1348-
{
1349-
errors: [
1350-
{
1351-
message:
1352-
'Cannot return null for non-nullable field NestedObject.nonNullScalarField.',
1353-
locations: [{ line: 4, column: 11 }],
1354-
path: ['nestedObject', 'nonNullScalarField'],
1355-
},
1356-
],
1357-
data: {
1358-
nestedObject: null,
1347+
expectJSON(result).toDeepEqual({
1348+
errors: [
1349+
{
1350+
message:
1351+
'Cannot return null for non-nullable field NestedObject.nonNullScalarField.',
1352+
locations: [{ line: 4, column: 11 }],
1353+
path: ['nestedObject', 'nonNullScalarField'],
13591354
},
1360-
hasNext: true,
1361-
},
1362-
{
1363-
hasNext: false,
1355+
],
1356+
data: {
1357+
nestedObject: null,
13641358
},
1365-
]);
1359+
});
13661360
});
13671361
it('Filters payloads that are nulled by a later synchronous error', async () => {
13681362
const document = parse(`
@@ -1503,9 +1497,6 @@ describe('Execute: stream directive', () => {
15031497
],
15041498
},
15051499
],
1506-
hasNext: true,
1507-
},
1508-
{
15091500
hasNext: false,
15101501
},
15111502
]);

src/execution/execute.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ export interface ExecutionContext {
122122
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
123123
errors: Array<GraphQLError>;
124124
subsequentPayloads: Set<AsyncPayloadRecord>;
125+
streams: Set<StreamContext>;
125126
}
126127

127128
/**
@@ -504,6 +505,7 @@ export function buildExecutionContext(
504505
typeResolver: typeResolver ?? defaultTypeResolver,
505506
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
506507
subsequentPayloads: new Set(),
508+
streams: new Set(),
507509
errors: [],
508510
};
509511
}
@@ -516,6 +518,7 @@ function buildPerEventExecutionContext(
516518
...exeContext,
517519
rootValue: payload,
518520
subsequentPayloads: new Set(),
521+
streams: new Set(),
519522
errors: [],
520523
};
521524
}
@@ -1036,6 +1039,8 @@ async function completeAsyncIteratorValue(
10361039
typeof stream.initialCount === 'number' &&
10371040
index >= stream.initialCount
10381041
) {
1042+
const streamContext: StreamContext = { path: pathToArray(path) };
1043+
exeContext.streams.add(streamContext);
10391044
// eslint-disable-next-line @typescript-eslint/no-floating-promises
10401045
executeStreamIterator(
10411046
index,
@@ -1045,6 +1050,7 @@ async function completeAsyncIteratorValue(
10451050
info,
10461051
itemType,
10471052
path,
1053+
streamContext,
10481054
stream.label,
10491055
asyncPayloadRecord,
10501056
);
@@ -1129,6 +1135,7 @@ function completeListValue(
11291135
let previousAsyncPayloadRecord = asyncPayloadRecord;
11301136
const completedResults: Array<unknown> = [];
11311137
let index = 0;
1138+
let streamContext: StreamContext | undefined;
11321139
for (const item of result) {
11331140
// No need to modify the info object containing the path,
11341141
// since from here on it is not ever accessed by resolver functions.
@@ -1139,6 +1146,8 @@ function completeListValue(
11391146
typeof stream.initialCount === 'number' &&
11401147
index >= stream.initialCount
11411148
) {
1149+
streamContext = { path: pathToArray(path) };
1150+
exeContext.streams.add(streamContext);
11421151
previousAsyncPayloadRecord = executeStreamField(
11431152
path,
11441153
itemPath,
@@ -1147,6 +1156,7 @@ function completeListValue(
11471156
fieldNodes,
11481157
info,
11491158
itemType,
1159+
streamContext,
11501160
stream.label,
11511161
previousAsyncPayloadRecord,
11521162
);
@@ -1173,6 +1183,10 @@ function completeListValue(
11731183
index++;
11741184
}
11751185

1186+
if (streamContext) {
1187+
exeContext.streams.delete(streamContext);
1188+
}
1189+
11761190
return containsPromise ? Promise.all(completedResults) : completedResults;
11771191
}
11781192

@@ -1813,12 +1827,14 @@ function executeStreamField(
18131827
fieldNodes: ReadonlyArray<FieldNode>,
18141828
info: GraphQLResolveInfo,
18151829
itemType: GraphQLOutputType,
1830+
streamContext: StreamContext,
18161831
label?: string,
18171832
parentContext?: AsyncPayloadRecord,
18181833
): AsyncPayloadRecord {
18191834
const asyncPayloadRecord = new StreamRecord({
18201835
label,
18211836
path: itemPath,
1837+
streamContext,
18221838
parentContext,
18231839
exeContext,
18241840
});
@@ -1965,19 +1981,20 @@ async function executeStreamIterator(
19651981
info: GraphQLResolveInfo,
19661982
itemType: GraphQLOutputType,
19671983
path: Path,
1984+
streamContext: StreamContext,
19681985
label?: string,
19691986
parentContext?: AsyncPayloadRecord,
19701987
): Promise<void> {
19711988
let index = initialIndex;
19721989
let previousAsyncPayloadRecord = parentContext ?? undefined;
1973-
// eslint-disable-next-line no-constant-condition
1974-
while (true) {
1990+
while (exeContext.streams.has(streamContext)) {
19751991
const itemPath = addPath(path, index, undefined);
19761992
const asyncPayloadRecord = new StreamRecord({
19771993
label,
19781994
path: itemPath,
19791995
parentContext: previousAsyncPayloadRecord,
19801996
iterator,
1997+
streamContext,
19811998
exeContext,
19821999
});
19832000

@@ -2003,6 +2020,7 @@ async function executeStreamIterator(
20032020
// ignore errors
20042021
});
20052022
}
2023+
exeContext.streams.delete(streamContext);
20062024
return;
20072025
}
20082026

@@ -2025,6 +2043,7 @@ async function executeStreamIterator(
20252043
asyncPayloadRecord.addItems(completedItems);
20262044

20272045
if (done) {
2046+
exeContext.streams.delete(streamContext);
20282047
break;
20292048
}
20302049
previousAsyncPayloadRecord = asyncPayloadRecord;
@@ -2038,6 +2057,15 @@ function filterSubsequentPayloads(
20382057
currentAsyncRecord: AsyncPayloadRecord | undefined,
20392058
): void {
20402059
const nullPathArray = pathToArray(nullPath);
2060+
exeContext.streams.forEach((stream) => {
2061+
for (let i = 0; i < nullPathArray.length; i++) {
2062+
if (stream.path[i] !== nullPathArray[i]) {
2063+
// stream points to a path unaffected by this payload
2064+
return;
2065+
}
2066+
}
2067+
exeContext.streams.delete(stream);
2068+
});
20412069
exeContext.subsequentPayloads.forEach((asyncRecord) => {
20422070
if (asyncRecord === currentAsyncRecord) {
20432071
// don't remove payload from where error originates
@@ -2211,6 +2239,9 @@ class DeferredFragmentRecord {
22112239
this._resolve?.(data);
22122240
}
22132241
}
2242+
interface StreamContext {
2243+
path: Array<string | number>;
2244+
}
22142245

22152246
class StreamRecord {
22162247
type: 'stream';
@@ -2221,6 +2252,7 @@ class StreamRecord {
22212252
promise: Promise<void>;
22222253
parentContext: AsyncPayloadRecord | undefined;
22232254
iterator: AsyncIterator<unknown> | undefined;
2255+
streamContext: StreamContext;
22242256
isCompletedIterator?: boolean;
22252257
isCompleted: boolean;
22262258
_exeContext: ExecutionContext;
@@ -2229,6 +2261,7 @@ class StreamRecord {
22292261
label: string | undefined;
22302262
path: Path | undefined;
22312263
iterator?: AsyncIterator<unknown>;
2264+
streamContext: StreamContext;
22322265
parentContext: AsyncPayloadRecord | undefined;
22332266
exeContext: ExecutionContext;
22342267
}) {
@@ -2238,6 +2271,7 @@ class StreamRecord {
22382271
this.path = pathToArray(opts.path);
22392272
this.parentContext = opts.parentContext;
22402273
this.iterator = opts.iterator;
2274+
this.streamContext = opts.streamContext;
22412275
this.errors = [];
22422276
this._exeContext = opts.exeContext;
22432277
this._exeContext.subsequentPayloads.add(this);

0 commit comments

Comments
 (0)