@@ -32,7 +32,7 @@ type ParallelExecutor struct{}
32
32
type queryExecutionResult struct {
33
33
InsertionPoint []string
34
34
Result map [string ]interface {}
35
- StripNode bool
35
+ Err error
36
36
}
37
37
38
38
// execution is broken up into two phases:
@@ -83,7 +83,7 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
83
83
// the root step could have multiple steps that have to happen
84
84
for _ , step := range ctx .Plan .RootStep .Then {
85
85
stepWg .Add (1 )
86
- go executeStep (ctx , ctx .Plan , step , []string {}, resultLock , ctx .Variables , resultCh , errCh , stepWg )
86
+ go executeStep (ctx , ctx .Plan , step , []string {}, resultLock , ctx .Variables , resultCh , stepWg )
87
87
}
88
88
89
89
// the list of errors we have encountered while executing the plan
@@ -95,24 +95,23 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
95
95
select {
96
96
// we have a new result
97
97
case payload := <- resultCh :
98
- if payload == nil {
99
- continue
100
- }
101
98
ctx .logger .Debug ("Inserting result into " , payload .InsertionPoint )
102
99
ctx .logger .Debug ("Result: " , payload .Result )
103
100
104
101
// we have to grab the value in the result and write it to the appropriate spot in the
105
102
// acumulator.
106
- err := executorInsertObject (ctx , result , resultLock , payload .InsertionPoint , payload .Result )
107
- if err != nil {
108
- errCh <- err
109
- continue
103
+ insertErr := executorInsertObject (ctx , result , resultLock , payload .InsertionPoint , payload .Result )
104
+
105
+ switch {
106
+ case payload .Err != nil : // response errors are the highest priority to return
107
+ errCh <- payload .Err
108
+ case insertErr != nil :
109
+ errCh <- insertErr
110
+ default :
111
+ ctx .logger .Debug ("Done. " , result )
112
+ // one of the queries is done
113
+ stepWg .Done ()
110
114
}
111
-
112
- ctx .logger .Debug ("Done. " , result )
113
- // one of the queries is done
114
- stepWg .Done ()
115
-
116
115
case err := <- errCh :
117
116
if err != nil {
118
117
errMutex .Lock ()
@@ -158,9 +157,41 @@ func executeStep(
158
157
resultLock * sync.Mutex ,
159
158
queryVariables map [string ]interface {},
160
159
resultCh chan * queryExecutionResult ,
161
- errCh chan error ,
162
160
stepWg * sync.WaitGroup ,
163
161
) {
162
+ queryResult , dependentSteps , queryErr := executeOneStep (ctx , plan , step , insertionPoint , resultLock , queryVariables )
163
+ // before publishing the current result, tell the wait-group about the dependent steps to wait for
164
+ stepWg .Add (len (dependentSteps ))
165
+ ctx .logger .Debug ("Pushing Result. Insertion point: " , insertionPoint , ". Value: " , queryResult )
166
+ // send the result to be stitched in with our accumulator
167
+ resultCh <- & queryExecutionResult {
168
+ InsertionPoint : insertionPoint ,
169
+ Result : queryResult ,
170
+ Err : queryErr ,
171
+ }
172
+ // We need to collect all the dependent steps and execute them after emitting the parent result in this function.
173
+ // This avoids a race condition, where the result of a dependent request is published to the
174
+ // result channel even before the result created in this iteration.
175
+ // Execute dependent steps after the main step has been published.
176
+ for _ , sr := range dependentSteps {
177
+ ctx .logger .Info ("Spawn " , sr .insertionPoint )
178
+ go executeStep (ctx , plan , sr .step , sr .insertionPoint , resultLock , queryVariables , resultCh , stepWg )
179
+ }
180
+ }
181
+
182
+ type dependentStepArgs struct {
183
+ step * QueryPlanStep
184
+ insertionPoint []string
185
+ }
186
+
187
+ func executeOneStep (
188
+ ctx * ExecutionContext ,
189
+ plan * QueryPlan ,
190
+ step * QueryPlanStep ,
191
+ insertionPoint []string ,
192
+ resultLock * sync.Mutex ,
193
+ queryVariables map [string ]interface {},
194
+ ) (map [string ]interface {}, []dependentStepArgs , error ) {
164
195
ctx .logger .Debug ("Executing step to be inserted in " , step .ParentType , ". Insertion point: " , insertionPoint )
165
196
166
197
ctx .logger .Debug (step .SelectionSet )
@@ -186,14 +217,12 @@ func executeStep(
186
217
// get the data of the point
187
218
pointData , err := executorGetPointData (head )
188
219
if err != nil {
189
- errCh <- err
190
- return
220
+ return nil , nil , err
191
221
}
192
222
193
223
// if we dont have an id
194
224
if pointData .ID == "" {
195
- errCh <- fmt .Errorf ("Could not find id in path" )
196
- return
225
+ return nil , nil , fmt .Errorf ("Could not find id in path" )
197
226
}
198
227
199
228
// save the id as a variable to the query
@@ -202,8 +231,7 @@ func executeStep(
202
231
203
232
// if there is no queryer
204
233
if step .Queryer == nil {
205
- errCh <- errors .New (" could not find queryer for step" )
206
- return
234
+ return nil , nil , errors .New (" could not find queryer for step" )
207
235
}
208
236
209
237
// the query we will use
@@ -233,8 +261,7 @@ func executeStep(
233
261
}, & queryResult )
234
262
if err != nil {
235
263
ctx .logger .Warn ("Network Error: " , err )
236
- errCh <- err
237
- return
264
+ return queryResult , nil , err
238
265
}
239
266
240
267
// NOTE: this insertion point could point to a list of values. If it did, we have to have
@@ -249,36 +276,19 @@ func executeStep(
249
276
// get the result from the response that we have to stitch there
250
277
extractedResult , err := executorExtractValue (ctx , queryResult , resultLock , []string {"node" })
251
278
if err != nil {
252
- errCh <- err
253
- return
279
+ return nil , nil , err
254
280
}
255
281
256
282
resultObj , ok := extractedResult .(map [string ]interface {})
257
283
if ! ok {
258
- errCh <- fmt .Errorf ("Query result of node query was not an object: %v" , queryResult )
259
- return
284
+ return nil , nil , fmt .Errorf ("Query result of node query was not an object: %v" , queryResult )
260
285
}
261
286
262
287
queryResult = resultObj
263
288
}
264
289
265
- // we need to collect all the dependent steps and execute them at last in this function
266
- // to avoid a race condition, where the result of a dependent request is published to the
267
- // result channel even before the result created in this iteration
268
- type stepArgs struct {
269
- step * QueryPlanStep
270
- insertionPoint []string
271
- }
272
- var dependentSteps []stepArgs
273
- // defer the execution of the dependent steps after the main step has been published
274
- defer func () {
275
- for _ , sr := range dependentSteps {
276
- ctx .logger .Info ("Spawn " , sr .insertionPoint )
277
- go executeStep (ctx , plan , sr .step , sr .insertionPoint , resultLock , queryVariables , resultCh , errCh , stepWg )
278
- }
279
- }()
280
-
281
290
// if there are next steps
291
+ var dependentSteps []dependentStepArgs
282
292
if len (step .Then ) > 0 {
283
293
ctx .logger .Debug ("Kicking off child queries" )
284
294
// we need to find the ids of the objects we are inserting into and then kick of the worker with the right
@@ -288,30 +298,19 @@ func executeStep(
288
298
copy (copiedInsertionPoint , insertionPoint )
289
299
insertPoints , err := executorFindInsertionPoints (ctx , resultLock , dependent .InsertionPoint , step .SelectionSet , queryResult , [][]string {copiedInsertionPoint }, step .FragmentDefinitions )
290
300
if err != nil {
291
- // reset dependent steps - result would be discarded anyways
292
- dependentSteps = nil
293
- errCh <- err
294
- return
301
+ return nil , nil , err
295
302
}
296
303
297
304
// this dependent needs to fire for every object that the insertion point references
298
305
for _ , insertionPoint := range insertPoints {
299
- dependentSteps = append (dependentSteps , stepArgs {
306
+ dependentSteps = append (dependentSteps , dependentStepArgs {
300
307
step : dependent ,
301
308
insertionPoint : insertionPoint ,
302
309
})
303
310
}
304
311
}
305
312
}
306
-
307
- // before publishing the current result, tell the wait-group about the dependent steps to wait for
308
- stepWg .Add (len (dependentSteps ))
309
- ctx .logger .Debug ("Pushing Result. Insertion point: " , insertionPoint , ". Value: " , queryResult )
310
- // send the result to be stitched in with our accumulator
311
- resultCh <- & queryExecutionResult {
312
- InsertionPoint : insertionPoint ,
313
- Result : queryResult ,
314
- }
313
+ return queryResult , dependentSteps , nil
315
314
}
316
315
317
316
func max (a , b int ) int {
@@ -386,7 +385,7 @@ func executorFindInsertionPoints(ctx *ExecutionContext, resultLock *sync.Mutex,
386
385
// make sure we are looking at the top of the selection set next time
387
386
selectionSetRoot = foundSelection .SelectionSet
388
387
389
- var value = resultChunk
388
+ value : = resultChunk
390
389
391
390
// the bit of result chunk with the appropriate key should be a list
392
391
rootValue , ok := value [point ]
0 commit comments