Skip to content

Commit c995b0a

Browse files
authored
Implement parallel handling of batch query #218 (#219)
* Implement parallel handling of batch query #218 * Add test for parallel handling of batch query #218
1 parent 0ac544a commit c995b0a

File tree

2 files changed

+143
-30
lines changed

2 files changed

+143
-30
lines changed

http.go

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"strconv"
1010
"strings"
11+
"sync"
1112

1213
"github.com/nautilus/graphql"
1314
)
@@ -27,6 +28,8 @@ type HTTPOperation struct {
2728
} `json:"extensions"`
2829
}
2930

31+
type setResultFunc func(r map[string]interface{})
32+
3033
func formatErrors(err error) map[string]interface{} {
3134
return formatErrorsWithCode(nil, err, "UNKNOWN_ERROR")
3235
}
@@ -70,12 +73,14 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
7073
/// Handle the operations regardless of the request method
7174

7275
// we have to respond to each operation in the right order
73-
results := []map[string]interface{}{}
76+
results := make([]map[string]interface{}, len(operations))
77+
opWg := new(sync.WaitGroup)
78+
opMutex := new(sync.Mutex)
7479

7580
// the status code to report
7681
statusCode := http.StatusOK
7782

78-
for _, operation := range operations {
83+
for opNum, operation := range operations {
7984
// there might be a query plan cache key embedded in the operation
8085
cacheKey := ""
8186
if operation.Extensions.QueryPlanCache != nil {
@@ -85,10 +90,8 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
8590
// if there is no query or cache key
8691
if operation.Query == "" && cacheKey == "" {
8792
statusCode = http.StatusUnprocessableEntity
88-
results = append(
89-
results,
90-
formatErrorsWithCode(nil, errors.New("could not find query body"), "BAD_USER_INPUT"),
91-
)
93+
results[opNum] = formatErrorsWithCode(nil, errors.New("could not find query body"), "BAD_USER_INPUT")
94+
9295
continue
9396
}
9497

@@ -116,32 +119,12 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
116119
return
117120
}
118121

119-
// fire the query with the request context passed through to execution
120-
result, err := g.Execute(requestContext, plan)
121-
if err != nil {
122-
results = append(results, formatErrorsWithCode(result, err, "INTERNAL_SERVER_ERROR"))
123-
124-
continue
125-
}
126-
127-
// the result for this operation
128-
payload := map[string]interface{}{"data": result}
129-
130-
// if there was a cache key associated with this query
131-
if requestContext.CacheKey != "" {
132-
// embed the cache key in the response
133-
payload["extensions"] = map[string]interface{}{
134-
"persistedQuery": map[string]interface{}{
135-
"sha265Hash": requestContext.CacheKey,
136-
"version": "1",
137-
},
138-
}
139-
}
140-
141-
// add this result to the list
142-
results = append(results, payload)
122+
opWg.Add(1)
123+
go g.executeRequest(requestContext, plan, opWg, g.setResultFunc(opNum, results, opMutex))
143124
}
144125

126+
opWg.Wait()
127+
145128
// the final result depends on whether we are executing in batch mode or not
146129
var finalResponse interface{}
147130
if batchMode {
@@ -165,6 +148,43 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
165148
emitResponse(w, statusCode, string(response))
166149
}
167150

151+
func (g *Gateway) setResultFunc(opNum int, results []map[string]interface{}, opMutex *sync.Mutex) setResultFunc {
152+
return func(r map[string]interface{}) {
153+
opMutex.Lock()
154+
defer opMutex.Unlock()
155+
results[opNum] = r
156+
}
157+
}
158+
159+
func (g *Gateway) executeRequest(requestContext *RequestContext, plan QueryPlanList, opWg *sync.WaitGroup, setResult setResultFunc) {
160+
defer opWg.Done()
161+
162+
// fire the query with the request context passed through to execution
163+
result, err := g.Execute(requestContext, plan)
164+
if err != nil {
165+
setResult(formatErrorsWithCode(result, err, "INTERNAL_SERVER_ERROR"))
166+
167+
return
168+
}
169+
170+
// the result for this operation
171+
payload := map[string]interface{}{"data": result}
172+
173+
// if there was a cache key associated with this query
174+
if requestContext.CacheKey != "" {
175+
// embed the cache key in the response
176+
payload["extensions"] = map[string]interface{}{
177+
"persistedQuery": map[string]interface{}{
178+
"sha265Hash": requestContext.CacheKey,
179+
"version": "1",
180+
},
181+
}
182+
}
183+
184+
// add this result to the list
185+
setResult(payload)
186+
}
187+
168188
// Parses request to operations (single or batch mode).
169189
// Returns an error and an error status code if the request is invalid.
170190
func parseRequest(r *http.Request) (operations []*HTTPOperation, batchMode bool, errStatusCode int, payloadErr error) {

http_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"strconv"
1515
"strings"
1616
"testing"
17+
"time"
1718

1819
"golang.org/x/net/html"
1920

@@ -971,6 +972,98 @@ func TestGraphQLHandler_postBatchWithMultipleFiles(t *testing.T) {
971972
assert.Equal(t, http.StatusOK, result.StatusCode)
972973
}
973974

975+
func TestGraphQLHandler_postBatchParallel(t *testing.T) {
976+
t.Parallel()
977+
schema, err := graphql.LoadSchema(`
978+
type Query {
979+
queryA: String!
980+
queryB: String!
981+
}
982+
`)
983+
assert.NoError(t, err)
984+
985+
// create gateway schema we can test against
986+
gateway, err := New([]*graphql.RemoteSchema{
987+
{Schema: schema, URL: "url-file-upload"},
988+
}, WithExecutor(ExecutorFunc(
989+
func(ec *ExecutionContext) (map[string]interface{}, error) {
990+
if ec.Plan.Operation.Name == "queryAOperation" {
991+
time.Sleep(50 * time.Millisecond)
992+
return map[string]interface{}{
993+
"queryA": "resultA",
994+
}, nil
995+
}
996+
if ec.Plan.Operation.Name == "queryBOperation" {
997+
return map[string]interface{}{
998+
"queryB": "resultB",
999+
}, nil
1000+
}
1001+
1002+
assert.Fail(t, "unexpected operation name", ec.Plan.Operation.Name)
1003+
return nil, nil
1004+
},
1005+
)))
1006+
1007+
if err != nil {
1008+
t.Error(err.Error())
1009+
return
1010+
}
1011+
1012+
request := httptest.NewRequest("POST", "/graphql", strings.NewReader(`[
1013+
{
1014+
"query": "query queryAOperation { queryA }",
1015+
"variables": null
1016+
},
1017+
{
1018+
"query": "query queryBOperation { queryB }",
1019+
"variables": null
1020+
}
1021+
]`))
1022+
1023+
// a recorder so we can check what the handler responded with
1024+
responseRecorder := httptest.NewRecorder()
1025+
1026+
// call the http hander
1027+
gateway.GraphQLHandler(responseRecorder, request)
1028+
1029+
// make sure we got correct order in response
1030+
response := responseRecorder.Result()
1031+
assert.Equal(t, http.StatusOK, response.StatusCode)
1032+
1033+
// read the body
1034+
body, err := io.ReadAll(response.Body)
1035+
assert.NoError(t, response.Body.Close())
1036+
1037+
if err != nil {
1038+
t.Error(err.Error())
1039+
return
1040+
}
1041+
1042+
result := []map[string]interface{}{}
1043+
err = json.Unmarshal(body, &result)
1044+
if err != nil {
1045+
t.Error(err.Error())
1046+
return
1047+
}
1048+
1049+
// we should have gotten 2 responses
1050+
if !assert.Len(t, result, 2) {
1051+
return
1052+
}
1053+
1054+
// make sure there were no errors in the first query
1055+
if firstQuery := result[0]; assert.Nil(t, firstQuery["errors"]) {
1056+
// make sure it has the right id
1057+
assert.Equal(t, map[string]interface{}{"queryA": "resultA"}, firstQuery["data"])
1058+
}
1059+
1060+
// make sure there were no errors in the second query
1061+
if secondQuery := result[1]; assert.Nil(t, secondQuery["errors"]) {
1062+
// make sure it has the right id
1063+
assert.Equal(t, map[string]interface{}{"queryB": "resultB"}, secondQuery["data"])
1064+
}
1065+
}
1066+
9741067
func TestGraphQLHandler_postFilesWithError(t *testing.T) {
9751068
t.Parallel()
9761069
schema, err := graphql.LoadSchema(`

0 commit comments

Comments
 (0)