Skip to content

Commit 5940bf3

Browse files
author
Matthieu Vachon
committed
Merge branch 'master' into fix/subscription-panic-in-resolver
# Conflicts: # subscription_test.go
2 parents 848e84a + 3c9ac91 commit 5940bf3

File tree

5 files changed

+113
-19
lines changed

5 files changed

+113
-19
lines changed

graphql.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"reflect"
8+
"time"
89

910
"github.com/graph-gophers/graphql-go/errors"
1011
"github.com/graph-gophers/graphql-go/internal/common"
@@ -64,13 +65,14 @@ type Schema struct {
6465
schema *schema.Schema
6566
res *resolvable.Schema
6667

67-
maxDepth int
68-
maxParallelism int
69-
tracer trace.Tracer
70-
validationTracer trace.ValidationTracer
71-
logger log.Logger
72-
useStringDescriptions bool
73-
disableIntrospection bool
68+
maxDepth int
69+
maxParallelism int
70+
tracer trace.Tracer
71+
validationTracer trace.ValidationTracer
72+
logger log.Logger
73+
useStringDescriptions bool
74+
disableIntrospection bool
75+
subscribeResolverTimeout time.Duration
7476
}
7577

7678
// SchemaOpt is an option to pass to ParseSchema or MustParseSchema.
@@ -135,6 +137,15 @@ func DisableIntrospection() SchemaOpt {
135137
}
136138
}
137139

140+
// SubscribeResolverTimeout is an option to control the amount of time
141+
// we allow for a single subscribe message resolver to complete it's job
142+
// before it times out and returns an error to the subscriber.
143+
func SubscribeResolverTimeout(timeout time.Duration) SchemaOpt {
144+
return func(s *Schema) {
145+
s.subscribeResolverTimeout = timeout
146+
}
147+
}
148+
138149
// Response represents a typical response of a GraphQL server. It may be encoded to JSON directly or
139150
// it may be further processed to a custom response type, for example to include custom error data.
140151
// Errors are intentionally serialized first based on the advice in https://github.com/facebook/graphql/commit/7b40390d48680b15cb93e02d46ac5eb249689876#diff-757cea6edf0288677a9eea4cfc801d87R107
@@ -190,7 +201,7 @@ func (s *Schema) exec(ctx context.Context, queryString string, operationName str
190201

191202
// Subscriptions are not valid in Exec. Use schema.Subscribe() instead.
192203
if op.Type == query.Subscription {
193-
return &Response{Errors: []*errors.QueryError{&errors.QueryError{Message: "graphql-ws protocol header is missing"}}}
204+
return &Response{Errors: []*errors.QueryError{{Message: "graphql-ws protocol header is missing"}}}
194205
}
195206
if op.Type == query.Mutation {
196207
if _, ok := s.schema.EntryPoints["mutation"]; !ok {

internal/exec/exec.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"reflect"
99
"sync"
10+
"time"
1011

1112
"github.com/graph-gophers/graphql-go/errors"
1213
"github.com/graph-gophers/graphql-go/internal/common"
@@ -20,9 +21,10 @@ import (
2021

2122
type Request struct {
2223
selected.Request
23-
Limiter chan struct{}
24-
Tracer trace.Tracer
25-
Logger log.Logger
24+
Limiter chan struct{}
25+
Tracer trace.Tracer
26+
Logger log.Logger
27+
SubscribeResolverTimeout time.Duration
2628
}
2729

2830
func (r *Request) handlePanic(ctx context.Context) {

internal/exec/subscribe.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,15 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query
4949
result = callOut[0]
5050

5151
if f.field.HasError && !callOut[1].IsNil() {
52-
resolverErr := callOut[1].Interface().(error)
53-
err = errors.Errorf("%s", resolverErr)
54-
err.ResolverError = resolverErr
52+
switch resolverErr := callOut[1].Interface().(type) {
53+
case *errors.QueryError:
54+
err = resolverErr
55+
case error:
56+
err = errors.Errorf("%s", resolverErr)
57+
err.ResolverError = resolverErr
58+
default:
59+
panic(fmt.Errorf("can only deal with *QueryError and error types, got %T", resolverErr))
60+
}
5561
}
5662
}()
5763

@@ -120,8 +126,12 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query
120126
}
121127
var out bytes.Buffer
122128
func() {
123-
// TODO: configurable timeout
124-
subCtx, cancel := context.WithTimeout(ctx, time.Second)
129+
timeout := r.SubscribeResolverTimeout
130+
if timeout == 0 {
131+
timeout = time.Second
132+
}
133+
134+
subCtx, cancel := context.WithTimeout(ctx, timeout)
125135
defer cancel()
126136

127137
// resolve response

subscription_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"testing"
8+
"time"
89

910
graphql "github.com/graph-gophers/graphql-go"
1011
qerrors "github.com/graph-gophers/graphql-go/errors"
@@ -24,6 +25,7 @@ func (r *helloResolver) Hello() string {
2425
}
2526

2627
var resolverErr = errors.New("resolver error")
28+
var resolverQueryErr = &qerrors.QueryError{Message: "query", ResolverError: resolverErr}
2729

2830
type helloSaidResolver struct {
2931
err error
@@ -265,6 +267,27 @@ func TestSchemaSubscribe(t *testing.T) {
265267
},
266268
},
267269
},
270+
{
271+
Name: "subscription_resolver_can_query_error",
272+
Schema: graphql.MustParseSchema(schema, &rootResolver{
273+
helloSaidResolver: &helloSaidResolver{err: resolverQueryErr},
274+
}),
275+
Query: `
276+
subscription onHelloSaid {
277+
helloSaid {
278+
msg
279+
}
280+
}
281+
`,
282+
ExpectedResults: []gqltesting.TestResponse{
283+
{
284+
Data: json.RawMessage(`
285+
null
286+
`),
287+
Errors: []*qerrors.QueryError{resolverQueryErr},
288+
},
289+
},
290+
},
268291
{
269292
Name: "schema_without_resolver_errors",
270293
Schema: graphql.MustParseSchema(schema, nil),
@@ -474,6 +497,53 @@ const schema = `
474497
}
475498
`
476499

500+
type subscriptionsCustomTimeout struct{}
501+
502+
type messageResolver struct{}
503+
504+
func (r messageResolver) Msg() string {
505+
time.Sleep(5 * time.Millisecond)
506+
return "failed!"
507+
}
508+
509+
func (r *subscriptionsCustomTimeout) OnTimeout() <-chan *messageResolver {
510+
c := make(chan *messageResolver)
511+
go func() {
512+
c <- &messageResolver{}
513+
close(c)
514+
}()
515+
516+
return c
517+
}
518+
519+
func TestSchemaSubscribe_CustomResolverTimeout(t *testing.T) {
520+
r := &struct {
521+
*subscriptionsCustomTimeout
522+
}{
523+
subscriptionsCustomTimeout: &subscriptionsCustomTimeout{},
524+
}
525+
gqltesting.RunSubscribe(t, &gqltesting.TestSubscription{
526+
Schema: graphql.MustParseSchema(`
527+
type Query {}
528+
type Subscription {
529+
onTimeout : Message!
530+
}
531+
532+
type Message {
533+
msg: String!
534+
}
535+
`, r, graphql.SubscribeResolverTimeout(1*time.Millisecond)),
536+
Query: `
537+
subscription {
538+
onTimeout { msg }
539+
}
540+
`,
541+
ExpectedResults: []gqltesting.TestResponse{
542+
{Errors: []*qerrors.QueryError{{Message: "context deadline exceeded"}}},
543+
},
544+
})
545+
}
546+
477547
type subscriptionsPanicInResolver struct{}
478548

479549
func (r *subscriptionsPanicInResolver) OnPanic() <-chan string {

subscriptions.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ func (s *Schema) subscribe(ctx context.Context, queryString string, operationNam
5454
Vars: variables,
5555
Schema: s.schema,
5656
},
57-
Limiter: make(chan struct{}, s.maxParallelism),
58-
Tracer: s.tracer,
59-
Logger: s.logger,
57+
Limiter: make(chan struct{}, s.maxParallelism),
58+
Tracer: s.tracer,
59+
Logger: s.logger,
60+
SubscribeResolverTimeout: s.subscribeResolverTimeout,
6061
}
6162
varTypes := make(map[string]*introspection.Type)
6263
for _, v := range op.Vars {

0 commit comments

Comments
 (0)