From 1dafcc236a54cadd24b5a8f53bb0c14616300303 Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Thu, 24 Apr 2025 12:42:43 +0000 Subject: [PATCH 1/4] add parquet streaming --- chdb-purego/binding.go | 24 ++- chdb-purego/chdb.go | 28 ++++ chdb-purego/streaming.go | 72 +++++++++ chdb-purego/types.go | 14 +- chdb.h | 81 ++++++---- chdb/driver/driver.go | 100 +++++++++--- chdb/driver/parquet_streaming.go | 212 ++++++++++++++++++++++++++ chdb/driver/parquet_streaming_test.go | 138 +++++++++++++++++ chdb/session.go | 9 ++ chdb/wrapper.go | 15 ++ 10 files changed, 640 insertions(+), 53 deletions(-) create mode 100644 chdb-purego/streaming.go create mode 100644 chdb/driver/parquet_streaming.go create mode 100644 chdb/driver/parquet_streaming_test.go diff --git a/chdb-purego/binding.go b/chdb-purego/binding.go index e2330cf..aa7aab1 100644 --- a/chdb-purego/binding.go +++ b/chdb-purego/binding.go @@ -35,13 +35,18 @@ func findLibrary() string { } var ( - queryStable func(argc int, argv []string) *local_result - freeResult func(result *local_result) - queryStableV2 func(argc int, argv []string) *local_result_v2 - freeResultV2 func(result *local_result_v2) - connectChdb func(argc int, argv []*byte) **chdb_conn - closeConn func(conn **chdb_conn) - queryConn func(conn *chdb_conn, query string, format string) *local_result_v2 + queryStable func(argc int, argv []string) *local_result + freeResult func(result *local_result) + queryStableV2 func(argc int, argv []string) *local_result_v2 + freeResultV2 func(result *local_result_v2) + connectChdb func(argc int, argv []*byte) **chdb_conn + closeConn func(conn **chdb_conn) + queryConn func(conn *chdb_conn, query string, format string) *local_result_v2 + queryConnStreaming func(conn *chdb_conn, query string, format string) *chdb_streaming_result + streamingResultError func(result *chdb_streaming_result) *string + streamingResultNext func(conn *chdb_conn, result *chdb_streaming_result) *local_result_v2 + streamingResultDestroy func(result *chdb_streaming_result) + streamingResultCancel func(conn *chdb_conn, result *chdb_streaming_result) ) func init() { @@ -58,5 +63,10 @@ func init() { purego.RegisterLibFunc(&connectChdb, libchdb, "connect_chdb") purego.RegisterLibFunc(&closeConn, libchdb, "close_conn") purego.RegisterLibFunc(&queryConn, libchdb, "query_conn") + purego.RegisterLibFunc(&queryConnStreaming, libchdb, "query_conn_streaming") + purego.RegisterLibFunc(&streamingResultError, libchdb, "chdb_streaming_result_error") + purego.RegisterLibFunc(&streamingResultNext, libchdb, "chdb_streaming_fetch_result") + purego.RegisterLibFunc(&streamingResultCancel, libchdb, "chdb_streaming_cancel_query") + purego.RegisterLibFunc(&streamingResultDestroy, libchdb, "chdb_destroy_result") } diff --git a/chdb-purego/chdb.go b/chdb-purego/chdb.go index 6a0cdef..56e2663 100644 --- a/chdb-purego/chdb.go +++ b/chdb-purego/chdb.go @@ -98,6 +98,11 @@ type connection struct { conn **chdb_conn } +// CancelQuery implements ChdbConn. +func (c *connection) CancelQuery(query ChdbResult) (err error) { + panic("unimplemented") +} + func newChdbConn(conn **chdb_conn) ChdbConn { c := &connection{ conn: conn, @@ -136,6 +141,29 @@ func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult return newChdbResult(res), nil } +// Query implements ChdbConn. +func (c *connection) QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) { + + if c.conn == nil { + return nil, fmt.Errorf("invalid connection") + } + + rawConn := *c.conn + + res := queryConnStreaming(rawConn, queryStr, formatStr) + if res == nil { + // According to the C ABI of chDB v1.2.0, the C function query_stable_v2 + // returns nil if the query returns no data. This is not an error. We + // will change this behavior in the future. + return newStreamingResult(rawConn, res), nil + } + if s := streamingResultError(res); s != nil { + return nil, errors.New(*s) + } + + return newStreamingResult(rawConn, res), nil +} + func (c *connection) Ready() bool { if c.conn != nil { deref := *c.conn diff --git a/chdb-purego/streaming.go b/chdb-purego/streaming.go new file mode 100644 index 0000000..08a144f --- /dev/null +++ b/chdb-purego/streaming.go @@ -0,0 +1,72 @@ +package chdbpurego + +import "errors" + +type streamingResult struct { + curConn *chdb_conn + stream *chdb_streaming_result + curChunk ChdbResult +} + +func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStreamResult { + + // nextChunk := streamingResultNext(conn, cRes) + // if nextChunk == nil { + // return nil + // } + + res := &streamingResult{ + curConn: conn, + stream: cRes, + // curChunk: newChdbResult(nextChunk), + } + + // runtime.SetFinalizer(res, res.Free) + return res + +} + +// Error implements ChdbStreamResult. +func (c *streamingResult) Error() error { + if s := streamingResultError(c.stream); s != nil { + return errors.New(*s) + } + return nil +} + +// Free implements ChdbStreamResult. +func (c *streamingResult) Free() { + streamingResultCancel(c.curConn, c.stream) + streamingResultDestroy(c.stream) + c.stream = nil + if c.curChunk != nil { + c.curChunk.Free() + c.curChunk = nil + } +} + +// Cancel implements ChdbStreamResult. +func (c *streamingResult) Cancel() { + c.Free() +} + +// GetNext implements ChdbStreamResult. +func (c *streamingResult) GetNext() ChdbResult { + if c.curChunk == nil { + nextChunk := streamingResultNext(c.curConn, c.stream) + if nextChunk == nil { + return nil + } + c.curChunk = newChdbResult(nextChunk) + return c.curChunk + } + // free the current chunk before getting the next one + c.curChunk.Free() + c.curChunk = nil + nextChunk := streamingResultNext(c.curConn, c.stream) + if nextChunk == nil { + return nil + } + c.curChunk = newChdbResult(nextChunk) + return c.curChunk +} diff --git a/chdb-purego/types.go b/chdb-purego/types.go index ea953cb..dcf015a 100644 --- a/chdb-purego/types.go +++ b/chdb-purego/types.go @@ -24,6 +24,11 @@ type local_result_v2 struct { error_message *byte } +// clickhouse streaming result struct. for reference: https://github.com/chdb-io/chdb/blob/main/programs/local/chdb.h#L65 +type chdb_streaming_result struct { + internal_data unsafe.Pointer +} + // clickhouse background server connection.for reference: https://github.com/chdb-io/chdb/blob/main/programs/local/chdb.h#L82 type chdb_conn struct { server unsafe.Pointer @@ -32,7 +37,6 @@ type chdb_conn struct { } type ChdbResult interface { - // Raw bytes result buffer, used for reading the result of clickhouse query Buf() []byte // String rapresentation of the the buffer String() string @@ -50,9 +54,17 @@ type ChdbResult interface { Free() } +type ChdbStreamResult interface { + GetNext() ChdbResult + Error() error + Cancel() + Free() +} + type ChdbConn interface { //Query executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr Query(queryStr string, formatStr string) (result ChdbResult, err error) + QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) //Ready returns a boolean indicating if the connections is successfully established. Ready() bool //Close the connection and free the underlying allocated memory diff --git a/chdb.h b/chdb.h index 1188128..498ca61 100644 --- a/chdb.h +++ b/chdb.h @@ -1,12 +1,8 @@ #pragma once #ifdef __cplusplus -# include # include # include -# include -# include -# include extern "C" { #else # include @@ -55,26 +51,6 @@ CHDB_EXPORT void free_result(struct local_result * result); CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); -#ifdef __cplusplus -struct query_request -{ - std::string query; - std::string format; -}; - -struct query_queue -{ - std::mutex mutex; - std::condition_variable query_cv; // For query submission - std::condition_variable result_cv; // For query result retrieval - query_request current_query; - local_result_v2 * current_result = nullptr; - bool has_query = false; - bool shutdown = false; - bool cleanup_done = false; -}; -#endif - /** * Connection structure for chDB * Contains server instance, connection state, and query processing queue @@ -86,11 +62,15 @@ struct chdb_conn void * queue; /* Query processing queue */ }; +typedef struct { + void * internal_data; +} chdb_streaming_result; + /** * Creates a new chDB connection. * Only one active connection is allowed per process. * Creating a new connection with different path requires closing existing connection. - * + * * @param argc Number of command-line arguments * @param argv Command-line arguments array (--path= to specify database location) * @return Pointer to connection pointer, or NULL on failure @@ -101,7 +81,7 @@ CHDB_EXPORT struct chdb_conn ** connect_chdb(int argc, char ** argv); /** * Closes an existing chDB connection and cleans up resources. * Thread-safe function that handles connection shutdown and cleanup. - * + * * @param conn Pointer to connection pointer to close */ CHDB_EXPORT void close_conn(struct chdb_conn ** conn); @@ -109,7 +89,7 @@ CHDB_EXPORT void close_conn(struct chdb_conn ** conn); /** * Executes a query on the given connection. * Thread-safe function that handles query execution in a separate thread. - * + * * @param conn Connection to execute query on * @param query SQL query string to execute * @param format Output format string (e.g., "CSV", default format) @@ -118,6 +98,51 @@ CHDB_EXPORT void close_conn(struct chdb_conn ** conn); */ CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const char * query, const char * format); +/** + * Executes a streaming query on the given connection. + * @brief Initializes streaming query execution and returns result handle + * @param conn Connection to execute query on + * @param query SQL query string to execute + * @param format Output format string (e.g. "CSV", default format) + * @return Streaming result handle containing query state or error message + * @note Returns error result if connection is invalid or closed + */ +CHDB_EXPORT chdb_streaming_result * query_conn_streaming(struct chdb_conn * conn, const char * query, const char * format); + +/** + * Retrieves error message from streaming result. + * @brief Gets error message associated with streaming query execution + * @param result Streaming result handle from query_conn_streaming() + * @return Null-terminated error message string, or NULL if no error occurred + */ +CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); + +/** + * Fetches next chunk of streaming results. + * @brief Iterates through streaming query results + * @param conn Active connection handle + * @param result Streaming result handle from query_conn_streaming() + * @return Materialized result chunk with data + * @note Returns empty result when stream ends + */ +CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_conn * conn, chdb_streaming_result * result); + +/** + * Cancels ongoing streaming query. + * @brief Aborts streaming query execution and cleans up resources + * @param conn Active connection handle + * @param result Streaming result handle to cancel + */ +CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_streaming_result * result); + +/** + * Releases resources associated with streaming result. + * @brief Destroys streaming result handle and frees allocated memory + * @param result Streaming result handle to destroy + * @warning Must be called even if query was finished or canceled + */ +CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result); + #ifdef __cplusplus } -#endif +#endif \ No newline at end of file diff --git a/chdb/driver/driver.go b/chdb/driver/driver.go index 7363600..75ba1f9 100644 --- a/chdb/driver/driver.go +++ b/chdb/driver/driver.go @@ -20,6 +20,7 @@ type DriverType int const ( ARROW DriverType = iota PARQUET + PARQUET_STREAMING INVALID ) @@ -53,8 +54,47 @@ func (d DriverType) PrepareRows(result chdbpurego.ChdbResult, buf []byte, bufSiz bufferSize: bufSize, needNewBuffer: true, useUnsafeStringReader: useUnsafe, }, nil + + } + return nil, fmt.Errorf("unsupported driver type") +} + +func (d DriverType) PrepareStreamingRows(result chdbpurego.ChdbStreamResult, bufSize int, useUnsafe bool) (driver.Rows, error) { + switch d { + case PARQUET_STREAMING: + nextRes := result.GetNext() + if nextRes == nil { + return nil, fmt.Errorf("result is nil") + } + + reader := parquet.NewGenericReader[any](bytes.NewReader(nextRes.Buf())) + return &parquetStreamingRows{ + stream: result, curChunk: nextRes, reader: reader, + bufferSize: bufSize, needNewBuffer: true, + useUnsafeStringReader: useUnsafe, + }, nil + + } + return nil, fmt.Errorf("unsupported driver type") +} + +func (d DriverType) SupportStreaming() bool { + switch d { + case PARQUET_STREAMING: + return true + } + return false +} + +func (d DriverType) GetFormat() string { + switch d { + case PARQUET: + return "Parquet" + case PARQUET_STREAMING: + return "Parquet" } - return nil, fmt.Errorf("Unsupported driver type") + return "" + } func parseDriverType(s string) DriverType { @@ -63,6 +103,8 @@ func parseDriverType(s string) DriverType { // return ARROW case "PARQUET": return PARQUET + case "PARQUET_STREAMING": + return PARQUET_STREAMING } return INVALID } @@ -129,12 +171,15 @@ func (e *execResult) RowsAffected() (int64, error) { type queryHandle func(string, ...string) (chdbpurego.ChdbResult, error) +type queryStream func(string, ...string) (chdbpurego.ChdbStreamResult, error) + type connector struct { - udfPath string - driverType DriverType - bufferSize int - useUnsafe bool - session *chdb.Session + udfPath string + driverType DriverType + bufferSize int + isStreaming bool + useUnsafe bool + session *chdb.Session } // Connect returns a connection to a database. @@ -145,7 +190,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { cc := &conn{ udfPath: c.udfPath, session: c.session, driverType: c.driverType, bufferSize: c.bufferSize, - useUnsafe: c.useUnsafe, + useUnsafe: c.useUnsafe, isStreaming: c.isStreaming, } cc.SetupQueryFun() return cc, nil @@ -215,6 +260,7 @@ func NewConnect(opts map[string]string) (ret *connector, err error) { return nil, err } } + ret.isStreaming = ret.driverType.SupportStreaming() return } @@ -239,13 +285,15 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) { } type conn struct { - udfPath string - driverType DriverType - bufferSize int - useUnsafe bool - session *chdb.Session - - QueryFun queryHandle + udfPath string + driverType DriverType + bufferSize int + useUnsafe bool + isStreaming bool + session *chdb.Session + + QueryFun queryHandle + streamFun queryStream } func prepareValues(values []driver.Value) []driver.NamedValue { @@ -265,9 +313,19 @@ func (c *conn) Close() error { } func (c *conn) SetupQueryFun() { - c.QueryFun = chdb.Query + if c.isStreaming { + c.streamFun = chdb.QueryStream + } else { + c.QueryFun = chdb.Query + } + if c.session != nil { - c.QueryFun = c.session.Query + if c.isStreaming { + c.streamFun = c.session.QueryStream + } else { + c.QueryFun = c.session.Query + } + } } @@ -289,6 +347,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name if err != nil { return nil, err } + result, err := c.QueryFun(compiledQuery, c.driverType.String(), c.udfPath) if err != nil { return nil, err @@ -336,7 +395,14 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam if err != nil { return nil, err } - result, err := c.QueryFun(compiledQuery, c.driverType.String(), c.udfPath) + if c.isStreaming { + result, err := c.streamFun(compiledQuery, c.driverType.GetFormat(), c.udfPath) + if err != nil { + return nil, err + } + return c.driverType.PrepareStreamingRows(result, c.bufferSize, c.useUnsafe) + } + result, err := c.QueryFun(compiledQuery, c.driverType.GetFormat(), c.udfPath) if err != nil { return nil, err } diff --git a/chdb/driver/parquet_streaming.go b/chdb/driver/parquet_streaming.go new file mode 100644 index 0000000..f4726ac --- /dev/null +++ b/chdb/driver/parquet_streaming.go @@ -0,0 +1,212 @@ +package chdbdriver + +import ( + "bytes" + "database/sql/driver" + "fmt" + "io" + "time" + + "reflect" + + chdbpurego "github.com/chdb-io/chdb-go/chdb-purego" + "github.com/parquet-go/parquet-go" +) + +type parquetStreamingRows struct { + stream chdbpurego.ChdbStreamResult // result from clickhouse + curChunk chdbpurego.ChdbResult // current chunk + reader *parquet.GenericReader[any] // parquet reader + curRecord parquet.Row // TODO: delete this? + buffer []parquet.Row // record buffer + bufferSize int // amount of records to preload into buffer + bufferIndex int64 // index in the current buffer + curRow int64 // row counter + needNewBuffer bool + useUnsafeStringReader bool +} + +func (r *parquetStreamingRows) Columns() (out []string) { + sch := r.reader.Schema() + for _, f := range sch.Fields() { + out = append(out, f.Name()) + } + + return +} + +func (r *parquetStreamingRows) Close() error { + if r.curRecord != nil { + r.curRecord = nil + } + // ignore reader close + _ = r.reader.Close() + r.reader = nil + r.stream.Free() + r.curChunk = nil + r.stream = nil + + r.buffer = nil + return nil +} + +func (r *parquetStreamingRows) readNextChunkFromBuf() error { + r.buffer = make([]parquet.Row, r.bufferSize) + readAmount, err := r.reader.ReadRows(r.buffer) + if err == io.EOF && readAmount == 0 { + return err // no records read, should exit the loop + } + if err == io.EOF && readAmount > 0 { + return nil //here we are at EOF, but since we read at least 1 record, we should consume it + } + if readAmount == 0 { + return io.EOF //same thing + } + if readAmount < r.bufferSize { + r.buffer = r.buffer[:readAmount] //eliminate empty items so the loop will exit before + } + r.bufferIndex = 0 + r.needNewBuffer = false + return nil +} + +func (r *parquetStreamingRows) readNextChunkFromStream() error { + if err := r.reader.Close(); err != nil { + return err + } + r.curChunk = r.stream.GetNext() + if r.curChunk == nil { + return io.EOF + } + if r.curChunk.Error() != nil { + return fmt.Errorf("error in chunk: %s", r.curChunk.Error()) + } + if r.curChunk.RowsRead() == 0 { + return io.EOF + } + r.reader = parquet.NewGenericReader[any](bytes.NewReader(r.curChunk.Buf())) + return nil +} + +func (r *parquetStreamingRows) Next(dest []driver.Value) error { + if r.curRow == 0 && r.curChunk.RowsRead() == 0 { + return io.EOF //here we can simply return early since we don't need to issue a read to the file + } + if r.needNewBuffer { + err := r.readNextChunkFromBuf() + if err != nil { + if err2 := r.readNextChunkFromStream(); err2 != nil { + return err2 + } else { + if err := r.readNextChunkFromBuf(); err != nil { + return err + } + } + } + + } + r.curRecord = r.buffer[r.bufferIndex] + if len(r.curRecord) == 0 { + return fmt.Errorf("empty row") + } + var scanError error + r.curRecord.Range(func(columnIndex int, columnValues []parquet.Value) bool { + if len(columnValues) != 1 { + return false + } + curVal := columnValues[0] + if curVal.IsNull() { + dest[columnIndex] = nil + return true + } + switch r.ColumnTypeDatabaseTypeName(columnIndex) { + case "STRING": + // we check if the user has initialized the connection with the unsafeStringReader parameter, and in that case we use `getStringFromBytes` method. + // otherwise, we fallback to the traditional way and we allocate a new string + if r.useUnsafeStringReader { + dest[columnIndex] = getStringFromBytes(curVal) + } else { + dest[columnIndex] = string(curVal.ByteArray()) + } + + case "INT8", "INT(8,true)": + dest[columnIndex] = int8(curVal.Int32()) //check if this is correct + case "INT16", "INT(16,true)": + dest[columnIndex] = int16(curVal.Int32()) + case "INT64", "INT(64,true)": + dest[columnIndex] = curVal.Int64() + case "INT(64,false)": + dest[columnIndex] = curVal.Uint64() + case "INT(32,false)": + dest[columnIndex] = curVal.Uint32() + case "INT(8,false)": + dest[columnIndex] = uint8(curVal.Uint32()) //check if this is correct + case "INT(16,false)": + dest[columnIndex] = uint16(curVal.Uint32()) + case "INT32", "INT(32,true)": + dest[columnIndex] = curVal.Int32() + case "FLOAT32": + dest[columnIndex] = curVal.Float() + case "DOUBLE": + dest[columnIndex] = curVal.Double() + case "BOOLEAN": + dest[columnIndex] = curVal.Boolean() + case "BYTE_ARRAY", "FIXED_LEN_BYTE_ARRAY": + dest[columnIndex] = curVal.ByteArray() + case "TIMESTAMP(isAdjustedToUTC=true,unit=MILLIS)", "TIME(isAdjustedToUTC=true,unit=MILLIS)": + dest[columnIndex] = time.UnixMilli(curVal.Int64()).UTC() + case "TIMESTAMP(isAdjustedToUTC=true,unit=MICROS)", "TIME(isAdjustedToUTC=true,unit=MICROS)": + dest[columnIndex] = time.UnixMicro(curVal.Int64()).UTC() + case "TIMESTAMP(isAdjustedToUTC=true,unit=NANOS)", "TIME(isAdjustedToUTC=true,unit=NANOS)": + dest[columnIndex] = time.Unix(0, curVal.Int64()).UTC() + case "TIMESTAMP(isAdjustedToUTC=false,unit=MILLIS)", "TIME(isAdjustedToUTC=false,unit=MILLIS)": + dest[columnIndex] = time.UnixMilli(curVal.Int64()) + case "TIMESTAMP(isAdjustedToUTC=false,unit=MICROS)", "TIME(isAdjustedToUTC=false,unit=MICROS)": + dest[columnIndex] = time.UnixMicro(curVal.Int64()) + case "TIMESTAMP(isAdjustedToUTC=false,unit=NANOS)", "TIME(isAdjustedToUTC=false,unit=NANOS)": + dest[columnIndex] = time.Unix(0, curVal.Int64()) + default: + scanError = fmt.Errorf("could not cast to type: %s", r.ColumnTypeDatabaseTypeName(columnIndex)) + return false + + } + return true + }) + if scanError != nil { + return scanError + } + r.curRow++ + r.bufferIndex++ + r.needNewBuffer = r.bufferIndex == int64(len(r.buffer)) // if we achieved the buffer size, we need a new one + return nil +} + +func (r *parquetStreamingRows) ColumnTypeDatabaseTypeName(index int) string { + return r.reader.Schema().Fields()[index].Type().String() +} + +func (r *parquetStreamingRows) ColumnTypeNullable(index int) (nullable, ok bool) { + return r.reader.Schema().Fields()[index].Optional(), true +} + +func (r *parquetStreamingRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { + return 0, 0, false +} + +func (r *parquetStreamingRows) ColumnTypeScanType(index int) reflect.Type { + switch r.reader.Schema().Fields()[index].Type().Kind() { + case parquet.Boolean: + return reflect.TypeOf(false) + case parquet.Int32: + return reflect.TypeOf(int32(0)) + case parquet.Int64: + return reflect.TypeOf(int64(0)) + case parquet.Float: + return reflect.TypeOf(float32(0)) + case parquet.Double: + return reflect.TypeOf(float64(0)) + case parquet.ByteArray, parquet.FixedLenByteArray: + return reflect.TypeOf("") + } + return nil +} diff --git a/chdb/driver/parquet_streaming_test.go b/chdb/driver/parquet_streaming_test.go new file mode 100644 index 0000000..55d7e35 --- /dev/null +++ b/chdb/driver/parquet_streaming_test.go @@ -0,0 +1,138 @@ +package chdbdriver + +import ( + "database/sql" + "fmt" + "testing" +) + +func TestDbWithParquetStreaming(t *testing.T) { + + db, err := sql.Open("chdb", fmt.Sprintf("driverType=%s", "PARQUET_STREAMING")) + if err != nil { + t.Errorf("open db fail, err:%s", err) + } + if db.Ping() != nil { + t.Errorf("ping db fail") + } + rows, err := db.Query(`SELECT 1,number from system.numbers limit 100000`) + if err != nil { + t.Errorf("run Query fail, err:%s", err) + } + cols, err := rows.Columns() + if err != nil { + t.Errorf("get result columns fail, err: %s", err) + } + if len(cols) != 2 { + t.Errorf("select result columns length should be 2") + } + var ( + bar int + foo int + ) + defer rows.Close() + for rows.Next() { + err := rows.Scan(&bar, &foo) + if err != nil { + t.Errorf("scan fail, err: %s", err) + } + if bar != 1 { + t.Errorf("expected error") + } + + } +} + +func TestDBWithParquetStreamingSession(t *testing.T) { + + session.Query( + "CREATE TABLE IF NOT EXISTS TestDBWithParquetSessionStreaming (id UInt32) ENGINE = MergeTree() ORDER BY id;") + + session.Query("INSERT INTO TestDBWithParquetSessionStreaming VALUES (1), (2), (3);") + + ret, err := session.Query("SELECT * FROM TestDBWithParquetSessionStreaming;") + if err != nil { + t.Fatalf("Query fail, err: %s", err) + } + if string(ret.Buf()) != "1\n2\n3\n" { + t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) + } + db, err := sql.Open("chdb", fmt.Sprintf("session=%s;driverType=%s", session.ConnStr(), "PARQUET_STREAMING")) + if err != nil { + t.Fatalf("open db fail, err: %s", err) + } + if db.Ping() != nil { + t.Fatalf("ping db fail, err: %s", err) + } + rows, err := db.Query("select * from TestDBWithParquetSessionStreaming;") + if err != nil { + t.Fatalf("exec create function fail, err: %s", err) + } + defer rows.Close() + cols, err := rows.Columns() + if err != nil { + t.Fatalf("get result columns fail, err: %s", err) + } + if len(cols) != 1 { + t.Fatalf("result columns length shoule be 3, actual: %d", len(cols)) + } + var bar = 0 + var count = 1 + for rows.Next() { + err = rows.Scan(&bar) + if err != nil { + t.Fatalf("scan fail, err: %s", err) + } + if bar != count { + t.Fatalf("result is not match, want: %d actual: %d", count, bar) + } + count++ + } +} + +func TestDBWithParquetStreamingConnection(t *testing.T) { + + session.Query( + "CREATE TABLE IF NOT EXISTS TestDBWithParquetConnectionStreaming (id UInt32) ENGINE = MergeTree() ORDER BY id;") + + session.Query("INSERT INTO TestDBWithParquetConnectionStreaming VALUES (1), (2), (3);") + + ret, err := session.Query("SELECT * FROM TestDBWithParquetConnectionStreaming;") + if err != nil { + t.Fatalf("Query fail, err: %s", err) + } + if string(ret.Buf()) != "1\n2\n3\n" { + t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) + } + db, err := sql.Open("chdb", fmt.Sprintf("session=%s;driverType=%s", session.ConnStr(), "PARQUET_STREAMING")) + if err != nil { + t.Fatalf("open db fail, err: %s", err) + } + if db.Ping() != nil { + t.Fatalf("ping db fail, err: %s", err) + } + rows, err := db.Query("select * from TestDBWithParquetConnectionStreaming;") + if err != nil { + t.Fatalf("exec create function fail, err: %s", err) + } + defer rows.Close() + cols, err := rows.Columns() + if err != nil { + t.Fatalf("get result columns fail, err: %s", err) + } + if len(cols) != 1 { + t.Fatalf("result columns length shoule be 3, actual: %d", len(cols)) + } + var bar = 0 + var count = 1 + for rows.Next() { + err = rows.Scan(&bar) + if err != nil { + t.Fatalf("scan fail, err: %s", err) + } + if bar != count { + t.Fatalf("result is not match, want: %d actual: %d", count, bar) + } + count++ + } +} diff --git a/chdb/session.go b/chdb/session.go index 374d367..d396213 100644 --- a/chdb/session.go +++ b/chdb/session.go @@ -59,6 +59,15 @@ func (s *Session) Query(queryStr string, outputFormats ...string) (result chdbpu return s.conn.Query(queryStr, outputFormat) } +// Query calls `query_conn` function with the current connection and a default output format of "CSV" if not provided. +func (s *Session) QueryStream(queryStr string, outputFormats ...string) (result chdbpurego.ChdbStreamResult, err error) { + outputFormat := "CSV" // Default value + if len(outputFormats) > 0 { + outputFormat = outputFormats[0] + } + return s.conn.QueryStreaming(queryStr, outputFormat) +} + // Close closes the session and removes the temporary directory // // temporary directory is created when NewSession was called with an empty path. diff --git a/chdb/wrapper.go b/chdb/wrapper.go index 53619a0..9dfb74c 100644 --- a/chdb/wrapper.go +++ b/chdb/wrapper.go @@ -19,6 +19,21 @@ func Query(queryStr string, outputFormats ...string) (result chdbpurego.ChdbResu return tempSession.Query(queryStr, outputFormat) } +// Query calls query_conn with a default in-memory session and default output format of "CSV" if not provided. +func QueryStream(queryStr string, outputFormats ...string) (result chdbpurego.ChdbStreamResult, err error) { + outputFormat := "CSV" // Default value + if len(outputFormats) > 0 { + outputFormat = outputFormats[0] + } + // tempSession, err := initConnection(":memory:?verbose&log-level=test") + tempSession, err := initConnection(":memory:") + if err != nil { + return nil, err + } + defer tempSession.Close() + return tempSession.QueryStreaming(queryStr, outputFormat) +} + func initConnection(connStr string) (result chdbpurego.ChdbConn, err error) { return chdbpurego.NewConnectionFromConnString(connStr) } From b897f144f4ed8807efcdf6fa8fb53549a35f8560 Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Thu, 24 Apr 2025 12:47:45 +0000 Subject: [PATCH 2/4] update docs --- chdb-purego/chdb.go | 2 +- chdb-purego/types.go | 9 +++++++ chdb.md | 38 ++++++++++++++++++++------- chdb/session.go | 4 ++- lowApi.md | 61 ++++++++++++++++++++++++++++++++++---------- 5 files changed, 90 insertions(+), 24 deletions(-) diff --git a/chdb-purego/chdb.go b/chdb-purego/chdb.go index 56e2663..6bb4e7a 100644 --- a/chdb-purego/chdb.go +++ b/chdb-purego/chdb.go @@ -141,7 +141,7 @@ func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult return newChdbResult(res), nil } -// Query implements ChdbConn. +// QueryStreaming implements ChdbConn. func (c *connection) QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) { if c.conn == nil { diff --git a/chdb-purego/types.go b/chdb-purego/types.go index dcf015a..3a67868 100644 --- a/chdb-purego/types.go +++ b/chdb-purego/types.go @@ -55,15 +55,24 @@ type ChdbResult interface { } type ChdbStreamResult interface { + // GetNext returns the next chunk of data from the stream. + // The chunk is a ChdbResult object that can be used to read the data. + // If there are no more chunks, it returns nil. GetNext() ChdbResult + // Error returns the error message if there was an error during the streaming process. Error() error + // Cancel cancels the streaming process and frees the underlying memory. Cancel() + // Free frees the underlying memory and closes the stream. Free() } type ChdbConn interface { //Query executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr Query(queryStr string, formatStr string) (result ChdbResult, err error) + // QueryStreaming executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr + // The result is a stream of data that can be read in chunks. + // This is useful for large datasets that cannot be loaded into memory all at once. QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) //Ready returns a boolean indicating if the connections is successfully established. Ready() bool diff --git a/chdb.md b/chdb.md index 426a847..f87c9bd 100644 --- a/chdb.md +++ b/chdb.md @@ -9,6 +9,7 @@ import "github.com/chdb-io/chdb-go/chdb" ## Index - [func Query\(queryStr string, outputFormats ...string\) \(result chdbpurego.ChdbResult, err error\)](<#Query>) +- [func QueryStream\(queryStr string, outputFormats ...string\) \(result chdbpurego.ChdbStreamResult, err error\)](<#QueryStream>) - [type Session](<#Session>) - [func NewSession\(paths ...string\) \(\*Session, error\)](<#NewSession>) - [func \(s \*Session\) Cleanup\(\)](<#Session.Cleanup>) @@ -17,10 +18,11 @@ import "github.com/chdb-io/chdb-go/chdb" - [func \(s \*Session\) IsTemp\(\) bool](<#Session.IsTemp>) - [func \(s \*Session\) Path\(\) string](<#Session.Path>) - [func \(s \*Session\) Query\(queryStr string, outputFormats ...string\) \(result chdbpurego.ChdbResult, err error\)](<#Session.Query>) + - [func \(s \*Session\) QueryStream\(queryStr string, outputFormats ...string\) \(result chdbpurego.ChdbStreamResult, err error\)](<#Session.QueryStream>) -## func [Query]() +## func [Query]() ```go func Query(queryStr string, outputFormats ...string) (result chdbpurego.ChdbResult, err error) @@ -28,8 +30,17 @@ func Query(queryStr string, outputFormats ...string) (result chdbpurego.ChdbResu Query calls query\_conn with a default in\-memory session and default output format of "CSV" if not provided. + +## func [QueryStream]() + +```go +func QueryStream(queryStr string, outputFormats ...string) (result chdbpurego.ChdbStreamResult, err error) +``` + +Query calls query\_conn with a default in\-memory session and default output format of "CSV" if not provided. + -## type [Session]() +## type [Session]() @@ -40,7 +51,7 @@ type Session struct { ``` -### func [NewSession]() +### func [NewSession]() ```go func NewSession(paths ...string) (*Session, error) @@ -49,7 +60,7 @@ func NewSession(paths ...string) (*Session, error) NewSession creates a new session with the given path. If path is empty, a temporary directory is created. Note: The temporary directory is removed when Close is called. -### func \(\*Session\) [Cleanup]() +### func \(\*Session\) [Cleanup]() ```go func (s *Session) Cleanup() @@ -58,7 +69,7 @@ func (s *Session) Cleanup() Cleanup closes the session and removes the directory. -### func \(\*Session\) [Close]() +### func \(\*Session\) [Close]() ```go func (s *Session) Close() @@ -71,7 +82,7 @@ temporary directory is created when NewSession was called with an empty path. ``` -### func \(\*Session\) [ConnStr]() +### func \(\*Session\) [ConnStr]() ```go func (s *Session) ConnStr() string @@ -80,7 +91,7 @@ func (s *Session) ConnStr() string ConnStr returns the current connection string used for the underlying connection -### func \(\*Session\) [IsTemp]() +### func \(\*Session\) [IsTemp]() ```go func (s *Session) IsTemp() bool @@ -89,7 +100,7 @@ func (s *Session) IsTemp() bool IsTemp returns whether the session is temporary. -### func \(\*Session\) [Path]() +### func \(\*Session\) [Path]() ```go func (s *Session) Path() string @@ -98,7 +109,7 @@ func (s *Session) Path() string Path returns the path of the session. -### func \(\*Session\) [Query]() +### func \(\*Session\) [Query]() ```go func (s *Session) Query(queryStr string, outputFormats ...string) (result chdbpurego.ChdbResult, err error) @@ -106,4 +117,13 @@ func (s *Session) Query(queryStr string, outputFormats ...string) (result chdbpu Query calls \`query\_conn\` function with the current connection and a default output format of "CSV" if not provided. + +### func \(\*Session\) [QueryStream]() + +```go +func (s *Session) QueryStream(queryStr string, outputFormats ...string) (result chdbpurego.ChdbStreamResult, err error) +``` + +QueryStream calls \`query\_conn\` function with the current connection and a default output format of "CSV" if not provided. The result is a stream of data that can be read in chunks. This is useful for large datasets that cannot be loaded into memory all at once. + Generated by [gomarkdoc]() diff --git a/chdb/session.go b/chdb/session.go index d396213..def1f3c 100644 --- a/chdb/session.go +++ b/chdb/session.go @@ -59,7 +59,9 @@ func (s *Session) Query(queryStr string, outputFormats ...string) (result chdbpu return s.conn.Query(queryStr, outputFormat) } -// Query calls `query_conn` function with the current connection and a default output format of "CSV" if not provided. +// QueryStream calls `query_conn` function with the current connection and a default output format of "CSV" if not provided. +// The result is a stream of data that can be read in chunks. +// This is useful for large datasets that cannot be loaded into memory all at once. func (s *Session) QueryStream(queryStr string, outputFormats ...string) (result chdbpurego.ChdbStreamResult, err error) { outputFormat := "CSV" // Default value if len(outputFormats) > 0 { diff --git a/lowApi.md b/lowApi.md index 4829877..4a51cdc 100644 --- a/lowApi.md +++ b/lowApi.md @@ -10,12 +10,13 @@ import "github.com/chdb-io/chdb-go/chdb-purego" - [type ChdbConn](<#ChdbConn>) - [func NewConnection\(argc int, argv \[\]string\) \(ChdbConn, error\)](<#NewConnection>) + - [func NewConnectionFromConnString\(conn\_string string\) \(ChdbConn, error\)](<#NewConnectionFromConnString>) - [type ChdbResult](<#ChdbResult>) - - [func RawQuery\(argc int, argv \[\]string\) \(result ChdbResult, err error\)](<#RawQuery>) +- [type ChdbStreamResult](<#ChdbStreamResult>) -## type [ChdbConn]() +## type [ChdbConn]() @@ -23,6 +24,10 @@ import "github.com/chdb-io/chdb-go/chdb-purego" type ChdbConn interface { //Query executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr Query(queryStr string, formatStr string) (result ChdbResult, err error) + // QueryStreaming executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr + // The result is a stream of data that can be read in chunks. + // This is useful for large datasets that cannot be loaded into memory all at once. + QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) //Ready returns a boolean indicating if the connections is successfully established. Ready() bool //Close the connection and free the underlying allocated memory @@ -31,15 +36,35 @@ type ChdbConn interface { ``` -### func [NewConnection]() +### func [NewConnection]() ```go func NewConnection(argc int, argv []string) (ChdbConn, error) ``` -Session will keep the state of query. If path is None, it will create a temporary directory and use it as the database path and the temporary directory will be removed when the session is closed. You can also pass in a path to create a database at that path where will keep your data. +NewConnection is the low level function to create a new connection to the chdb server. using NewConnectionFromConnString is recommended. -You can also use a connection string to pass in the path and other parameters. Examples: +Deprecated: Use NewConnectionFromConnString instead. This function will be removed in a future version. + +Session will keep the state of query. If path is None, it will create a temporary directory and use it as the database path and the temporary directory will be removed when the session is closed. You can also pass in a path to create a database at that path where will keep your data. This is a thin wrapper around the connect\_chdb C API. the argc and argv should be like: + +- argc = 1, argv = \[\]string\{"\-\-path=/tmp/chdb"\} +- argc = 2, argv = \[\]string\{"\-\-path=/tmp/chdb", "\-\-readonly=1"\} + +Important: + +- There can be only one session at a time. If you want to create a new session, you need to close the existing one. +- Creating a new session will close the existing one. +- You need to ensure that the path exists before creating a new session. Or you can use NewConnectionFromConnString. + + +### func [NewConnectionFromConnString]() + +```go +func NewConnectionFromConnString(conn_string string) (ChdbConn, error) +``` + +NewConnectionFromConnString creates a new connection to the chdb server using a connection string. You can use a connection string to pass in the path and other parameters. Examples: - ":memory:" \(for in\-memory database\) - "test.db" \(for relative path\) @@ -67,13 +92,12 @@ Important: - Creating a new session will close the existing one. -## type [ChdbResult]() +## type [ChdbResult]() ```go type ChdbResult interface { - // Raw bytes result buffer, used for reading the result of clickhouse query Buf() []byte // String rapresentation of the the buffer String() string @@ -88,17 +112,28 @@ type ChdbResult interface { // If the query had any error during execution, here you can retrieve the cause. Error() error // Free the query result and all the allocated memory - Free() error + Free() } ``` - -### func [RawQuery]() + +## type [ChdbStreamResult]() + + ```go -func RawQuery(argc int, argv []string) (result ChdbResult, err error) +type ChdbStreamResult interface { + // GetNext returns the next chunk of data from the stream. + // The chunk is a ChdbResult object that can be used to read the data. + // If there are no more chunks, it returns nil. + GetNext() ChdbResult + // Error returns the error message if there was an error during the streaming process. + Error() error + // Cancel cancels the streaming process and frees the underlying memory. + Cancel() + // Free frees the underlying memory and closes the stream. + Free() +} ``` -RawQuery will execute the given clickouse query without using any session. - Generated by [gomarkdoc]() From 7b30026f9922144e17f48c67dff524abc29577a8 Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Thu, 24 Apr 2025 13:31:36 +0000 Subject: [PATCH 3/4] add finalizer for ExecContext --- chdb/driver/driver.go | 11 +++++++++-- chdb/driver/parquet_streaming_test.go | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/chdb/driver/driver.go b/chdb/driver/driver.go index 75ba1f9..e01c320 100644 --- a/chdb/driver/driver.go +++ b/chdb/driver/driver.go @@ -6,6 +6,7 @@ import ( "database/sql" "database/sql/driver" "fmt" + "runtime" "strconv" "strings" @@ -352,10 +353,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name if err != nil { return nil, err } - return &execResult{ + res := &execResult{ err: nil, localRes: result, - }, nil + } + runtime.SetFinalizer(res, func(r *execResult) { + if r.localRes != nil { + r.localRes.Free() + } + }) + return res, nil } func (c *conn) QueryRowContext(ctx context.Context, query string, values []driver.Value) *singleRow { diff --git a/chdb/driver/parquet_streaming_test.go b/chdb/driver/parquet_streaming_test.go index 55d7e35..4b8184f 100644 --- a/chdb/driver/parquet_streaming_test.go +++ b/chdb/driver/parquet_streaming_test.go @@ -111,6 +111,7 @@ func TestDBWithParquetStreamingConnection(t *testing.T) { if db.Ping() != nil { t.Fatalf("ping db fail, err: %s", err) } + rows, err := db.Query("select * from TestDBWithParquetConnectionStreaming;") if err != nil { t.Fatalf("exec create function fail, err: %s", err) From 9931e964c5694ae4059522a65690362c52da20a1 Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Mon, 28 Apr 2025 21:50:00 +0000 Subject: [PATCH 4/4] delete comment --- chdb/driver/parquet_streaming.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/chdb/driver/parquet_streaming.go b/chdb/driver/parquet_streaming.go index f4726ac..9c3420c 100644 --- a/chdb/driver/parquet_streaming.go +++ b/chdb/driver/parquet_streaming.go @@ -17,11 +17,11 @@ type parquetStreamingRows struct { stream chdbpurego.ChdbStreamResult // result from clickhouse curChunk chdbpurego.ChdbResult // current chunk reader *parquet.GenericReader[any] // parquet reader - curRecord parquet.Row // TODO: delete this? - buffer []parquet.Row // record buffer - bufferSize int // amount of records to preload into buffer - bufferIndex int64 // index in the current buffer - curRow int64 // row counter + curRecord parquet.Row + buffer []parquet.Row // record buffer + bufferSize int // amount of records to preload into buffer + bufferIndex int64 // index in the current buffer + curRow int64 // row counter needNewBuffer bool useUnsafeStringReader bool }