@@ -122,6 +122,7 @@ export interface ExecutionContext {
122
122
subscribeFieldResolver : GraphQLFieldResolver < any , any > ;
123
123
errors : Array < GraphQLError > ;
124
124
subsequentPayloads : Set < AsyncPayloadRecord > ;
125
+ streams : Set < StreamContext > ;
125
126
}
126
127
127
128
/**
@@ -504,6 +505,7 @@ export function buildExecutionContext(
504
505
typeResolver : typeResolver ?? defaultTypeResolver ,
505
506
subscribeFieldResolver : subscribeFieldResolver ?? defaultFieldResolver ,
506
507
subsequentPayloads : new Set ( ) ,
508
+ streams : new Set ( ) ,
507
509
errors : [ ] ,
508
510
} ;
509
511
}
@@ -516,6 +518,7 @@ function buildPerEventExecutionContext(
516
518
...exeContext ,
517
519
rootValue : payload ,
518
520
subsequentPayloads : new Set ( ) ,
521
+ streams : new Set ( ) ,
519
522
errors : [ ] ,
520
523
} ;
521
524
}
@@ -1036,6 +1039,8 @@ async function completeAsyncIteratorValue(
1036
1039
typeof stream . initialCount === 'number' &&
1037
1040
index >= stream . initialCount
1038
1041
) {
1042
+ const streamContext : StreamContext = { path : pathToArray ( path ) } ;
1043
+ exeContext . streams . add ( streamContext ) ;
1039
1044
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1040
1045
executeStreamIterator (
1041
1046
index ,
@@ -1045,6 +1050,7 @@ async function completeAsyncIteratorValue(
1045
1050
info ,
1046
1051
itemType ,
1047
1052
path ,
1053
+ streamContext ,
1048
1054
stream . label ,
1049
1055
asyncPayloadRecord ,
1050
1056
) ;
@@ -1129,6 +1135,7 @@ function completeListValue(
1129
1135
let previousAsyncPayloadRecord = asyncPayloadRecord ;
1130
1136
const completedResults : Array < unknown > = [ ] ;
1131
1137
let index = 0 ;
1138
+ let streamContext : StreamContext | undefined ;
1132
1139
for ( const item of result ) {
1133
1140
// No need to modify the info object containing the path,
1134
1141
// since from here on it is not ever accessed by resolver functions.
@@ -1139,6 +1146,8 @@ function completeListValue(
1139
1146
typeof stream . initialCount === 'number' &&
1140
1147
index >= stream . initialCount
1141
1148
) {
1149
+ streamContext = { path : pathToArray ( path ) } ;
1150
+ exeContext . streams . add ( streamContext ) ;
1142
1151
previousAsyncPayloadRecord = executeStreamField (
1143
1152
path ,
1144
1153
itemPath ,
@@ -1147,6 +1156,7 @@ function completeListValue(
1147
1156
fieldNodes ,
1148
1157
info ,
1149
1158
itemType ,
1159
+ streamContext ,
1150
1160
stream . label ,
1151
1161
previousAsyncPayloadRecord ,
1152
1162
) ;
@@ -1173,6 +1183,10 @@ function completeListValue(
1173
1183
index ++ ;
1174
1184
}
1175
1185
1186
+ if ( streamContext ) {
1187
+ exeContext . streams . delete ( streamContext ) ;
1188
+ }
1189
+
1176
1190
return containsPromise ? Promise . all ( completedResults ) : completedResults ;
1177
1191
}
1178
1192
@@ -1813,12 +1827,14 @@ function executeStreamField(
1813
1827
fieldNodes : ReadonlyArray < FieldNode > ,
1814
1828
info : GraphQLResolveInfo ,
1815
1829
itemType : GraphQLOutputType ,
1830
+ streamContext : StreamContext ,
1816
1831
label ?: string ,
1817
1832
parentContext ?: AsyncPayloadRecord ,
1818
1833
) : AsyncPayloadRecord {
1819
1834
const asyncPayloadRecord = new StreamRecord ( {
1820
1835
label,
1821
1836
path : itemPath ,
1837
+ streamContext,
1822
1838
parentContext,
1823
1839
exeContext,
1824
1840
} ) ;
@@ -1965,19 +1981,20 @@ async function executeStreamIterator(
1965
1981
info : GraphQLResolveInfo ,
1966
1982
itemType : GraphQLOutputType ,
1967
1983
path : Path ,
1984
+ streamContext : StreamContext ,
1968
1985
label ?: string ,
1969
1986
parentContext ?: AsyncPayloadRecord ,
1970
1987
) : Promise < void > {
1971
1988
let index = initialIndex ;
1972
1989
let previousAsyncPayloadRecord = parentContext ?? undefined ;
1973
- // eslint-disable-next-line no-constant-condition
1974
- while ( true ) {
1990
+ while ( exeContext . streams . has ( streamContext ) ) {
1975
1991
const itemPath = addPath ( path , index , undefined ) ;
1976
1992
const asyncPayloadRecord = new StreamRecord ( {
1977
1993
label,
1978
1994
path : itemPath ,
1979
1995
parentContext : previousAsyncPayloadRecord ,
1980
1996
iterator,
1997
+ streamContext,
1981
1998
exeContext,
1982
1999
} ) ;
1983
2000
@@ -2003,6 +2020,7 @@ async function executeStreamIterator(
2003
2020
// ignore errors
2004
2021
} ) ;
2005
2022
}
2023
+ exeContext . streams . delete ( streamContext ) ;
2006
2024
return ;
2007
2025
}
2008
2026
@@ -2025,6 +2043,7 @@ async function executeStreamIterator(
2025
2043
asyncPayloadRecord . addItems ( completedItems ) ;
2026
2044
2027
2045
if ( done ) {
2046
+ exeContext . streams . delete ( streamContext ) ;
2028
2047
break ;
2029
2048
}
2030
2049
previousAsyncPayloadRecord = asyncPayloadRecord ;
@@ -2038,6 +2057,15 @@ function filterSubsequentPayloads(
2038
2057
currentAsyncRecord : AsyncPayloadRecord | undefined ,
2039
2058
) : void {
2040
2059
const nullPathArray = pathToArray ( nullPath ) ;
2060
+ exeContext . streams . forEach ( ( stream ) => {
2061
+ for ( let i = 0 ; i < nullPathArray . length ; i ++ ) {
2062
+ if ( stream . path [ i ] !== nullPathArray [ i ] ) {
2063
+ // stream points to a path unaffected by this payload
2064
+ return ;
2065
+ }
2066
+ }
2067
+ exeContext . streams . delete ( stream ) ;
2068
+ } ) ;
2041
2069
exeContext . subsequentPayloads . forEach ( ( asyncRecord ) => {
2042
2070
if ( asyncRecord === currentAsyncRecord ) {
2043
2071
// don't remove payload from where error originates
@@ -2055,7 +2083,7 @@ function filterSubsequentPayloads(
2055
2083
// ignore error
2056
2084
} ) ;
2057
2085
}
2058
- exeContext . subsequentPayloads . delete ( asyncRecord ) ;
2086
+ exeContext . subsequentPayloads . delete ( asyncRecord ) ;
2059
2087
} ) ;
2060
2088
}
2061
2089
@@ -2211,6 +2239,9 @@ class DeferredFragmentRecord {
2211
2239
this . _resolve ?.( data ) ;
2212
2240
}
2213
2241
}
2242
+ interface StreamContext {
2243
+ path : Array < string | number > ;
2244
+ }
2214
2245
2215
2246
class StreamRecord {
2216
2247
type : 'stream' ;
@@ -2221,6 +2252,7 @@ class StreamRecord {
2221
2252
promise : Promise < void > ;
2222
2253
parentContext : AsyncPayloadRecord | undefined ;
2223
2254
iterator : AsyncIterator < unknown > | undefined ;
2255
+ streamContext : StreamContext ;
2224
2256
isCompletedIterator ?: boolean ;
2225
2257
isCompleted : boolean ;
2226
2258
_exeContext : ExecutionContext ;
@@ -2229,6 +2261,7 @@ class StreamRecord {
2229
2261
label : string | undefined ;
2230
2262
path : Path | undefined ;
2231
2263
iterator ?: AsyncIterator < unknown > ;
2264
+ streamContext : StreamContext ;
2232
2265
parentContext : AsyncPayloadRecord | undefined ;
2233
2266
exeContext : ExecutionContext ;
2234
2267
} ) {
@@ -2238,6 +2271,7 @@ class StreamRecord {
2238
2271
this . path = pathToArray ( opts . path ) ;
2239
2272
this . parentContext = opts . parentContext ;
2240
2273
this . iterator = opts . iterator ;
2274
+ this . streamContext = opts . streamContext ;
2241
2275
this . errors = [ ] ;
2242
2276
this . _exeContext = opts . exeContext ;
2243
2277
this . _exeContext . subsequentPayloads . add ( this ) ;
0 commit comments