Skip to content

Commit be01dba

Browse files
authored
query cache (#12)
* query cache * add info to the frame that the data is from cache
1 parent b3bb5db commit be01dba

File tree

4 files changed

+240
-52
lines changed

4 files changed

+240
-52
lines changed

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
* Doesn't require CGO.
66
* Requires duckdb cli to be in the path.
77

8-
## Dataframes
9-
* Allows querying dataframes using SQL, with all the aggregate/window/analytics functions from DuckDB.
10-
118
## In Memory Database
129
```
1310
db := NewInMemoryDB()
@@ -30,6 +27,10 @@
3027
_, err := db.RunCommands(commands)
3128
res, err := db.Query("SELECT * from t1;")
3229
```
30+
31+
## Dataframes
32+
* Allows querying dataframes using SQL, with all the aggregate/window/analytics functions from DuckDB.
33+
3334
## Query Dataframes
3435
```
3536
db := NewInMemoryDB()

duck/duckdb.go

Lines changed: 52 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,35 @@ import (
1717

1818
var logger = log.DefaultLogger
1919

20+
type Dirs map[string]string
21+
2022
type DuckDB struct {
21-
Name string
22-
mode string
23-
format string
24-
exe string
25-
chunk int
23+
Name string
24+
mode string
25+
format string
26+
exe string
27+
chunk int
28+
cacheDuration int
29+
cache cache
2630
}
2731

2832
type Opts struct {
29-
Mode string
30-
Format string
31-
Chunk int
32-
Exe string
33+
Mode string
34+
Format string
35+
Chunk int
36+
Exe string
37+
CacheDuration int
3338
}
3439

3540
const newline = "\n"
3641

3742
// NewInMemoryDB creates a new in-memory DuckDB
38-
func NewInMemoryDB(opts ...Opts) DuckDB {
43+
func NewInMemoryDB(opts ...Opts) *DuckDB {
3944
return NewDuckDB("", opts...)
4045
}
4146

4247
// NewDuckDB creates a new DuckDB
43-
func NewDuckDB(name string, opts ...Opts) DuckDB {
48+
func NewDuckDB(name string, opts ...Opts) *DuckDB {
4449
db := DuckDB{
4550
Name: name,
4651
mode: "json",
@@ -59,6 +64,9 @@ func NewDuckDB(name string, opts ...Opts) DuckDB {
5964
if opt.Chunk > 0 {
6065
db.chunk = opt.Chunk
6166
}
67+
if opt.CacheDuration > 0 {
68+
db.cacheDuration = opt.CacheDuration
69+
}
6270
}
6371

6472
// Find the executable if it is not configured
@@ -68,7 +76,8 @@ func NewDuckDB(name string, opts ...Opts) DuckDB {
6876
db.exe = "/usr/local/bin/duckdb"
6977
}
7078
}
71-
return db
79+
db.cache = cache{}
80+
return &db
7281
}
7382

7483
// RunCommands runs a series of sql commands against duckdb
@@ -108,51 +117,48 @@ func (d *DuckDB) Query(query string) (string, error) {
108117
}
109118

110119
// QueryFrames loads each dataframe into a view named after its RefID and runs the query against those views
111-
func (d *DuckDB) QueryFrames(name string, query string, frames []*sdk.Frame) (string, error) {
112-
dirs, err := data.ToParquet(frames, d.chunk)
113-
if err != nil {
114-
logger.Error("error converting to parquet", "error", err)
115-
return "", err
120+
func (d *DuckDB) QueryFrames(name string, query string, frames []*sdk.Frame) (string, bool, error) {
121+
data := FrameData{
122+
cacheDuration: d.cacheDuration,
123+
cache: &d.cache,
124+
db: d,
116125
}
117126

118-
defer func() {
119-
for _, dir := range dirs {
120-
err := os.RemoveAll(dir)
121-
if err != nil {
122-
logger.Error("failed to remove parquet files", "error", err)
123-
}
124-
}
125-
}()
126-
127-
commands := []string{}
128-
created := map[string]bool{}
129-
logger.Debug("starting to create views from frames", "frames", len(frames))
130-
for _, frame := range frames {
131-
if created[frame.RefID] {
132-
continue
133-
}
134-
cmd := fmt.Sprintf("CREATE VIEW %s AS (SELECT * from '%s/*.parquet');", frame.RefID, dirs[frame.RefID])
135-
logger.Debug("creating view", "cmd", cmd)
136-
commands = append(commands, cmd)
137-
created[frame.RefID] = true
138-
}
127+
return data.Query(name, query, frames)
128+
}
139129

140-
commands = append(commands, query)
141-
res, err := d.RunCommands(commands)
142-
if err != nil {
143-
logger.Error("error running commands", "error", err)
144-
return "", err
130+
func wipe(dirs map[string]string) {
131+
for _, dir := range dirs {
132+
err := os.RemoveAll(dir)
133+
if err != nil {
134+
logger.Error("failed to remove parquet files", "error", err)
135+
}
145136
}
146-
return res, nil
147137
}
148138

149139
func (d *DuckDB) QueryFramesInto(name string, query string, frames []*sdk.Frame, f *sdk.Frame) error {
150-
res, err := d.QueryFrames(name, query, frames)
140+
res, cached, err := d.QueryFrames(name, query, frames)
151141
if err != nil {
152142
return err
153143
}
154144

155-
return resultsToFrame(name, res, f, frames)
145+
err = resultsToFrame(name, res, f, frames)
146+
if err != nil {
147+
return err
148+
}
149+
if cached {
150+
for _, frame := range frames {
151+
if frame.Meta == nil {
152+
frame.Meta = &sdk.FrameMeta{}
153+
}
154+
notice := sdk.Notice{
155+
Severity: sdk.NoticeSeverityInfo,
156+
Text: "Data retrieved from cache",
157+
}
158+
frame.Meta.Notices = append(frame.Meta.Notices, notice)
159+
}
160+
}
161+
return nil
156162
}
157163

158164
// Destroy will remove database files created by duckdb

duck/duckdb_test.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,42 @@ func TestQueryFrame(t *testing.T) {
5252
frame.RefID = "foo"
5353
frames := []*data.Frame{frame}
5454

55-
res, err := db.QueryFrames("foo", "select * from foo", frames)
55+
res, _, err := db.QueryFrames("foo", "select * from foo", frames)
5656
assert.Nil(t, err)
5757

5858
assert.Contains(t, res, `[{"value":"test"}]`)
5959
}
6060

61+
func TestQueryFrameCache(t *testing.T) {
62+
opts := Opts{
63+
CacheDuration: 5,
64+
}
65+
db := NewInMemoryDB(opts)
66+
67+
var values = []string{"test"}
68+
frame := data.NewFrame("foo", data.NewField("value", nil, values))
69+
frame.RefID = "foo"
70+
frames := []*data.Frame{frame}
71+
72+
res, cached, err := db.QueryFrames("foo", "select * from foo", frames)
73+
assert.Nil(t, err)
74+
assert.False(t, cached)
75+
assert.Contains(t, res, `[{"value":"test"}]`)
76+
77+
res, cached, err = db.QueryFrames("foo", "select * from foo", frames)
78+
assert.Nil(t, err)
79+
assert.True(t, cached)
80+
assert.Contains(t, res, `[{"value":"test"}]`)
81+
82+
// wait for cache to expire
83+
time.Sleep(6 * time.Second)
84+
85+
res, cached, err = db.QueryFrames("foo", "select * from foo", frames)
86+
assert.Nil(t, err)
87+
assert.False(t, cached)
88+
assert.Contains(t, res, `[{"value":"test"}]`)
89+
}
90+
6191
func TestQueryFrameWithDisplayName(t *testing.T) {
6292

6393
db := NewInMemoryDB()
@@ -71,7 +101,7 @@ func TestQueryFrameWithDisplayName(t *testing.T) {
71101
frame.RefID = "foo"
72102
frames := []*data.Frame{frame}
73103

74-
res, err := db.QueryFrames("foo", "select * from foo", frames)
104+
res, _, err := db.QueryFrames("foo", "select * from foo", frames)
75105
assert.Nil(t, err)
76106

77107
assert.Contains(t, res, `[{"some value":"test"}]`)
@@ -88,7 +118,7 @@ func TestQueryFrameChunks(t *testing.T) {
88118
frame.RefID = "foo"
89119
frames := []*data.Frame{frame}
90120

91-
res, err := db.QueryFrames("foo", "select * from foo", frames)
121+
res, _, err := db.QueryFrames("foo", "select * from foo", frames)
92122
assert.Nil(t, err)
93123

94124
assert.Contains(t, res, `test2`)

duck/frame-data.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package duck
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
sdk "github.com/grafana/grafana-plugin-sdk-go/data"
9+
"github.com/scottlepp/go-duck/duck/data"
10+
)
11+
12+
type FrameData struct {
13+
cacheDuration int
14+
cache *cache
15+
db *DuckDB
16+
}
17+
18+
func (f *FrameData) Query(name string, query string, frames []*sdk.Frame) (string, bool, error) {
19+
dirs, cached, err := f.data(name, query, frames)
20+
if err != nil {
21+
logger.Error("error converting to parquet", "error", err)
22+
return "", cached, err
23+
}
24+
25+
defer f.postProcess(name, query, dirs, cached)
26+
27+
// create a wait group to wait for the query to finish
28+
// if the cache duration is exceeded, wait before deleting the cache ( parquet files )
29+
var wg sync.WaitGroup
30+
wg.Add(1)
31+
f.cache.setWait(fmt.Sprintf("%s:%s", name, query), &wg)
32+
33+
var res string
34+
var qerr error
35+
36+
go func() {
37+
res, qerr = f.runQuery(query, dirs, frames)
38+
wg.Done()
39+
}()
40+
41+
wg.Wait()
42+
f.cache.deleteWait(fmt.Sprintf("%s:%s", name, query))
43+
44+
if qerr != nil {
45+
logger.Error("error running commands", "error", err)
46+
return "", cached, err
47+
}
48+
49+
key := fmt.Sprintf("%s:%s", name, query)
50+
if f.cacheDuration > 0 && !cached {
51+
f.cache.set(key, dirs)
52+
}
53+
54+
return res, cached, nil
55+
}
56+
57+
func (f *FrameData) runQuery(query string, dirs Dirs, frames []*sdk.Frame) (string, error) {
58+
commands := createViews(frames, dirs)
59+
commands = append(commands, query)
60+
return f.db.RunCommands(commands)
61+
}
62+
63+
func createViews(frames []*sdk.Frame, dirs Dirs) []string {
64+
commands := []string{}
65+
created := map[string]bool{}
66+
logger.Debug("starting to create views from frames", "frames", len(frames))
67+
for _, frame := range frames {
68+
if created[frame.RefID] {
69+
continue
70+
}
71+
cmd := fmt.Sprintf("CREATE VIEW %s AS (SELECT * from '%s/*.parquet');", frame.RefID, dirs[frame.RefID])
72+
logger.Debug("creating view", "cmd", cmd)
73+
commands = append(commands, cmd)
74+
created[frame.RefID] = true
75+
}
76+
return commands
77+
}
78+
79+
func (f *FrameData) data(name string, query string, frames []*sdk.Frame) (Dirs, bool, error) {
80+
if f.cacheDuration > 0 {
81+
// check the cache
82+
key := fmt.Sprintf("%s:%s", name, query)
83+
if d, ok := f.cache.get(key); ok {
84+
return d, true, nil
85+
}
86+
}
87+
88+
dirs, err := data.ToParquet(frames, f.db.chunk)
89+
return dirs, false, err
90+
}
91+
92+
func (f *FrameData) postProcess(name string, query string, dirs Dirs, cached bool) {
93+
go func() {
94+
if f.cacheDuration == 0 {
95+
wipe(dirs)
96+
return
97+
}
98+
// if result is not cached, a new cache entry will be created
99+
// delete the new cache entry after cacheDuration
100+
if !cached {
101+
time.Sleep(time.Duration(f.cacheDuration) * time.Second)
102+
key := fmt.Sprintf("%s:%s", name, query)
103+
f.cache.delete(key)
104+
// if the query is running wait for the query to finish before deleting the parquet files
105+
wg, wait := f.cache.getWait(key)
106+
if wait {
107+
wg.Wait()
108+
wipe(dirs)
109+
return
110+
}
111+
wipe(dirs)
112+
}
113+
}()
114+
}
115+
116+
type cache struct {
117+
store sync.Map
118+
wait sync.Map
119+
}
120+
121+
func (c *cache) set(key string, value Dirs) {
122+
c.store.Store(key, value)
123+
}
124+
125+
func (c *cache) get(key string) (Dirs, bool) {
126+
val, ok := c.store.Load(key)
127+
if !ok {
128+
return nil, false
129+
}
130+
return val.(Dirs), true
131+
}
132+
133+
func (c *cache) delete(key string) {
134+
c.store.Delete(key)
135+
}
136+
137+
func (c *cache) setWait(key string, value *sync.WaitGroup) {
138+
c.wait.Store(key, value)
139+
}
140+
141+
func (c *cache) getWait(key string) (*sync.WaitGroup, bool) {
142+
val, ok := c.wait.Load(key)
143+
if !ok {
144+
return nil, false
145+
}
146+
return val.(*sync.WaitGroup), true
147+
}
148+
149+
func (c *cache) deleteWait(key string) {
150+
c.wait.Delete(key)
151+
}

0 commit comments

Comments
 (0)