Skip to content

Commit 7f599d2

Browse files
scottleppmarefr
andauthored
slo - capture query and health duration - label with error source (#122)
* capture query and health duration --------- Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
1 parent 6631da7 commit 7f599d2

File tree

6 files changed

+345
-246
lines changed

6 files changed

+345
-246
lines changed

connector.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package sqlds
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"net/http"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/grafana/grafana-plugin-sdk-go/backend"
13+
)
14+
15+
type Connector struct {
16+
UID string
17+
connections sync.Map
18+
driver Driver
19+
driverSettings DriverSettings
20+
// Enabling multiple connections may cause that concurrent connection limits
21+
// are hit. The datasource enabling this should make sure connections are cached
22+
// if necessary.
23+
enableMultipleConnections bool
24+
}
25+
26+
func NewConnector(ctx context.Context, driver Driver, settings backend.DataSourceInstanceSettings, enableMultipleConnections bool) (*Connector, error) {
27+
ds := driver.Settings(ctx, settings)
28+
db, err := driver.Connect(ctx, settings, nil)
29+
if err != nil {
30+
return nil, DownstreamError(err)
31+
}
32+
33+
conn := &Connector{
34+
UID: settings.UID,
35+
driver: driver,
36+
driverSettings: ds,
37+
enableMultipleConnections: enableMultipleConnections,
38+
}
39+
key := defaultKey(settings.UID)
40+
conn.storeDBConnection(key, dbConnection{db, settings})
41+
return conn, nil
42+
}
43+
44+
func (c *Connector) Connect(ctx context.Context, headers http.Header) (*dbConnection, error) {
45+
key := defaultKey(c.UID)
46+
dbConn, ok := c.getDBConnection(key)
47+
if !ok {
48+
return nil, ErrorMissingDBConnection
49+
}
50+
51+
if c.driverSettings.Retries == 0 {
52+
err := c.connect(dbConn)
53+
return nil, err
54+
}
55+
56+
err := c.connectWithRetries(ctx, dbConn, key, headers)
57+
return &dbConn, err
58+
}
59+
60+
func (c *Connector) connectWithRetries(ctx context.Context, conn dbConnection, key string, headers http.Header) error {
61+
q := &Query{}
62+
if c.driverSettings.ForwardHeaders {
63+
applyHeaders(q, headers)
64+
}
65+
66+
for i := 0; i < c.driverSettings.Retries; i++ {
67+
db, err := c.Reconnect(ctx, conn, q, key)
68+
if err != nil {
69+
return err
70+
}
71+
conn := dbConnection{
72+
db: db,
73+
settings: conn.settings,
74+
}
75+
err = c.connect(conn)
76+
if err == nil {
77+
return err
78+
}
79+
80+
if !shouldRetry(c.driverSettings.RetryOn, err.Error()) {
81+
break
82+
}
83+
84+
if c.driverSettings.Pause > 0 {
85+
time.Sleep(time.Duration(c.driverSettings.Pause * int(time.Second)))
86+
}
87+
backend.Logger.Warn(fmt.Sprintf("connect failed: %s. Retrying %d times", err.Error(), i))
88+
}
89+
90+
return nil
91+
}
92+
93+
func (c *Connector) connect(conn dbConnection) error {
94+
if err := c.ping(conn); err != nil {
95+
return DownstreamError(err)
96+
}
97+
98+
return nil
99+
}
100+
101+
func (c *Connector) ping(conn dbConnection) error {
102+
if c.driverSettings.Timeout == 0 {
103+
return conn.db.Ping()
104+
}
105+
106+
ctx, cancel := context.WithTimeout(context.Background(), c.driverSettings.Timeout)
107+
defer cancel()
108+
109+
return conn.db.PingContext(ctx)
110+
}
111+
112+
func (c *Connector) Reconnect(ctx context.Context, dbConn dbConnection, q *Query, cacheKey string) (*sql.DB, error) {
113+
if err := dbConn.db.Close(); err != nil {
114+
backend.Logger.Warn(fmt.Sprintf("closing existing connection failed: %s", err.Error()))
115+
}
116+
117+
db, err := c.driver.Connect(ctx, dbConn.settings, q.ConnectionArgs)
118+
if err != nil {
119+
return nil, DownstreamError(err)
120+
}
121+
c.storeDBConnection(cacheKey, dbConnection{db, dbConn.settings})
122+
return db, nil
123+
}
124+
125+
func (ds *Connector) getDBConnection(key string) (dbConnection, bool) {
126+
conn, ok := ds.connections.Load(key)
127+
if !ok {
128+
return dbConnection{}, false
129+
}
130+
return conn.(dbConnection), true
131+
}
132+
133+
func (ds *Connector) storeDBConnection(key string, dbConn dbConnection) {
134+
ds.connections.Store(key, dbConn)
135+
}
136+
137+
func (c *Connector) GetConnectionFromQuery(ctx context.Context, q *Query) (string, dbConnection, error) {
138+
if !c.enableMultipleConnections && !c.driverSettings.ForwardHeaders && len(q.ConnectionArgs) > 0 {
139+
return "", dbConnection{}, MissingMultipleConnectionsConfig
140+
}
141+
// The database connection may vary depending on query arguments
142+
// The raw arguments are used as key to store the db connection in memory so they can be reused
143+
key := defaultKey(c.UID)
144+
dbConn, ok := c.getDBConnection(key)
145+
if !ok {
146+
return "", dbConnection{}, MissingDBConnection
147+
}
148+
if !c.enableMultipleConnections || len(q.ConnectionArgs) == 0 {
149+
return key, dbConn, nil
150+
}
151+
152+
key = keyWithConnectionArgs(c.UID, q.ConnectionArgs)
153+
if cachedConn, ok := c.getDBConnection(key); ok {
154+
return key, cachedConn, nil
155+
}
156+
157+
db, err := c.driver.Connect(ctx, dbConn.settings, q.ConnectionArgs)
158+
if err != nil {
159+
return "", dbConnection{}, DownstreamError(err)
160+
}
161+
// Assign this connection in the cache
162+
dbConn = dbConnection{db, dbConn.settings}
163+
c.storeDBConnection(key, dbConn)
164+
165+
return key, dbConn, nil
166+
}
167+
168+
func shouldRetry(retryOn []string, err string) bool {
169+
for _, r := range retryOn {
170+
if strings.Contains(err, r) {
171+
return true
172+
}
173+
}
174+
return false
175+
}

0 commit comments

Comments
 (0)