@@ -139,56 +139,175 @@ ExecuteQuery(query, schema, variableValues, initialValue):
139
139
- If {defers} is an empty map and {streams} is an empty list:
140
140
- Return an unordered map containing {data} and {errors}.
141
141
- Else:
142
- - Return {IncrementalEventStream(data, errors, defers, streams,
142
+ - Let {responseStream} be a new event stream {responseStream}.
143
+ - Call {IncrementalEventStream(responseStream, data, errors, defers, streams,
143
144
variableValues)}.
145
+ - Return {responseStream}.
144
146
145
- IncrementalEventStream(data, errors, defers, streams, variableValues):
147
+ IncrementalEventStream(responseStream, data, errors, initialDefers,
148
+ initialStreams, variableValues):
146
149
147
- - Let {responseStream} be a new event stream {responseStream}.
150
+ - Let {nextId} be {0}.
151
+ - Let {remainingDefers} be an empty set.
152
+ - Let {remainingStreams} be an empty set.
148
153
- Let {hasNext} be {true}.
149
- - Yield to {responseStream} an unordered map containing {data}, {errors}, and
150
- {hasNext}.
151
- - Let {remainingPendings} be an empty list.
152
- - Let {nextPayloads} be an empty list.
153
- - Let {nextPendings} be an empty list.
154
- - Let {nextCompleteds} be an empty list.
155
- - In parallel, for each new entry {pending} added to {remainingPendings}:
156
- - Let {id} be the unique identifier for {pending}.
157
- - Let {path} be the root path for {pending}.
158
- - Let {nextPending} be an unordered map containing {id}, {path}.
159
- - Add {nextPending} to {nextPendings}.
160
- - Let {moreIncrementalPayloads}, {morePendings} be the result of running
161
- {ExecutePending(pending, variableValues)}.
162
- - For each entry {payload} in {moreIncrementalPayloads}:
163
- - Add {payload} to {nextPayloads}.
164
- - For each {newPending} in {morePendings}:
165
- - Add {newPending} to {remainingPendings}.
166
- - Let {nextCompleted} be an unordered map containing {id}.
167
- - Add {nextCompleted} to {nextCompleteds}.
168
- - Remove {pending} from {remainingPendings}.
169
- - Call {FlushIncremental(responseStream, nextPayloads, nextPendings,
170
- nextCompleteds)}, and reset {nextPayloads}, {nextPendings}, {nextCompleteds}
171
- to empty lists.
172
- - TODO: allow this to be batched more - we can group the results from
173
- multiple {pending} executions into a single incremental response.
174
- - For each {pending} in {pendings}:
175
- - Add {pending} tp {remainingPendings}.
176
- - When {remainingPendings} is empty:
177
- - Call {FlushIncremental(stream, nextPayloads, newPending, nextCompleteds,
178
- false)}.
179
- - Complete {responseStream}.
180
- - Return {responseStream}.
181
-
182
- FlushIncremental(responseStream, nextPayloads, nextPendings, nextCompleteds,
154
+ - Let {pending} be an empty list.
155
+ - If {initialDefers} is not an empty object:
156
+ - Let {id} be {nextId} and increment {nextId} by one.
157
+ - Let {path} be the longest common path prefix list for every path list key in
158
+ {initialDefers}, or the empty list if no such list exists.
159
+ - TODO: This really needs rewording!
160
+ - Let {pendingPayload} be an unordered map containing {id}, {path}.
161
+ - Add {pendingPayload} to {pending}.
162
+ - Let {defers} be {initialDefers}.
163
+ - Let {pendingDefer} be an unordered map containing {id}, {defers}.
164
+ - Add {pendingDefer} to {remainingDefers}.
165
+ - If {initialStreams} is not an empty list:
166
+ - For each entry {streamDetails} in {initialStreams}:
167
+ - Let {id} be {nextId} and increment {nextId} by one.
168
+ - Let {path} be the value for the key {path} in {streamDetails}.
169
+ - Let {pendingPayload} be an unordered map containing {id}, {path}.
170
+ - Add {pendingPayload} to {pending}.
171
+ - Let {pendingStream} be an unordered map containing {id}, {streamDetails}.
172
+ - Add {pendingStream} to {remainingStreams}.
173
+ - Let {initialResponse} be an unordered map containing {data}, {errors},
174
+ {pending}, and {hasNext}.
175
+ - Yield to {responseStream} the value {initialResponse}.
176
+ - Let {incremental} be an empty list.
177
+ - Let {errors} be an empty list.
178
+ - Let {pending} be an empty list.
179
+ - Let {completed} be an empty list.
180
+ - When you {FlushStream(hasNext)}:
181
+ - If {hasNext} is not provided, initialize it to {true}.
182
+ - Let {incrementalPayload} be an empty unordered map.
183
+ - Add {hasNext} to {incrementalPayload}.
184
+ - If {incremental} is not empty:
185
+ - Add {incremental} to {incrementalPayload}.
186
+ - If {errors} is not empty:
187
+ - Add {errors} to {incrementalPayload}.
188
+ - If {pending} is not empty:
189
+ - Add {pending} to {incrementalPayload}.
190
+ - If {pending} is not empty:
191
+ - Add {pending} to {incrementalPayload}.
192
+ - Yield to {responseStream} the value {incrementalPayload}.
193
+ - Reset {incremental} to an empty list.
194
+ - Reset {errors} to an empty list.
195
+ - Reset {pending} to an empty list.
196
+ - Reset {completed} to an empty list.
197
+ - While {remainingDefers} is not empty or {remainingStreams} is not empty:
198
+ - If {remainingDefers} is not empty:
199
+ - Let {pendingDefer} be the first entry in {remainingDefers}.
200
+ - Remove {pendingDefer} from {remainingDefers}.
201
+ - Let {id} be the value for key {id} in {pendingDefer}.
202
+ - Let {defers} be the value for key {defers} in {pendingDefer}.
203
+ - Let {batchIncremental} be an empty list.
204
+ - Let {batchErrors} be an empty list.
205
+ - Let {batchDefers} be an empty unordered map.
206
+ - Let {batchStreams} be an empty list.
207
+ - For each key {path} and value {deferredDetailsForPath} in {defers}, in
208
+ parallel:
209
+ - Let {objectType} be the value for key {objectType} in
210
+ {deferredDetailsForPath}.
211
+ - Let {objectValue} be the value for key {objectValue} in
212
+ {deferredDetailsForPath}.
213
+ - Let {allFieldDetails} be the value for key {allFieldDetails} in
214
+ {deferredDetailsForPath}.
215
+ - Let {fields} be a list of all the values of the {field} key in the
216
+ entries of {allFieldDetails}.
217
+ - Let {selectionSet} be a new selection set consisting of {fields}.
218
+ - Let {data}, {childDefers} and {childStreams} be
219
+ {ExecuteSelectionSet(selectionSet, objectType, objectValue,
220
+ variableValues, path)}.
221
+ - Let {errors} be the list of all _ field error_ raised while executing the
222
+ defer.
223
+ - Let {incrementalPayload} be an unordered object containing {path},
224
+ {data}, {errors}.
225
+ - Append {incrementalPayload} to {batchIncremental}.
226
+ - Add the entries of {childDefers} into {batchDefers}. Note: {childDefers}
227
+ and {batchDefers} will never have keys in common.
228
+ - For each entry {stream} in {childStreams}, append {stream} to
229
+ {batchStreams}.
230
+ - For each entry {incrementalPayload} in {batchIncremental}, append
231
+ {incrementalPayload} to {incremental}.
232
+ - If {batchDefers} is not an empty object:
233
+ - Let {id} be {nextId} and increment {nextId} by one.
234
+ - Let {path} be the longest common path prefix list for every path list
235
+ key in {batchDefers}, or the empty list if no such list exists.
236
+ - TODO: This really needs rewording!
237
+ - Let {pendingPayload} be an unordered map containing {id}, {path}.
238
+ - Add {pendingPayload} to {pending}.
239
+ - Let {defers} be {batchDefers}.
240
+ - Let {pendingDefer} be an unordered map containing {id}, {defers}.
241
+ - Add {pendingDefer} to {remainingDefers}.
242
+ - If {batchStreams} is not an empty list:
243
+ - For each entry {streamDetails} in {batchStreams}:
244
+ - Let {id} be {nextId} and increment {nextId} by one.
245
+ - Let {path} be the value for the key {path} in {streamDetails}.
246
+ - Let {pendingPayload} be an unordered map containing {id}, {path}.
247
+ - Add {pendingPayload} to {pending}.
248
+ - Let {pendingStream} be an unordered map containing {id},
249
+ {streamDetails}.
250
+ - Add {pendingStream} to {remainingStreams}.
251
+ - Optionally, {FlushStream()}.
252
+ - Else
253
+ - Assert: {remainingStreams} is not empty.
254
+ - Let {stream} be the first entry in {remainingStreams}.
255
+ - Remove {stream} from {remainingStreams}.
256
+ - {FlushStream(false)}.
257
+ - Complete {responseStream}.
258
+
259
+ - TODO: Delete this
260
+ - Let {nextCompleted} be an empty list.
261
+ - ...
262
+ - Add {defers} to {remainingDefers}.
263
+ - Add each entry in {streams} to {remainingStreams}.
264
+ - ...
265
+ - Yield to {responseStream} an unordered map containing {data}, {errors}, and
266
+ {hasNext}.
267
+ - ...
268
+ - Let {nextIncremental} be an empty list.
269
+ - Let {nextPending} be an empty list.
270
+ - Let {nextCompleted} be an empty list.
271
+ - In parallel, for each entry {defers} in {remainingDefers} (including new
272
+ entries added during this loop):
273
+ - Let {id} be {index}, then increment {index} by one.
274
+ - Let {path} be the longest common path prefix list for every path list key
275
+ in {defers}, or the empty list if no such list exists.
276
+ - TODO: This really needs rewording!
277
+ - Let {nextPending} be an unordered map containing {id}, {path}.
278
+ - Add {nextPending} to {nextPending}.
279
+ - Let {moreIncrementalPayloads}, {morePendings} be the result of running
280
+ {ExecutePending(pending, variableValues)}.
281
+ - For each entry {payload} in {moreIncrementalPayloads}:
282
+ - Add {payload} to {nextIncremental}.
283
+ - For each {newPending} in {morePendings}:
284
+ - Add {newPending} to {remainingPendings}.
285
+ - Let {nextCompleted} be an unordered map containing {id}.
286
+ - Add {nextCompleted} to {nextCompleted}.
287
+ - Remove {pending} from {remainingPendings}.
288
+ - Call {FlushIncremental(responseStream, nextIncremental, nextPending,
289
+ nextCompleted)}, and reset {nextIncremental}, {nextPending},
290
+ {nextCompleted} to empty lists.
291
+ - TODO: allow this to be batched more - we can group the results from
292
+ multiple {pending} executions into a single incremental response.
293
+ - For each {pending} in {pendings}:
294
+ - Add {pending} tp {remainingPendings}.
295
+ - When {remainingPendings} is empty:
296
+ - Call {FlushIncremental(stream, nextIncremental, newPending, nextCompleted,
297
+ false)}.
298
+ - Complete {responseStream}.
299
+ - Return {responseStream}.
300
+
301
+ FlushIncremental(responseStream, nextIncremental, nextPending, nextCompleted,
183
302
hasNext):
184
303
185
304
- If {hasNext} is not provided, initialize it to {true}.
186
- - If {nextPayloads } is not empty:
187
- - Let {incremental} be {nextPayloads }.
188
- - If {nextPendings } is not empty:
189
- - Let {pending} be {nextPendings }.
190
- - If {nextCompleteds } is not empty:
191
- - Let {completed} be {nextCompleteds }.
305
+ - If {nextIncremental } is not empty:
306
+ - Let {incremental} be {nextIncremental }.
307
+ - If {nextPending } is not empty:
308
+ - Let {pending} be {nextPending }.
309
+ - If {nextCompleted } is not empty:
310
+ - Let {completed} be {nextCompleted }.
192
311
- Let {payload} be an unordered map containing {incremental}, {pending},
193
312
{completed} and {hasNext}.
194
313
- Yield an event to {responseStream} containing {payload}.
@@ -424,9 +543,13 @@ parentPath):
424
543
{objectType}.
425
544
- If {fieldType} is defined:
426
545
- If every entry in {fieldDetails} has {isDeferred} set to {true}:
427
- - Let {deferredGroupForPath} be the list in {defers} for {path}; if no
428
- such list exists, create it as an empty list.
429
- - Append {fieldDetails} to {deferredGroupForPath}.
546
+ - Let {deferredDetailsForPath} be the object in {defers} for {path}; if no
547
+ such list exists, create it as an unordered map containing {objectType},
548
+ {objectValue} and {allFieldDetails}, where {allFieldDetails} is an empty
549
+ list.
550
+ - Let {group} be the value in {deferredDetailsForPath} for key
551
+ {allFieldDetails}.
552
+ - Append {fieldDetails} to {allFieldDetails}.
430
553
- Else:
431
554
- Let {responseValue}, {childDefers}, {childStreams} be
432
555
{ExecuteField(objectType, objectValue, fieldType, fieldDetails,
@@ -451,7 +574,7 @@ parentPath):
451
574
- Set {initialValues} as the value for {responseKey} in {resultMap}.
452
575
- If there are (or may be) values in {remainingValues}:
453
576
- Let {streamDetails} be an unordered map containing {path},
454
- {remainingValues} and {fieldDetails}.
577
+ {remainingValues}, {initialCount} and {fieldDetails}.
455
578
- Append {streamDetails} to {streams}.
456
579
- Else:
457
580
- Set {responseValue} as the value for {responseKey} in {resultMap}.
0 commit comments