Skip to content

Commit 584d5d7

Browse files
authored
Improve error handling (#177)
* SQL errors should be classified as downstream; add more PGX-specific errors * Better check for retryable errors * Clean up the approach
1 parent a16b389 commit 584d5d7

File tree

5 files changed

+264
-9
lines changed

5 files changed

+264
-9
lines changed

datasource.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,14 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
268268
}
269269
}
270270

271+
// Check if the error is retryable and convert to downstream error if so
272+
if errors.Is(err, ErrorQuery) && shouldRetry(ds.DriverSettings().RetryOn, err.Error()) {
273+
// Convert retryable errors to downstream errors
274+
if !backend.IsDownstreamError(err) {
275+
err = backend.DownstreamError(err)
276+
}
277+
}
278+
271279
// allow retries on timeouts
272280
if errors.Is(err, context.DeadlineExceeded) {
273281
for i := 0; i < ds.DriverSettings().Retries; i++ {

datasource_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func Test_query_panic_in_rows_validation(t *testing.T) {
269269

270270
res := data.Responses["foo"]
271271
assert.NotNil(t, res.Error)
272-
assert.Contains(t, res.Error.Error(), "failed to validate rows")
272+
assert.Contains(t, res.Error.Error(), "SQL rows validation failed")
273273
assert.NotNil(t, res.Frames) // Error frame is returned, not nil
274274
}
275275

errors.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sqlds
22

33
import (
44
"errors"
5+
"strings"
56

67
"github.com/grafana/grafana-plugin-sdk-go/backend"
78
)
@@ -17,6 +18,12 @@ var (
1718
ErrorTimeout = errors.New("query timeout exceeded")
1819
// ErrorNoResults is returned if there were no results returned
1920
ErrorNoResults = errors.New("no results returned from query")
21+
// ErrorRowValidation is returned when SQL rows validation fails (e.g., connection issues, corrupt results)
22+
ErrorRowValidation = errors.New("SQL rows validation failed")
23+
// ErrorConnectionClosed is returned when the database connection is unexpectedly closed
24+
ErrorConnectionClosed = errors.New("database connection closed")
25+
// ErrorPGXLifecycle is returned for PGX v5 specific connection lifecycle issues
26+
ErrorPGXLifecycle = errors.New("PGX connection lifecycle error")
2027
)
2128

2229
func ErrorSource(err error) backend.ErrorSource {
@@ -25,3 +32,83 @@ func ErrorSource(err error) backend.ErrorSource {
2532
}
2633
return backend.ErrorSourcePlugin
2734
}
35+
36+
// IsPGXConnectionError reports whether err looks like a PGX v5 connection
// problem, judged by case-insensitive inspection of the error text.
//
// It returns false for a nil error. Every marker except "eof" is matched as a
// plain substring. "eof" must appear as a standalone word: a substring test
// would also fire on identifiers that merely contain those letters (for
// example a column named "dateofbirth"), which would misclassify an ordinary
// SQL error as a connection failure.
func IsPGXConnectionError(err error) bool {
	if err == nil {
		return false
	}

	errStr := strings.ToLower(err.Error())
	pgxConnectionErrors := []string{
		"connection closed",
		"connection reset",
		"connection refused",
		"broken pipe",
		"context canceled",
		"context deadline exceeded",
		"pgconn",
		"conn is closed",
		"bad connection",
	}

	for _, pgxErr := range pgxConnectionErrors {
		if strings.Contains(errStr, pgxErr) {
			return true
		}
	}

	// Split the (already lowercased) message on anything that cannot be part
	// of an identifier, then look for "eof" as a complete token. This still
	// matches "EOF", "unexpected EOF" and "read tcp ...: EOF".
	isSeparator := func(r rune) bool {
		isWordChar := r == '_' || (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9')
		return !isWordChar
	}
	for _, word := range strings.FieldsFunc(errStr, isSeparator) {
		if word == "eof" {
			return true
		}
	}

	return false
}
64+
65+
// IsGenericDownstreamError reports whether err's message, compared
// case-insensitively, contains one of the runtime failure texts this package
// treats as downstream: "invalid memory address" or "nil pointer dereference".
// A nil error yields false.
func IsGenericDownstreamError(err error) bool {
	if err == nil {
		return false
	}

	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "invalid memory address") ||
		strings.Contains(msg, "nil pointer dereference")
}
85+
86+
// ClassifyError determines the appropriate error source and type for SQL errors
87+
func ClassifyError(err error) (backend.ErrorSource, error) {
88+
if err == nil {
89+
return backend.ErrorSourcePlugin, nil
90+
}
91+
92+
// Check for generic downstream errors first
93+
if IsGenericDownstreamError(err) {
94+
return backend.ErrorSourceDownstream, err
95+
}
96+
97+
// Check for PGX v5 specific connection errors
98+
if IsPGXConnectionError(err) {
99+
// These are typically downstream connection issues
100+
return backend.ErrorSourceDownstream, ErrorPGXLifecycle
101+
}
102+
103+
// Check for row validation errors
104+
if errors.Is(err, ErrorRowValidation) {
105+
return backend.ErrorSourceDownstream, err
106+
}
107+
108+
// Default to existing logic
109+
if backend.IsDownstreamError(err) {
110+
return backend.ErrorSourceDownstream, err
111+
}
112+
113+
return backend.ErrorSourcePlugin, err
114+
}

query.go

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,29 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
7878
start := time.Now()
7979
rows, err := q.DB.QueryContext(ctx, query.RawSQL, args...)
8080
if err != nil {
81+
// Determine error source based on retry configuration and error type
82+
errSource := backend.ErrorSourcePlugin
8183
errType := ErrorQuery
84+
8285
if errors.Is(err, context.Canceled) {
8386
errType = context.Canceled
87+
errSource = backend.ErrorSourcePlugin
88+
} else if IsPGXConnectionError(err) {
89+
errType = ErrorPGXLifecycle
90+
errSource = backend.ErrorSourceDownstream
91+
} else {
92+
// Use enhanced error classification for PGX v5
93+
errSource, _ = ClassifyError(err)
94+
}
95+
96+
var errWithSource error
97+
if errSource == backend.ErrorSourceDownstream {
98+
errWithSource = backend.DownstreamError(fmt.Errorf("%w: %s", errType, err.Error()))
99+
} else {
100+
errWithSource = backend.PluginError(fmt.Errorf("%w: %s", errType, err.Error()))
84101
}
85-
errWithSource := backend.DownstreamError(fmt.Errorf("%w: %s", errType, err.Error()))
86-
q.metrics.CollectDuration(SourceDownstream, StatusError, time.Since(start).Seconds())
102+
103+
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
87104
return sqlutil.ErrorFrameFromQuery(query), errWithSource
88105
}
89106
q.metrics.CollectDuration(SourceDownstream, StatusOK, time.Since(start).Seconds())
@@ -96,8 +113,16 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
96113
errWithSource := backend.DownstreamError(fmt.Errorf("%s: %w", "No results from query", err))
97114
return sqlutil.ErrorFrameFromQuery(query), errWithSource
98115
}
99-
errWithSource := backend.DownstreamError(fmt.Errorf("%s: %w", "Error response from database", err))
100-
q.metrics.CollectDuration(SourceDownstream, StatusError, time.Since(start).Seconds())
116+
117+
errSource, _ := ClassifyError(err)
118+
var errWithSource error
119+
if errSource == backend.ErrorSourceDownstream {
120+
errWithSource = backend.DownstreamError(fmt.Errorf("%s: %w", "Error response from database", err))
121+
} else {
122+
errWithSource = backend.PluginError(fmt.Errorf("%s: %w", "Error response from database", err))
123+
}
124+
125+
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
101126
return sqlutil.ErrorFrameFromQuery(query), errWithSource
102127
}
103128

@@ -111,11 +136,13 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
111136
// Convert the response to frames
112137
res, err := getFrames(rows, q.rowLimit, q.converters, q.fillMode, query)
113138
if err != nil {
114-
// We default to plugin error source
115-
errSource := backend.ErrorSourcePlugin
139+
errSource, _ := ClassifyError(err)
140+
141+
// Additional checks for processing errors
116142
if backend.IsDownstreamHTTPError(err) || isProcessingDownstreamError(err) {
117143
errSource = backend.ErrorSourceDownstream
118144
}
145+
119146
errWithSource := backend.NewErrorWithSource(fmt.Errorf("%w: %s", err, "Could not process SQL results"), errSource)
120147
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
121148
return sqlutil.ErrorFrameFromQuery(query), errWithSource
@@ -212,12 +239,12 @@ func accessColumns(rows *sql.Rows) (columnErr error) {
212239
// validateRows performs safety checks on SQL rows to prevent panics
213240
func validateRows(rows *sql.Rows) error {
214241
if rows == nil {
215-
return fmt.Errorf("rows is nil")
242+
return fmt.Errorf("%w: rows is nil", ErrorRowValidation)
216243
}
217244

218245
err := accessColumns(rows)
219246
if err != nil {
220-
return fmt.Errorf("failed to validate rows: %w", err)
247+
return fmt.Errorf("%w: %w", ErrorRowValidation, err)
221248
}
222249
return nil
223250
}
@@ -285,11 +312,25 @@ func isProcessingDownstreamError(err error) bool {
285312
data.ErrorInputFieldsWithoutRows,
286313
data.ErrorSeriesUnsorted,
287314
data.ErrorNullTimeValues,
315+
ErrorRowValidation,
316+
ErrorConnectionClosed,
317+
ErrorPGXLifecycle,
288318
}
289319
for _, e := range downstreamErrors {
290320
if errors.Is(err, e) {
291321
return true
292322
}
293323
}
324+
325+
// Check for generic downstream errors
326+
if IsGenericDownstreamError(err) {
327+
return true
328+
}
329+
330+
// Check for PGX connection errors
331+
if IsPGXConnectionError(err) {
332+
return true
333+
}
334+
294335
return false
295336
}

query_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,3 +231,122 @@ func TestIsProcessingDownstreamError(t *testing.T) {
231231
})
232232
}
233233
}
234+
235+
func TestPGXErrorClassification(t *testing.T) {
236+
tests := []struct {
237+
name string
238+
errorMsg string
239+
expectedSource backend.ErrorSource
240+
expectedIsPGX bool
241+
}{
242+
{
243+
name: "nil pointer dereference",
244+
errorMsg: "runtime error: invalid memory address or nil pointer dereference",
245+
expectedSource: backend.ErrorSourceDownstream,
246+
expectedIsPGX: false, // Now handled as generic downstream error
247+
},
248+
{
249+
name: "connection closed",
250+
errorMsg: "connection closed",
251+
expectedSource: backend.ErrorSourceDownstream,
252+
expectedIsPGX: true,
253+
},
254+
{
255+
name: "broken pipe",
256+
errorMsg: "broken pipe",
257+
expectedSource: backend.ErrorSourceDownstream,
258+
expectedIsPGX: true,
259+
},
260+
{
261+
name: "pgconn error",
262+
errorMsg: "pgconn: connection failed",
263+
expectedSource: backend.ErrorSourceDownstream,
264+
expectedIsPGX: true,
265+
},
266+
{
267+
name: "regular SQL error",
268+
errorMsg: "syntax error at position 1",
269+
expectedSource: backend.ErrorSourcePlugin,
270+
expectedIsPGX: false,
271+
},
272+
}
273+
274+
for _, tt := range tests {
275+
t.Run(tt.name, func(t *testing.T) {
276+
err := errors.New(tt.errorMsg)
277+
278+
// Test IsPGXConnectionError
279+
isPGX := IsPGXConnectionError(err)
280+
if isPGX != tt.expectedIsPGX {
281+
t.Errorf("IsPGXConnectionError() = %v, expected %v", isPGX, tt.expectedIsPGX)
282+
}
283+
284+
// Test ClassifyError
285+
source, _ := ClassifyError(err)
286+
if source != tt.expectedSource {
287+
t.Errorf("ClassifyError() source = %v, expected %v", source, tt.expectedSource)
288+
}
289+
290+
// Test isProcessingDownstreamError
291+
if tt.expectedSource == backend.ErrorSourceDownstream {
292+
isDownstream := isProcessingDownstreamError(err)
293+
if !isDownstream {
294+
t.Errorf("isProcessingDownstreamError() = %v, expected true for downstream error", isDownstream)
295+
}
296+
}
297+
})
298+
}
299+
}
300+
301+
func TestIsGenericDownstreamError(t *testing.T) {
302+
tests := []struct {
303+
name string
304+
errorMsg string
305+
expected bool
306+
}{
307+
{
308+
name: "nil pointer dereference",
309+
errorMsg: "runtime error: invalid memory address or nil pointer dereference",
310+
expected: true,
311+
},
312+
{
313+
name: "invalid memory address",
314+
errorMsg: "runtime error: invalid memory address",
315+
expected: true,
316+
},
317+
{
318+
name: "nil pointer dereference uppercase",
319+
errorMsg: "NIL POINTER DEREFERENCE",
320+
expected: true,
321+
},
322+
{
323+
name: "regular SQL error",
324+
errorMsg: "syntax error at position 1",
325+
expected: false,
326+
},
327+
{
328+
name: "connection error",
329+
errorMsg: "connection closed",
330+
expected: false,
331+
},
332+
{
333+
name: "nil error",
334+
errorMsg: "",
335+
expected: false,
336+
},
337+
}
338+
339+
for _, tt := range tests {
340+
t.Run(tt.name, func(t *testing.T) {
341+
var err error
342+
if tt.errorMsg != "" {
343+
err = errors.New(tt.errorMsg)
344+
}
345+
346+
result := IsGenericDownstreamError(err)
347+
if result != tt.expected {
348+
t.Errorf("IsGenericDownstreamError(%v) = %v, expected %v", tt.errorMsg, result, tt.expected)
349+
}
350+
})
351+
}
352+
}

0 commit comments

Comments
 (0)