Skip to content

Commit 0afdd23

Browse files
authored
Error source handler middleware (#1101)
Adds an error source handler middleware that can be used both within the SDK and in Grafana.
1 parent 64eea53 commit 0afdd23

8 files changed

+406
-11
lines changed

backend/adapter_utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func metricWrapper(next handlerWrapperFunc) handlerWrapperFunc {
7575
endpoint := EndpointFromContext(ctx)
7676
status, err := next(ctx)
7777

78-
pluginRequestCounter.WithLabelValues(endpoint.String(), status.String(), string(errorSourceFromContext(ctx))).Inc()
78+
pluginRequestCounter.WithLabelValues(endpoint.String(), status.String(), string(ErrorSourceFromContext(ctx))).Inc()
7979

8080
return status, err
8181
}
@@ -106,7 +106,7 @@ func tracingWrapper(next handlerWrapperFunc) handlerWrapperFunc {
106106

107107
span.SetAttributes(
108108
attribute.String("request_status", status.String()),
109-
attribute.String("status_source", string(errorSourceFromContext(ctx))),
109+
attribute.String("status_source", string(ErrorSourceFromContext(ctx))),
110110
)
111111

112112
if err != nil {
@@ -136,7 +136,7 @@ func logWrapper(next handlerWrapperFunc) handlerWrapperFunc {
136136
logParams = append(logParams, "error", err)
137137
}
138138

139-
logParams = append(logParams, "statusSource", string(errorSourceFromContext(ctx)))
139+
logParams = append(logParams, "statusSource", string(ErrorSourceFromContext(ctx)))
140140

141141
if status > RequestStatusCancelled {
142142
logFunc = ctxLogger.Error

backend/adapter_utils_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestErrorWrapper(t *testing.T) {
1919
status, err := wrapper(ctx)
2020
require.ErrorIs(t, err, actualErr)
2121
require.Equal(t, RequestStatusError, status)
22-
require.Equal(t, DefaultErrorSource, errorSourceFromContext(ctx))
22+
require.Equal(t, DefaultErrorSource, ErrorSourceFromContext(ctx))
2323
})
2424

2525
t.Run("Downstream error should set downstream error source in context", func(t *testing.T) {
@@ -32,6 +32,6 @@ func TestErrorWrapper(t *testing.T) {
3232
status, err := wrapper(ctx)
3333
require.ErrorIs(t, err, actualErr)
3434
require.Equal(t, RequestStatusError, status)
35-
require.Equal(t, ErrorSourceDownstream, errorSourceFromContext(ctx))
35+
require.Equal(t, ErrorSourceDownstream, ErrorSourceFromContext(ctx))
3636
})
3737
}

backend/data_adapter.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,19 @@ func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataR
4646
continue
4747
}
4848

49-
// if error source not set and the error is a downstream error, set error source to downstream.
50-
if !r.ErrorSource.IsValid() && IsDownstreamError(r.Error) {
51-
r.ErrorSource = ErrorSourceDownstream
49+
if !r.ErrorSource.IsValid() {
50+
// if the error is a downstream error, set error source to downstream, otherwise plugin.
51+
if IsDownstreamError(r.Error) {
52+
r.ErrorSource = ErrorSourceDownstream
53+
} else {
54+
r.ErrorSource = ErrorSourcePlugin
55+
}
56+
resp.Responses[refID] = r
5257
}
5358

5459
if !r.Status.IsValid() {
5560
r.Status = statusFromError(r.Error)
61+
resp.Responses[refID] = r
5662
}
5763

5864
if r.ErrorSource == ErrorSourceDownstream {

backend/data_adapter_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,31 @@ func TestQueryData(t *testing.T) {
224224
require.NoError(t, err)
225225
}
226226

227-
ss := errorSourceFromContext(actualCtx)
227+
ss := ErrorSourceFromContext(actualCtx)
228228
require.Equal(t, tc.expErrorSource, ss)
229229
})
230230
}
231231
})
232+
233+
t.Run("QueryData response without valid error source error should set error source", func(t *testing.T) {
234+
someErr := errors.New("oops")
235+
downstreamErr := DownstreamError(someErr)
236+
a := newDataSDKAdapter(QueryDataHandlerFunc(func(_ context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) {
237+
return &QueryDataResponse{
238+
Responses: map[string]DataResponse{
239+
"A": {Error: someErr},
240+
"B": {Error: downstreamErr},
241+
},
242+
}, nil
243+
}))
244+
resp, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
245+
PluginContext: &pluginv2.PluginContext{},
246+
})
247+
248+
require.NoError(t, err)
249+
require.Equal(t, ErrorSourcePlugin, ErrorSource(resp.Responses["A"].ErrorSource))
250+
require.Equal(t, ErrorSourceDownstream, ErrorSource(resp.Responses["B"].ErrorSource))
251+
})
232252
}
233253

234254
var finalRoundTripper = httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {

backend/error_source.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,20 @@ func IsDownstreamHTTPError(err error) bool {
3838
return status.IsDownstreamHTTPError(err)
3939
}
4040

41+
// DownstreamError creates a new error with status [ErrorSourceDownstream].
4142
func DownstreamError(err error) error {
4243
return status.DownstreamError(err)
4344
}
4445

46+
// DownstreamErrorf creates a new error with status [ErrorSourceDownstream] and formats
47+
// according to a format specifier and returns the string as a value that satisfies error.
4548
func DownstreamErrorf(format string, a ...any) error {
4649
return DownstreamError(fmt.Errorf(format, a...))
4750
}
4851

49-
func errorSourceFromContext(ctx context.Context) ErrorSource {
52+
// ErrorSourceFromContext returns the error source stored in the context.
53+
// If no error source is stored in the context, [DefaultErrorSource] is returned.
54+
func ErrorSourceFromContext(ctx context.Context) ErrorSource {
5055
return status.SourceFromContext(ctx)
5156
}
5257

backend/error_source_middleware.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
)
8+
9+
// NewErrorSourceMiddleware returns a new backend.HandlerMiddleware that sets the error source in the
10+
// context.Context, based on returned errors or query data response errors.
11+
// If at least one query data response has a "downstream" error source and there isn't one with a "plugin" error source,
12+
// the error source in the context is set to "downstream".
13+
func NewErrorSourceMiddleware() HandlerMiddleware {
14+
return HandlerMiddlewareFunc(func(next Handler) Handler {
15+
return &ErrorSourceMiddleware{
16+
BaseHandler: NewBaseHandler(next),
17+
}
18+
})
19+
}
20+
21+
type ErrorSourceMiddleware struct {
22+
BaseHandler
23+
}
24+
25+
func (m *ErrorSourceMiddleware) handleDownstreamError(ctx context.Context, err error) error {
26+
if err == nil {
27+
return nil
28+
}
29+
30+
if IsDownstreamError(err) {
31+
if innerErr := WithDownstreamErrorSource(ctx); innerErr != nil {
32+
return fmt.Errorf("failed to set downstream error source: %w", errors.Join(innerErr, err))
33+
}
34+
}
35+
36+
return err
37+
}
38+
39+
func (m *ErrorSourceMiddleware) QueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataResponse, error) {
40+
resp, err := m.BaseHandler.QueryData(ctx, req)
41+
// we want to always process the error here first before checking anything else
42+
// because we want the opportunity to set the error source in the context.
43+
err = m.handleDownstreamError(ctx, err)
44+
45+
// no point in continue if we have an error or no response to process, so we return early.
46+
if err != nil || resp == nil || len(resp.Responses) == 0 {
47+
return resp, err
48+
}
49+
50+
// Set downstream error source in the context if there's at least one response with downstream error source,
51+
// and if there's no plugin error
52+
var hasPluginError bool
53+
var hasDownstreamError bool
54+
for refID, r := range resp.Responses {
55+
if r.Error == nil {
56+
continue
57+
}
58+
59+
if !r.ErrorSource.IsValid() {
60+
// if the error is a downstream error, set error source to downstream, otherwise plugin.
61+
if IsDownstreamError(r.Error) {
62+
r.ErrorSource = ErrorSourceDownstream
63+
} else {
64+
r.ErrorSource = ErrorSourcePlugin
65+
}
66+
resp.Responses[refID] = r
67+
}
68+
69+
if !r.Status.IsValid() {
70+
r.Status = statusFromError(r.Error)
71+
resp.Responses[refID] = r
72+
}
73+
74+
if r.ErrorSource == ErrorSourceDownstream {
75+
hasDownstreamError = true
76+
} else {
77+
hasPluginError = true
78+
}
79+
}
80+
81+
// A plugin error has higher priority than a downstream error,
82+
// so set to downstream only if there's no plugin error
83+
if hasDownstreamError && !hasPluginError {
84+
if err := WithDownstreamErrorSource(ctx); err != nil {
85+
return resp, fmt.Errorf("failed to set downstream status source: %w", err)
86+
}
87+
}
88+
89+
return resp, err
90+
}
91+
92+
func (m *ErrorSourceMiddleware) CallResource(ctx context.Context, req *CallResourceRequest, sender CallResourceResponseSender) error {
93+
err := m.BaseHandler.CallResource(ctx, req, sender)
94+
return m.handleDownstreamError(ctx, err)
95+
}
96+
97+
func (m *ErrorSourceMiddleware) CheckHealth(ctx context.Context, req *CheckHealthRequest) (*CheckHealthResult, error) {
98+
resp, err := m.BaseHandler.CheckHealth(ctx, req)
99+
return resp, m.handleDownstreamError(ctx, err)
100+
}
101+
102+
func (m *ErrorSourceMiddleware) CollectMetrics(ctx context.Context, req *CollectMetricsRequest) (*CollectMetricsResult, error) {
103+
resp, err := m.BaseHandler.CollectMetrics(ctx, req)
104+
return resp, m.handleDownstreamError(ctx, err)
105+
}
106+
107+
func (m *ErrorSourceMiddleware) SubscribeStream(ctx context.Context, req *SubscribeStreamRequest) (*SubscribeStreamResponse, error) {
108+
resp, err := m.BaseHandler.SubscribeStream(ctx, req)
109+
return resp, m.handleDownstreamError(ctx, err)
110+
}
111+
112+
func (m *ErrorSourceMiddleware) PublishStream(ctx context.Context, req *PublishStreamRequest) (*PublishStreamResponse, error) {
113+
resp, err := m.BaseHandler.PublishStream(ctx, req)
114+
return resp, m.handleDownstreamError(ctx, err)
115+
}
116+
117+
func (m *ErrorSourceMiddleware) RunStream(ctx context.Context, req *RunStreamRequest, sender *StreamSender) error {
118+
err := m.BaseHandler.RunStream(ctx, req, sender)
119+
return m.handleDownstreamError(ctx, err)
120+
}
121+
122+
func (m *ErrorSourceMiddleware) ValidateAdmission(ctx context.Context, req *AdmissionRequest) (*ValidationResponse, error) {
123+
resp, err := m.BaseHandler.ValidateAdmission(ctx, req)
124+
return resp, m.handleDownstreamError(ctx, err)
125+
}
126+
127+
func (m *ErrorSourceMiddleware) MutateAdmission(ctx context.Context, req *AdmissionRequest) (*MutationResponse, error) {
128+
resp, err := m.BaseHandler.MutateAdmission(ctx, req)
129+
return resp, m.handleDownstreamError(ctx, err)
130+
}
131+
132+
func (m *ErrorSourceMiddleware) ConvertObjects(ctx context.Context, req *ConversionRequest) (*ConversionResponse, error) {
133+
resp, err := m.BaseHandler.ConvertObjects(ctx, req)
134+
return resp, m.handleDownstreamError(ctx, err)
135+
}

0 commit comments

Comments
 (0)