Skip to content

Commit 824de93

Browse files
authored
Merge pull request #247 from tjungblu/etcd
add etcd
2 parents 542e823 + 45f105c commit 824de93

File tree

4 files changed

+182
-2
lines changed

4 files changed

+182
-2
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ Available Commands:
9898
- MongoDB
9999
- Redis and Redis Cluster
100100
- BoltDB
101+
- etcd
101102

102103
## Database Configuration
103104

@@ -299,6 +300,17 @@ Common configurations:
299300
|bolt.mmap_flags|0|Set the DB.MmapFlags flag before memory mapping the file|
300301
|bolt.initial_mmap_size|0|The initial mmap size of the database in bytes. If <= 0, the initial map size is 0. If the size is smaller than the previous database, it takes no effect|
301302

303+
### etcd
304+
305+
|field|default value|description|
306+
|-|-|-|
307+
|etcd.endpoints|"localhost:2379"|The etcd endpoint(s), multiple endpoints can be passed separated by comma.|
308+
|etcd.dial_timeout|"2s"|The dial timeout duration passed into the client config.|
309+
|etcd.cert_file|""|When using secure etcd, this should point to the crt file.|
310+
|etcd.key_file|""|When using secure etcd, this should point to the pem file.|
311+
|etcd.cacert_file|""|When using secure etcd, this should point to the ca file.|
312+
313+
302314
## TODO
303315

304316
- [ ] Support more measurement, like HdrHistogram

cmd/go-ycsb/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package main
1515

1616
import (
1717
"context"
18+
"fmt"
1819
"log"
1920
"net/http"
2021
_ "net/http/pprof"
@@ -28,15 +29,14 @@ import (
2829

2930
// Register workload
3031

31-
"fmt"
32+
"github.com/spf13/cobra"
3233

3334
"github.com/pingcap/go-ycsb/pkg/client"
3435
"github.com/pingcap/go-ycsb/pkg/measurement"
3536
"github.com/pingcap/go-ycsb/pkg/prop"
3637
"github.com/pingcap/go-ycsb/pkg/util"
3738
_ "github.com/pingcap/go-ycsb/pkg/workload"
3839
"github.com/pingcap/go-ycsb/pkg/ycsb"
39-
"github.com/spf13/cobra"
4040

4141
// Register basic database
4242
_ "github.com/pingcap/go-ycsb/db/basic"
@@ -70,6 +70,8 @@ import (
7070
_ "github.com/pingcap/go-ycsb/db/minio"
7171
// Register elastic
7272
_ "github.com/pingcap/go-ycsb/db/elasticsearch"
73+
// Register etcd
74+
_ "github.com/pingcap/go-ycsb/db/etcd"
7375
)
7476

7577
var (

db/etcd/db.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package etcd
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/tls"
7+
"encoding/json"
8+
"fmt"
9+
"strings"
10+
"time"
11+
12+
clientv3 "go.etcd.io/etcd/client/v3"
13+
14+
"github.com/magiconair/properties"
15+
"go.etcd.io/etcd/client/pkg/v3/transport"
16+
17+
"github.com/pingcap/go-ycsb/pkg/ycsb"
18+
)
19+
20+
// properties
21+
const (
22+
etcdEndpoints = "etcd.endpoints"
23+
etcdDialTimeout = "etcd.dial_timeout"
24+
etcdCertFile = "etcd.cert_file"
25+
etcdKeyFile = "etcd.key_file"
26+
etcdCaFile = "etcd.cacert_file"
27+
)
28+
29+
type etcdCreator struct{}
30+
31+
type etcdDB struct {
32+
p *properties.Properties
33+
client *clientv3.Client
34+
}
35+
36+
func init() {
37+
ycsb.RegisterDBCreator("etcd", etcdCreator{})
38+
}
39+
40+
func (c etcdCreator) Create(p *properties.Properties) (ycsb.DB, error) {
41+
cfg, err := getClientConfig(p)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
client, err := clientv3.New(*cfg)
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
return &etcdDB{
52+
p: p,
53+
client: client,
54+
}, nil
55+
}
56+
57+
func getClientConfig(p *properties.Properties) (*clientv3.Config, error) {
58+
endpoints := p.GetString(etcdEndpoints, "localhost:2379")
59+
dialTimeout := p.GetDuration(etcdDialTimeout, 2*time.Second)
60+
61+
var tlsConfig *tls.Config
62+
if strings.Contains(endpoints, "https") {
63+
tlsInfo := transport.TLSInfo{
64+
CertFile: p.MustGetString(etcdCertFile),
65+
KeyFile: p.MustGetString(etcdKeyFile),
66+
TrustedCAFile: p.MustGetString(etcdCaFile),
67+
}
68+
c, err := tlsInfo.ClientConfig()
69+
if err != nil {
70+
return nil, err
71+
}
72+
tlsConfig = c
73+
}
74+
75+
return &clientv3.Config{
76+
Endpoints: strings.Split(endpoints, ","),
77+
DialTimeout: dialTimeout,
78+
TLS: tlsConfig,
79+
}, nil
80+
}
81+
82+
func (db *etcdDB) Close() error {
83+
return db.client.Close()
84+
}
85+
86+
func (db *etcdDB) InitThread(ctx context.Context, _ int, _ int) context.Context {
87+
return ctx
88+
}
89+
90+
func (db *etcdDB) CleanupThread(_ context.Context) {
91+
}
92+
93+
func getRowKey(table string, key string) string {
94+
return fmt.Sprintf("%s:%s", table, key)
95+
}
96+
97+
func (db *etcdDB) Read(ctx context.Context, table string, key string, _ []string) (map[string][]byte, error) {
98+
rkey := getRowKey(table, key)
99+
value, err := db.client.Get(ctx, rkey)
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
if value.Count == 0 {
105+
return nil, fmt.Errorf("could not find value for key [%s]", rkey)
106+
}
107+
108+
var r map[string][]byte
109+
err = json.NewDecoder(bytes.NewReader(value.Kvs[0].Value)).Decode(&r)
110+
if err != nil {
111+
return nil, err
112+
}
113+
return r, nil
114+
}
115+
116+
func (db *etcdDB) Scan(ctx context.Context, table string, startKey string, count int, _ []string) ([]map[string][]byte, error) {
117+
res := make([]map[string][]byte, count)
118+
rkey := getRowKey(table, startKey)
119+
values, err := db.client.Get(ctx, rkey, clientv3.WithFromKey(), clientv3.WithLimit(int64(count)))
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
if values.Count != int64(count) {
125+
return nil, fmt.Errorf("unexpected number of result for key [%s], expected %d but was %d", rkey, count, values.Count)
126+
}
127+
128+
for _, v := range values.Kvs {
129+
var r map[string][]byte
130+
err = json.NewDecoder(bytes.NewReader(v.Value)).Decode(&r)
131+
if err != nil {
132+
return nil, err
133+
}
134+
res = append(res, r)
135+
}
136+
return res, nil
137+
}
138+
139+
func (db *etcdDB) Update(ctx context.Context, table string, key string, values map[string][]byte) error {
140+
rkey := getRowKey(table, key)
141+
data, err := json.Marshal(values)
142+
if err != nil {
143+
return err
144+
}
145+
_, err = db.client.Put(ctx, rkey, string(data))
146+
if err != nil {
147+
return err
148+
}
149+
150+
return nil
151+
}
152+
153+
func (db *etcdDB) Insert(ctx context.Context, table string, key string, values map[string][]byte) error {
154+
return db.Update(ctx, table, key, values)
155+
}
156+
157+
func (db *etcdDB) Delete(ctx context.Context, table string, key string) error {
158+
_, err := db.client.Delete(ctx, getRowKey(table, key))
159+
if err != nil {
160+
return err
161+
}
162+
return nil
163+
}

db/etcd/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package etcd
2+
3+
// If you want to use etcd, please follow the [Getting Started](https://github.com/etcd-io/etcd#getting-etcd) guide to install it.

0 commit comments

Comments
 (0)