Skip to content

Commit 7dafc4b

Browse files
authored
Fix multiple connections setup (#49)
* Fix multiple connections setup * improve tests
1 parent 863eb97 commit 7dafc4b

File tree

3 files changed

+119
-54
lines changed

3 files changed

+119
-54
lines changed

datasource.go

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sqlds
33
import (
44
"context"
55
"database/sql"
6+
"encoding/json"
67
"errors"
78
"fmt"
89
"net/http"
@@ -14,15 +15,32 @@ import (
1415
"github.com/grafana/grafana-plugin-sdk-go/data"
1516
)
1617

17-
const defaultKey = "_default"
18+
const defaultKeySuffix = "default"
19+
20+
var (
21+
MissingMultipleConnectionsConfig = errors.New("received connection arguments but the feature is not enabled")
22+
MissingDBConnection = errors.New("unable to get default db connection")
23+
)
24+
25+
func defaultKey(datasourceID int64) string {
26+
return fmt.Sprintf("%d-%s", datasourceID, defaultKeySuffix)
27+
}
28+
29+
func keyWithConnectionArgs(datasourceID int64, connArgs json.RawMessage) string {
30+
return fmt.Sprintf("%d-%s", datasourceID, string(connArgs))
31+
}
32+
33+
type dbConnection struct {
34+
db *sql.DB
35+
settings backend.DataSourceInstanceSettings
36+
}
1837

1938
type sqldatasource struct {
2039
Completable
2140

2241
dbConnections sync.Map
2342
c Driver
2443
driverSettings DriverSettings
25-
settings backend.DataSourceInstanceSettings
2644

2745
backend.CallResourceHandler
2846
CustomRoutes map[string]func(http.ResponseWriter, *http.Request)
@@ -32,15 +50,28 @@ type sqldatasource struct {
3250
EnableMultipleConnections bool
3351
}
3452

53+
func (ds *sqldatasource) getDBConnection(key string) (dbConnection, bool) {
54+
conn, ok := ds.dbConnections.Load(key)
55+
if !ok {
56+
return dbConnection{}, false
57+
}
58+
return conn.(dbConnection), true
59+
}
60+
61+
func (ds *sqldatasource) storeDBConnection(key string, dbConn dbConnection) {
62+
ds.dbConnections.Store(key, dbConn)
63+
}
64+
3565
// NewDatasource creates a new `sqldatasource`.
3666
// It uses the provided settings argument to call the ds.Driver to connect to the SQL server
3767
func (ds *sqldatasource) NewDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
3868
db, err := ds.c.Connect(settings, nil)
3969
if err != nil {
4070
return nil, err
4171
}
42-
ds.dbConnections.Store(defaultKey, db)
43-
ds.settings = settings
72+
key := defaultKey(settings.ID)
73+
ds.storeDBConnection(key, dbConnection{db, settings})
74+
4475
mux := http.NewServeMux()
4576
err = ds.registerRoutes(mux)
4677
if err != nil {
@@ -62,8 +93,8 @@ func NewDatasource(c Driver) *sqldatasource {
6293

6394
// Dispose cleans up datasource instance resources.
6495
func (ds *sqldatasource) Dispose() {
65-
ds.dbConnections.Range(func(key, db interface{}) bool {
66-
err := db.(*sql.DB).Close()
96+
ds.dbConnections.Range(func(key, dbConn interface{}) bool {
97+
err := dbConn.(dbConnection).db.Close()
6798
if err != nil {
6899
backend.Logger.Error(err.Error())
69100
}
@@ -100,32 +131,36 @@ func (ds *sqldatasource) QueryData(ctx context.Context, req *backend.QueryDataRe
100131

101132
}
102133

103-
func (ds *sqldatasource) getDB(q *Query) (*sql.DB, string, error) {
134+
func (ds *sqldatasource) getDBConnectionFromQuery(q *Query) (string, dbConnection, error) {
135+
if !ds.EnableMultipleConnections && len(q.ConnectionArgs) > 0 {
136+
return "", dbConnection{}, MissingMultipleConnectionsConfig
137+
}
104138
// The database connection may vary depending on query arguments
105139
// The raw arguments are used as key to store the db connection in memory so they can be reused
106-
key := defaultKey
107-
db, ok := ds.dbConnections.Load(key)
140+
key := defaultKey(q.DatasourceID)
141+
dbConn, ok := ds.getDBConnection(key)
108142
if !ok {
109-
return nil, "", fmt.Errorf("unable to get default db connection")
143+
return "", dbConnection{}, MissingDBConnection
110144
}
111145
if !ds.EnableMultipleConnections || len(q.ConnectionArgs) == 0 {
112-
return db.(*sql.DB), key, nil
146+
return key, dbConn, nil
113147
}
114148

115-
key = string(q.ConnectionArgs)
116-
if cachedDB, ok := ds.dbConnections.Load(key); ok {
117-
return cachedDB.(*sql.DB), key, nil
149+
key = keyWithConnectionArgs(q.DatasourceID, q.ConnectionArgs)
150+
if cachedConn, ok := ds.getDBConnection(key); ok {
151+
return key, cachedConn, nil
118152
}
119153

120154
var err error
121-
db, err = ds.c.Connect(ds.settings, q.ConnectionArgs)
155+
db, err := ds.c.Connect(dbConn.settings, q.ConnectionArgs)
122156
if err != nil {
123-
return nil, "", err
157+
return "", dbConnection{}, err
124158
}
125159
// Assign this connection in the cache
126-
ds.dbConnections.Store(key, db)
160+
dbConn = dbConnection{db, dbConn.settings}
161+
ds.storeDBConnection(key, dbConn)
127162

128-
return db.(*sql.DB), key, nil
163+
return key, dbConn, nil
129164
}
130165

131166
// handleQuery will call query, and attempt to reconnect if the query failed
@@ -149,7 +184,7 @@ func (ds *sqldatasource) handleQuery(ctx context.Context, req backend.DataQuery)
149184
}
150185

151186
// Retrieve the database connection
152-
db, cacheKey, err := ds.getDB(q)
187+
cacheKey, dbConn, err := ds.getDBConnectionFromQuery(q)
153188
if err != nil {
154189
return getErrorFrameFromQuery(q), err
155190
}
@@ -165,7 +200,7 @@ func (ds *sqldatasource) handleQuery(ctx context.Context, req backend.DataQuery)
165200
// * Some datasources (snowflake) expire connections or have an authentication token that expires if not used in 1 or 4 hours.
166201
// Because the datasource driver does not include an option for permanent connections, we retry the connection
167202
// if the query fails. NOTE: this does not include some errors like "ErrNoRows"
168-
res, err := query(ctx, db, ds.c.Converters(), fillMode, q)
203+
res, err := query(ctx, dbConn.db, ds.c.Converters(), fillMode, q)
169204
if err == nil {
170205
return res, nil
171206
}
@@ -175,11 +210,11 @@ func (ds *sqldatasource) handleQuery(ctx context.Context, req backend.DataQuery)
175210
}
176211

177212
if errors.Is(err, ErrorQuery) {
178-
db, err = ds.c.Connect(ds.settings, q.ConnectionArgs)
213+
db, err := ds.c.Connect(dbConn.settings, q.ConnectionArgs)
179214
if err != nil {
180215
return nil, err
181216
}
182-
ds.dbConnections.Store(cacheKey, db)
217+
ds.storeDBConnection(cacheKey, dbConnection{db, dbConn.settings})
183218

184219
return query(ctx, db, ds.c.Converters(), fillMode, q)
185220
}
@@ -189,11 +224,12 @@ func (ds *sqldatasource) handleQuery(ctx context.Context, req backend.DataQuery)
189224

190225
// CheckHealth pings the connected SQL database
191226
func (ds *sqldatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
192-
db, ok := ds.dbConnections.Load(defaultKey)
227+
key := defaultKey(req.PluginContext.DataSourceInstanceSettings.ID)
228+
dbConn, ok := ds.getDBConnection(key)
193229
if !ok {
194-
return nil, fmt.Errorf("unable to get default db connection")
230+
return nil, MissingDBConnection
195231
}
196-
if err := db.(*sql.DB).Ping(); err != nil {
232+
if err := dbConn.db.Ping(); err != nil {
197233
return &backend.CheckHealthResult{
198234
Status: backend.HealthStatusError,
199235
Message: err.Error(),

datasource_test.go

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sqlds
33
import (
44
"database/sql"
55
"encoding/json"
6+
"errors"
67
"testing"
78

89
"github.com/DATA-DOG/go-sqlmock"
@@ -19,57 +20,82 @@ func (d *fakeDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage
1920
return d.db, nil
2021
}
2122

22-
func Test_getDB(t *testing.T) {
23+
func Test_getDBConnectionFromQuery(t *testing.T) {
2324
db := &sql.DB{}
2425
db2 := &sql.DB{}
25-
d := &fakeDriver{db: db2}
26+
db3 := &sql.DB{}
27+
d := &fakeDriver{db: db3}
2628
tests := []struct {
27-
desc string
28-
args string
29-
existingDB *sql.DB
30-
expectedDB *sql.DB
29+
desc string
30+
dsID int64
31+
args string
32+
existingDB *sql.DB
33+
expectedKey string
34+
expectedDB *sql.DB
3135
}{
3236
{
33-
"it should return the default db with no args",
34-
defaultKey,
35-
db,
36-
db,
37+
desc: "it should return the default db with no args",
38+
dsID: 1,
39+
args: "",
40+
expectedKey: "1-default",
41+
expectedDB: db,
3742
},
3843
{
39-
"it should return the cached connection for the given args",
40-
"foo",
41-
db,
42-
db,
44+
desc: "it should return the cached connection for the given args",
45+
dsID: 1,
46+
args: "foo",
47+
expectedKey: "1-foo",
48+
existingDB: db2,
49+
expectedDB: db2,
4350
},
4451
{
45-
"it should create a new connection with the given args",
46-
"foo",
47-
nil,
48-
db2,
52+
desc: "it should create a new connection with the given args",
53+
dsID: 1,
54+
args: "foo",
55+
expectedKey: "1-foo",
56+
expectedDB: db3,
4957
},
5058
}
5159
for _, tt := range tests {
5260
t.Run(tt.desc, func(t *testing.T) {
5361
ds := &sqldatasource{c: d, EnableMultipleConnections: true}
62+
settings := backend.DataSourceInstanceSettings{ID: tt.dsID}
63+
key := defaultKey(tt.dsID)
64+
// Add the mandatory default db
65+
ds.storeDBConnection(key, dbConnection{db, settings})
5466
if tt.existingDB != nil {
55-
ds.dbConnections.Store(tt.args, tt.existingDB)
67+
key = keyWithConnectionArgs(tt.dsID, []byte(tt.args))
68+
ds.storeDBConnection(key, dbConnection{tt.existingDB, settings})
5669
}
57-
if tt.args != defaultKey {
58-
// Add the mandatory default db
59-
ds.dbConnections.Store(defaultKey, db)
60-
}
61-
res, key, err := ds.getDB(&Query{ConnectionArgs: json.RawMessage(tt.args)})
70+
71+
key, dbConn, err := ds.getDBConnectionFromQuery(&Query{DatasourceID: tt.dsID, ConnectionArgs: json.RawMessage(tt.args)})
6272
if err != nil {
6373
t.Fatalf("unexpected error %v", err)
6474
}
65-
if key != tt.args {
75+
if key != tt.expectedKey {
6676
t.Fatalf("unexpected cache key %s", key)
6777
}
68-
if res != tt.expectedDB {
69-
t.Fatalf("unexpected result %v", res)
78+
if dbConn.db != tt.expectedDB {
79+
t.Fatalf("unexpected result %v", dbConn.db)
7080
}
7181
})
7282
}
83+
84+
t.Run("it should return an error if connection args are used without enabling multiple connections", func(t *testing.T) {
85+
ds := &sqldatasource{c: d, EnableMultipleConnections: false}
86+
_, _, err := ds.getDBConnectionFromQuery(&Query{ConnectionArgs: json.RawMessage("foo")})
87+
if err == nil || !errors.Is(err, MissingMultipleConnectionsConfig) {
88+
t.Errorf("expecting error: %v", MissingMultipleConnectionsConfig)
89+
}
90+
})
91+
92+
t.Run("it should return an error if the default connection is missing", func(t *testing.T) {
93+
ds := &sqldatasource{c: d}
94+
_, _, err := ds.getDBConnectionFromQuery(&Query{})
95+
if err == nil || !errors.Is(err, MissingDBConnection) {
96+
t.Errorf("expecting error: %v", MissingDBConnection)
97+
}
98+
})
7399
}
74100

75101
func Test_Dispose(t *testing.T) {
@@ -84,8 +110,8 @@ func Test_Dispose(t *testing.T) {
84110
}
85111

86112
ds := &sqldatasource{}
87-
ds.dbConnections.Store(defaultKey, db1)
88-
ds.dbConnections.Store("foo", db2)
113+
ds.dbConnections.Store(defaultKey(1), dbConnection{db: db1})
114+
ds.dbConnections.Store("foo", dbConnection{db: db2})
89115

90116
mock1.ExpectClose()
91117
mock2.ExpectClose()

query.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Query struct {
3737
Interval time.Duration `json:"-"`
3838
TimeRange backend.TimeRange `json:"-"`
3939
MaxDataPoints int64 `json:"-"`
40+
DatasourceID int64 `json:"datasourceId"`
4041
FillMissing *data.FillMissing `json:"fillMode,omitempty"`
4142

4243
// Macros
@@ -54,6 +55,7 @@ func (q *Query) WithSQL(query string) *Query {
5455
RefID: q.RefID,
5556
Interval: q.Interval,
5657
TimeRange: q.TimeRange,
58+
DatasourceID: q.DatasourceID,
5759
MaxDataPoints: q.MaxDataPoints,
5860
FillMissing: q.FillMissing,
5961
Schema: q.Schema,
@@ -78,6 +80,7 @@ func GetQuery(query backend.DataQuery) (*Query, error) {
7880
RefID: query.RefID,
7981
Interval: query.Interval,
8082
TimeRange: query.TimeRange,
83+
DatasourceID: model.DatasourceID,
8184
MaxDataPoints: query.MaxDataPoints,
8285
FillMissing: model.FillMissing,
8386
Schema: model.Schema,

0 commit comments

Comments
 (0)