@@ -131,12 +131,66 @@ ExecuteQuery(query, schema, variableValues, initialValue):
131
131
- Let {queryType} be the root Query type in {schema}.
132
132
- Assert: {queryType} is an Object type.
133
133
- Let {selectionSet} be the top level Selection Set in {query}.
134
- - Let {data} be the result of running {ExecuteSelectionSet(selectionSet,
135
- queryType, initialValue, variableValues)} _ normally _ (allowing
136
- parallelization).
134
+ - Let {data}, {pendings} be the result of running
135
+ {ExecuteSelectionSet(selectionSet, queryType, initialValue, variableValues)}
136
+ _ normally _ (allowing parallelization).
137
137
- Let {errors} be the list of all _ field error_ raised while executing the
138
138
selection set.
139
- - Return an unordered map containing {data} and {errors}.
139
+ - If {pendings} is empty:
140
+ - Return an unordered map containing {data} and {errors}.
141
+ - Else:
142
+ - Return {IncrementalStream(data, errors, pendings, variableValues)}.
143
+
144
+ IncrementalStream(data, errors, pendings, variableValues):
145
+
146
+ - Let {responseStream} be a new event stream {responseStream}.
147
+ - Let {hasNext} be {true}.
148
+ - Yield to {responseStream} an unordered map containing {data}, {errors}, and
149
+ {hasNext}.
150
+ - Let {remainingPendings} be an empty list.
151
+ - Let {nextPayloads} be an empty list.
152
+ - Let {nextPendings} be an empty list.
153
+ - Let {nextCompleteds} be an empty list.
154
+ - In parallel, for each new entry {pending} added to {remainingPendings}:
155
+ - Let {id} be the unique identifier for {pending}.
156
+ - Let {path} be the root path for {pending}.
157
+ - Let {nextPending} be an unordered map containing {id}, {path}.
158
+ - Add {nextPending} to {nextPendings}.
159
+ - Let {moreIncrementalPayloads}, {morePendings} be the result of running
160
+ {ExecutePending(pending, variableValues)}.
161
+ - For each entry {payload} in {moreIncrementalPayloads}:
162
+ - Add {payload} to {nextPayloads}.
163
+ - For each {newPending} in {morePendings}:
164
+ - Add {newPending} to {remainingPendings}.
165
+ - Let {nextCompleted} be an unordered map containing {id}.
166
+ - Add {nextCompleted} to {nextCompleteds}.
167
+ - Remove {pending} from {remainingPendings}.
168
+ - Call {FlushIncremental(responseStream, nextPayloads, nextPendings,
169
+ nextCompleteds)}, and reset {nextPayloads}, {nextPendings}, {nextCompleteds}
170
+ to empty lists.
171
+ - TODO: allow this to be batched more - we can group the results from
172
+ multiple {pending} executions into a single incremental response.
173
+ - For each {pending} in {pendings}:
174
+ - Add {pending} tp {remainingPendings}.
175
+ - When {remainingPendings} is empty:
176
+ - Call {FlushIncremental(stream, nextPayloads, newPending, nextCompleteds,
177
+ false)}.
178
+ - Complete {responseStream}.
179
+ - Return {responseStream}.
180
+
181
+ FlushIncremental(responseStream, nextPayloads, nextPendings, nextCompleteds,
182
+ hasNext):
183
+
184
+ - If {hasNext} is not provided, initialize it to {true}.
185
+ - If {nextPayloads} is not empty:
186
+ - Let {incremental} be {nextPayloads}.
187
+ - If {nextPendings} is not empty:
188
+ - Let {pending} be {nextPendings}.
189
+ - If {nextCompleteds} is not empty:
190
+ - Let {completed} be {nextCompleteds}.
191
+ - Let {payload} be an unordered map containing {incremental}, {pending},
192
+ {completed} and {hasNext}.
193
+ - Yield an event to {responseStream} containing {payload}.
140
194
141
195
### Mutation
142
196
@@ -153,11 +207,17 @@ ExecuteMutation(mutation, schema, variableValues, initialValue):
153
207
- Let {mutationType} be the root Mutation type in {schema}.
154
208
- Assert: {mutationType} is an Object type.
155
209
- Let {selectionSet} be the top level Selection Set in {mutation}.
156
- - Let {data} be the result of running {ExecuteSelectionSet(selectionSet,
157
- mutationType, initialValue, variableValues)} _ serially_ .
210
+ - Let {data}, {pendings} be the result of running
211
+ {ExecuteSelectionSet(selectionSet, mutationType, initialValue,
212
+ variableValues)} _ serially_ .
158
213
- Let {errors} be the list of all _ field error_ raised while executing the
159
214
selection set.
160
- - Return an unordered map containing {data} and {errors}.
215
+ - If {pendings} is empty:
216
+ - Return an unordered map containing {data} and {errors}.
217
+ - Else:
218
+ - Note: this places the defers after _ all_ the mutations. This may not be
219
+ desired; we should discuss.
220
+ - Return {IncrementalStream(data, errors, pendings, variableValues)}.
161
221
162
222
### Subscription
163
223
@@ -293,20 +353,29 @@ MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues):
293
353
- For each {event} on {sourceStream}:
294
354
- Let {response} be the result of running
295
355
{ExecuteSubscriptionEvent(subscription, schema, variableValues, event)}.
296
- - Yield an event containing {response}.
356
+ - If {response} is an event stream:
357
+ - For each event {event} in {response}:
358
+ - Yield an event containing {event}.
359
+ - Else:
360
+ - Yield an event containing {response}.
297
361
- When {responseStream} completes: complete this event stream.
298
362
299
363
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
300
364
301
365
- Let {subscriptionType} be the root Subscription type in {schema}.
302
366
- Assert: {subscriptionType} is an Object type.
303
367
- Let {selectionSet} be the top level Selection Set in {subscription}.
304
- - Let {data} be the result of running {ExecuteSelectionSet(selectionSet,
305
- subscriptionType, initialValue, variableValues)} _ normally _ (allowing
306
- parallelization).
368
+ - Let {data}, {pendings} be the result of running
369
+ {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue,
370
+ variableValues)} _ normally _ (allowing parallelization).
307
371
- Let {errors} be the list of all _ field error_ raised while executing the
308
372
selection set.
309
- - Return an unordered map containing {data} and {errors}.
373
+ - If {pendings} is empty:
374
+ - Return an unordered map containing {data} and {errors}.
375
+ - Else:
376
+ - Note: this places the defers after _ all_ the mutations. This may not be
377
+ desired; we should discuss.
378
+ - Return {IncrementalStream(data, errors, pendings, variableValues)}.
310
379
311
380
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to
312
381
{ExecuteQuery()} since this is how each event result is produced.
0 commit comments