Skip to content

Commit a54ca89

Browse files
committed
bump chdb v3, implemented first part of connection based query
1 parent 24d9727 commit a54ca89

File tree

7 files changed

+435
-12
lines changed

7 files changed

+435
-12
lines changed

chdb/connection.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package chdb
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/chdb-io/chdb-go/chdbstable"
8+
)
9+
10+
type Connection struct {
11+
conn *chdbstable.ChdbConn
12+
connStr string
13+
path string
14+
isTemp bool
15+
}
16+
17+
// NewSession creates a new session with the given path.
18+
// If path is empty, a temporary directory is created.
19+
// Note: The temporary directory is removed when Close is called.
20+
func NewConnection(paths ...string) (*Connection, error) {
21+
path := ""
22+
if len(paths) > 0 {
23+
path = paths[0]
24+
}
25+
isTemp := false
26+
if path == "" {
27+
// Create a temporary directory
28+
tempDir, err := os.MkdirTemp("", "chdb_")
29+
if err != nil {
30+
return nil, err
31+
}
32+
path = tempDir
33+
isTemp = true
34+
35+
}
36+
connStr := fmt.Sprintf("file:%s/chdb.db", path)
37+
38+
conn, err := initConnection(connStr)
39+
if err != nil {
40+
return nil, err
41+
}
42+
return &Connection{connStr: connStr, path: path, isTemp: isTemp, conn: conn}, nil
43+
}
44+
45+
// Query calls queryToBuffer with a default output format of "CSV" if not provided.
46+
func (s *Connection) Query(queryStr string, outputFormats ...string) (result *chdbstable.LocalResult, err error) {
47+
outputFormat := "CSV" // Default value
48+
if len(outputFormats) > 0 {
49+
outputFormat = outputFormats[0]
50+
}
51+
52+
return connQueryToBuffer(s.conn, queryStr, outputFormat)
53+
}
54+
55+
// Close closes the session and removes the temporary directory
56+
//
57+
// temporary directory is created when NewSession was called with an empty path.
58+
func (s *Connection) Close() {
59+
// Remove the temporary directory if it starts with "chdb_"
60+
s.conn.Close()
61+
if s.isTemp {
62+
s.Cleanup()
63+
}
64+
}
65+
66+
// Cleanup closes the session and removes the directory.
67+
func (s *Connection) Cleanup() {
68+
// Remove the session directory, no matter if it is temporary or not
69+
_ = os.RemoveAll(s.path)
70+
}
71+
72+
// Path returns the path of the session.
73+
func (s *Connection) Path() string {
74+
return s.path
75+
}
76+
77+
// IsTemp returns whether the session is temporary.
78+
func (s *Connection) IsTemp() bool {
79+
return s.isTemp
80+
}

chdb/connection_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package chdb
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"testing"
7+
)
8+
9+
// TestNewconnection tests the creation of a new connection.
10+
func TestNewConnection(t *testing.T) {
11+
connection, err := NewConnection()
12+
if err != nil {
13+
t.Fatalf("Failed to create new connection: %s", err)
14+
}
15+
defer connection.Cleanup()
16+
17+
// Check if the connection directory exists
18+
if _, err := os.Stat(connection.Path()); os.IsNotExist(err) {
19+
t.Errorf("connection directory does not exist: %s", connection.Path())
20+
}
21+
22+
// Check if the connection is temporary
23+
if !connection.IsTemp() {
24+
t.Errorf("Expected connection to be temporary")
25+
}
26+
}
27+
28+
// TestconnectionClose tests the Close method of the connection.
29+
func TestConnectionClose(t *testing.T) {
30+
connection, _ := NewConnection()
31+
defer connection.Cleanup() // Cleanup in case Close fails
32+
33+
// Close the connection
34+
connection.Close()
35+
36+
// Check if the connection directory has been removed
37+
if _, err := os.Stat(connection.Path()); !os.IsNotExist(err) {
38+
t.Errorf("connection directory should be removed after Close: %s", connection.Path())
39+
}
40+
}
41+
42+
// TestconnectionCleanup tests the Cleanup method of the connection.
43+
func TestConnectionCleanup(t *testing.T) {
44+
connection, _ := NewConnection()
45+
46+
// Cleanup the connection
47+
connection.Cleanup()
48+
49+
// Check if the connection directory has been removed
50+
if _, err := os.Stat(connection.Path()); !os.IsNotExist(err) {
51+
t.Errorf("connection directory should be removed after Cleanup: %s", connection.Path())
52+
}
53+
}
54+
55+
// TestQuery tests the Query method of the connection.
56+
func TestQueryOnConnection(t *testing.T) {
57+
path := filepath.Join(os.TempDir(), "chdb_test")
58+
defer os.RemoveAll(path)
59+
connection, _ := NewConnection(path)
60+
defer connection.Cleanup()
61+
62+
connection.Query("CREATE DATABASE IF NOT EXISTS testdb; " +
63+
"CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;")
64+
65+
connection.Query(" INSERT INTO testdb.testtable VALUES (1), (2), (3);")
66+
67+
ret, err := connection.Query("SELECT * FROM testtable;")
68+
if err != nil {
69+
t.Errorf("Query failed: %s", err)
70+
}
71+
t.Errorf("result is: %s", string(ret.Buf()))
72+
if string(ret.Buf()) != "1\n2\n3\n" {
73+
t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf()))
74+
}
75+
}
76+
77+
func TestQueryOnConnection2(t *testing.T) {
78+
path := filepath.Join(os.TempDir(), "chdb_test")
79+
defer os.RemoveAll(path)
80+
connection, _ := NewConnection(path)
81+
defer connection.Cleanup()
82+
83+
ret, err := connection.Query("SELECT number+1 from system.numbers limit 3")
84+
if err != nil {
85+
t.Errorf("Query failed: %s", err)
86+
}
87+
if string(ret.Buf()) != "1\n2\n3\n" {
88+
t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf()))
89+
}
90+
}
91+
92+
func TestConnectionPathAndIsTemp(t *testing.T) {
93+
// Create a new connection and check its Path and IsTemp
94+
connection, _ := NewConnection()
95+
defer connection.Cleanup()
96+
97+
if connection.Path() == "" {
98+
t.Errorf("connection path should not be empty")
99+
}
100+
101+
if !connection.IsTemp() {
102+
t.Errorf("connection should be temporary")
103+
}
104+
105+
// Create a new connection with a specific path and check its Path and IsTemp
106+
path := filepath.Join(os.TempDir(), "chdb_test2")
107+
defer os.RemoveAll(path)
108+
connection, _ = NewConnection(path)
109+
defer connection.Cleanup()
110+
111+
if connection.Path() != path {
112+
t.Errorf("connection path should be %s, got %s", path, connection.Path())
113+
}
114+
115+
if connection.IsTemp() {
116+
t.Errorf("connection should not be temporary")
117+
}
118+
}

chdb/driver/driver.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const (
2727

2828
const (
2929
sessionOptionKey = "session"
30+
connectionOptionKey = "connection"
3031
udfPathOptionKey = "udfPath"
3132
driverTypeKey = "driverType"
3233
useUnsafeStringReaderKey = "useUnsafeStringReader"
@@ -136,11 +137,13 @@ func (e *execResult) RowsAffected() (int64, error) {
136137
type queryHandle func(string, ...string) (*chdbstable.LocalResult, error)
137138

138139
type connector struct {
139-
udfPath string
140-
driverType DriverType
141-
bufferSize int
142-
useUnsafe bool
143-
session *chdb.Session
140+
udfPath string
141+
driverType DriverType
142+
bufferSize int
143+
useUnsafe bool
144+
session *chdb.Session
145+
connection *chdb.Connection
146+
useConnection bool
144147
}
145148

146149
// Connect returns a connection to a database.
@@ -186,6 +189,17 @@ func NewConnect(opts map[string]string) (ret *connector, err error) {
186189
return nil, err
187190
}
188191
}
192+
connectionStr, ok := opts[connectionOptionKey]
193+
if ok {
194+
if ret.session != nil {
195+
return nil, fmt.Errorf("could not use both session & connection. please use one of the two")
196+
}
197+
ret.connection, err = chdb.NewConnection(connectionStr)
198+
if err != nil {
199+
return nil, err
200+
}
201+
ret.useConnection = true
202+
}
189203
driverType, ok := opts[driverTypeKey]
190204
if ok {
191205
ret.driverType = parseDriverType(driverType)
@@ -238,12 +252,14 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {
238252
}
239253

240254
type conn struct {
241-
udfPath string
242-
driverType DriverType
243-
bufferSize int
244-
useUnsafe bool
245-
session *chdb.Session
246-
QueryFun queryHandle
255+
udfPath string
256+
driverType DriverType
257+
bufferSize int
258+
useUnsafe bool
259+
useConnection bool
260+
session *chdb.Session
261+
connection *chdb.Connection
262+
QueryFun queryHandle
247263
}
248264

249265
func prepareValues(values []driver.Value) []driver.NamedValue {
@@ -267,6 +283,9 @@ func (c *conn) SetupQueryFun() {
267283
if c.session != nil {
268284
c.QueryFun = c.session.Query
269285
}
286+
if c.connection != nil {
287+
c.QueryFun = c.connection.Query
288+
}
270289
}
271290

272291
func (c *conn) Query(query string, values []driver.Value) (driver.Rows, error) {

chdb/driver/driver_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,63 @@ func TestDbWithSession(t *testing.T) {
168168
}
169169
}
170170

171+
func TestDbWithConnection(t *testing.T) {
172+
connectionDir, err := os.MkdirTemp("", "unittest-connectiondata")
173+
if err != nil {
174+
t.Fatalf("create temp directory fail, err: %s", err)
175+
}
176+
defer os.RemoveAll(connectionDir)
177+
connection, err := chdb.NewConnection(connectionDir)
178+
if err != nil {
179+
t.Fatalf("new connection fail, err: %s", err)
180+
}
181+
defer connection.Cleanup()
182+
183+
connection.Query("CREATE DATABASE IF NOT EXISTS testdb; " +
184+
"CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;")
185+
186+
connection.Query("USE testdb; INSERT INTO testtable VALUES (1), (2), (3);")
187+
188+
ret, err := connection.Query("SELECT * FROM testtable;")
189+
if err != nil {
190+
t.Fatalf("Query fail, err: %s", err)
191+
}
192+
if string(ret.Buf()) != "1\n2\n3\n" {
193+
t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf()))
194+
}
195+
db, err := sql.Open("chdb", fmt.Sprintf("connection=%s", connectionDir))
196+
if err != nil {
197+
t.Fatalf("open db fail, err: %s", err)
198+
}
199+
if db.Ping() != nil {
200+
t.Fatalf("ping db fail, err: %s", err)
201+
}
202+
rows, err := db.Query("select * from testtable;")
203+
if err != nil {
204+
t.Fatalf("exec create function fail, err: %s", err)
205+
}
206+
defer rows.Close()
207+
cols, err := rows.Columns()
208+
if err != nil {
209+
t.Fatalf("get result columns fail, err: %s", err)
210+
}
211+
if len(cols) != 1 {
212+
t.Fatalf("result columns length shoule be 3, actual: %d", len(cols))
213+
}
214+
var bar = 0
215+
var count = 1
216+
for rows.Next() {
217+
err = rows.Scan(&bar)
218+
if err != nil {
219+
t.Fatalf("scan fail, err: %s", err)
220+
}
221+
if bar != count {
222+
t.Fatalf("result is not match, want: %d actual: %d", count, bar)
223+
}
224+
count++
225+
}
226+
}
227+
171228
func TestQueryRow(t *testing.T) {
172229
sessionDir, err := os.MkdirTemp("", "unittest-sessiondata")
173230
if err != nil {

chdb/wrapper.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,16 @@ func queryToBuffer(queryStr, outputFormat, path, udfPath string) (result *chdbst
4040
// Call QueryStable with the constructed arguments
4141
return chdbstable.QueryStable(len(argv), argv)
4242
}
43+
44+
func initConnection(connStr string) (result *chdbstable.ChdbConn, err error) {
45+
argv := []string{connStr}
46+
// Call NewConnection with the constructed arguments
47+
return chdbstable.NewConnection(len(argv), argv)
48+
}
49+
50+
func connQueryToBuffer(conn *chdbstable.ChdbConn, queryStr, outputFormat string) (result *chdbstable.LocalResult, err error) {
51+
if outputFormat == "" {
52+
outputFormat = "CSV"
53+
}
54+
return conn.QueryConn(queryStr, outputFormat)
55+
}

0 commit comments

Comments
 (0)