Skip to content

Commit 35ae785

Browse files
authored
Enable dataproxy.row_limit configuration option from Grafana (#162)
1 parent c79cba9 commit 35ae785

File tree

4 files changed

+31
-9
lines changed

4 files changed

+31
-9
lines changed

datasource.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
)
2020

2121
const defaultKeySuffix = "default"
22+
const defaultRowLimit = int64(-1)
2223

2324
var (
2425
ErrorMissingMultipleConnectionsConfig = backend.PluginError(errors.New("received connection arguments but the feature is not enabled"))
@@ -50,6 +51,10 @@ type SQLDatasource struct {
5051
CustomRoutes map[string]func(http.ResponseWriter, *http.Request)
5152
metrics Metrics
5253
EnableMultipleConnections bool
54+
// EnableRowLimit: enables using the dataproxy.row_limit setting to limit the number of rows returned by the query
55+
// https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#row_limit
56+
EnableRowLimit bool
57+
rowLimit int64
5358
// PreCheckHealth (optional). Performs custom health check before the Connect method
5459
PreCheckHealth func(ctx context.Context, req *backend.CheckHealthRequest) *backend.CheckHealthResult
5560
// PostCheckHealth (optional).Performs custom health check after the Connect method
@@ -73,6 +78,17 @@ func (ds *SQLDatasource) NewDatasource(ctx context.Context, settings backend.Dat
7378
ds.CallResourceHandler = httpadapter.New(mux)
7479
ds.metrics = NewMetrics(settings.Name, settings.Type, EndpointQuery)
7580

81+
config := backend.GrafanaConfigFromContext(ctx)
82+
ds.rowLimit = defaultRowLimit
83+
if ds.EnableRowLimit && config != nil {
84+
sqlConfig, err := config.SQL()
85+
if err != nil {
86+
backend.Logger.Error(fmt.Sprintf("failed setting row limit from sql config: %s", err))
87+
} else {
88+
ds.rowLimit = sqlConfig.RowLimit
89+
}
90+
}
91+
7692
return ds, nil
7793
}
7894

@@ -191,7 +207,7 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
191207
// * Some datasources (snowflake) expire connections or have an authentication token that expires if not used in 1 or 4 hours.
192208
// Because the datasource driver does not include an option for permanent connections, we retry the connection
193209
// if the query fails. NOTE: this does not include some errors like "ErrNoRows"
194-
dbQuery := NewQuery(dbConn.db, dbConn.settings, ds.driver().Converters(), fillMode)
210+
dbQuery := NewQuery(dbConn.db, dbConn.settings, ds.driver().Converters(), fillMode, ds.rowLimit)
195211
res, err := dbQuery.Run(ctx, q, args...)
196212
if err == nil {
197213
return res, nil
@@ -217,7 +233,7 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
217233
time.Sleep(time.Duration(ds.DriverSettings().Pause * int(time.Second)))
218234
}
219235

220-
dbQuery := NewQuery(db, dbConn.settings, ds.driver().Converters(), fillMode)
236+
dbQuery := NewQuery(db, dbConn.settings, ds.driver().Converters(), fillMode, ds.rowLimit)
221237
res, err = dbQuery.Run(ctx, q, args...)
222238
if err == nil {
223239
return res, err
@@ -239,7 +255,7 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
239255
continue
240256
}
241257

242-
dbQuery := NewQuery(db, dbConn.settings, ds.driver().Converters(), fillMode)
258+
dbQuery := NewQuery(db, dbConn.settings, ds.driver().Converters(), fillMode, ds.rowLimit)
243259
res, err = dbQuery.Run(ctx, q, args...)
244260
if err == nil {
245261
return res, err
@@ -289,3 +305,7 @@ func (ds *SQLDatasource) errors(response *Response) error {
289305
}
290306
return err
291307
}
308+
309+
func (ds *SQLDatasource) GetRowLimit() int64 {
310+
return ds.rowLimit
311+
}

query.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,17 @@ type DBQuery struct {
5757
metrics Metrics
5858
DSName string
5959
converters []sqlutil.Converter
60+
rowLimit int64
6061
}
6162

62-
func NewQuery(db Connection, settings backend.DataSourceInstanceSettings, converters []sqlutil.Converter, fillMode *data.FillMissing) *DBQuery {
63+
func NewQuery(db Connection, settings backend.DataSourceInstanceSettings, converters []sqlutil.Converter, fillMode *data.FillMissing, rowLimit int64) *DBQuery {
6364
return &DBQuery{
6465
DB: db,
6566
DSName: settings.Name,
6667
converters: converters,
6768
fillMode: fillMode,
6869
metrics: NewMetrics(settings.Name, settings.Type, EndpointQuery),
70+
rowLimit: rowLimit,
6971
}
7072
}
7173

@@ -105,14 +107,14 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
105107

106108
start = time.Now()
107109
// Convert the response to frames
108-
res, err := getFrames(rows, -1, q.converters, q.fillMode, query)
110+
res, err := getFrames(rows, q.rowLimit, q.converters, q.fillMode, query)
109111
if err != nil {
110112
// We default to plugin error source
111113
errSource := backend.ErrorSourcePlugin
112114
if backend.IsDownstreamHTTPError(err) || isProcessingDownstreamError(err) {
113115
errSource = backend.ErrorSourceDownstream
114116
}
115-
errWithSource := backend.NewErrorWithSource(fmt.Errorf("%w: %s", err, "Could not process SQL results"), errSource)
117+
errWithSource := backend.NewErrorWithSource(fmt.Errorf("%w: %s", err, "Could not process SQL results"), errSource)
116118
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
117119
return sqlutil.ErrorFrameFromQuery(query), errWithSource
118120
}

query_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestQuery_MySQL(t *testing.T) {
7979
Name: "foo",
8080
}
8181

82-
sqlQuery := NewQuery(db, settings, []sqlutil.Converter{}, nil)
82+
sqlQuery := NewQuery(db, settings, []sqlutil.Converter{}, nil, defaultRowLimit)
8383
_, err := sqlQuery.Run(ctx, q)
8484
if err == nil {
8585
t.Fatal("expected an error but received none")

query_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func TestQuery_Timeout(t *testing.T) {
8484
Name: "foo",
8585
}
8686

87-
sqlQuery := NewQuery(conn, settings, []sqlutil.Converter{}, nil)
87+
sqlQuery := NewQuery(conn, settings, []sqlutil.Converter{}, nil, defaultRowLimit)
8888
_, err := sqlQuery.Run(ctx, &Query{})
8989

9090
if !errors.Is(err, context.Canceled) {
@@ -111,7 +111,7 @@ func TestQuery_Timeout(t *testing.T) {
111111
Name: "foo",
112112
}
113113

114-
sqlQuery := NewQuery(conn, settings, []sqlutil.Converter{}, nil)
114+
sqlQuery := NewQuery(conn, settings, []sqlutil.Converter{}, nil, defaultRowLimit)
115115
_, err := sqlQuery.Run(ctx, &Query{})
116116

117117
if !errors.Is(err, ErrorQuery) {

0 commit comments

Comments
 (0)