Skip to content

Add streaming query #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions chdb-purego/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ func findLibrary() string {
}

// Function variables that hold the libchdb entry points. The registrations
// visible in init bind them by symbol name with purego.RegisterLibFunc.
var (
queryStable func(argc int, argv []string) *local_result
freeResult func(result *local_result)
queryStableV2 func(argc int, argv []string) *local_result_v2
freeResultV2 func(result *local_result_v2)
connectChdb func(argc int, argv []*byte) **chdb_conn
closeConn func(conn **chdb_conn)
queryConn func(conn *chdb_conn, query string, format string) *local_result_v2
queryStable func(argc int, argv []string) *local_result
freeResult func(result *local_result)
queryStableV2 func(argc int, argv []string) *local_result_v2
freeResultV2 func(result *local_result_v2)
connectChdb func(argc int, argv []*byte) **chdb_conn
closeConn func(conn **chdb_conn)
queryConn func(conn *chdb_conn, query string, format string) *local_result_v2
// Streaming query API.
// queryConnStreaming is bound to the C symbol query_conn_streaming.
queryConnStreaming func(conn *chdb_conn, query string, format string) *chdb_streaming_result
// streamingResultError is bound to chdb_streaming_result_error.
// NOTE(review): chdb.h declares that function as returning `const char *`
// (NULL when there is no error). Declaring the Go return type as *string
// presumably makes the C character pointer be read as a Go string header;
// confirm against purego's type-conversion rules.
streamingResultError func(result *chdb_streaming_result) *string
// streamingResultNext is bound to chdb_streaming_fetch_result.
streamingResultNext func(conn *chdb_conn, result *chdb_streaming_result) *local_result_v2
// streamingResultDestroy is bound to chdb_destroy_result.
streamingResultDestroy func(result *chdb_streaming_result)
// streamingResultCancel is bound to chdb_streaming_cancel_query.
streamingResultCancel func(conn *chdb_conn, result *chdb_streaming_result)
)

func init() {
Expand All @@ -58,5 +63,10 @@ func init() {
purego.RegisterLibFunc(&connectChdb, libchdb, "connect_chdb")
purego.RegisterLibFunc(&closeConn, libchdb, "close_conn")
purego.RegisterLibFunc(&queryConn, libchdb, "query_conn")
purego.RegisterLibFunc(&queryConnStreaming, libchdb, "query_conn_streaming")
purego.RegisterLibFunc(&streamingResultError, libchdb, "chdb_streaming_result_error")
purego.RegisterLibFunc(&streamingResultNext, libchdb, "chdb_streaming_fetch_result")
purego.RegisterLibFunc(&streamingResultCancel, libchdb, "chdb_streaming_cancel_query")
purego.RegisterLibFunc(&streamingResultDestroy, libchdb, "chdb_destroy_result")

}
28 changes: 28 additions & 0 deletions chdb-purego/chdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type connection struct {
conn **chdb_conn
}

// CancelQuery implements ChdbConn.
//
// Cancellation through the connection is not implemented yet. The query
// argument is ignored and a non-nil error is always returned, so that calling
// this method cannot crash the host program. To stop a streaming query, call
// Cancel on the ChdbStreamResult returned by QueryStreaming.
func (c *connection) CancelQuery(query ChdbResult) (err error) {
	return errors.New("CancelQuery: not implemented")
}

func newChdbConn(conn **chdb_conn) ChdbConn {
c := &connection{
conn: conn,
Expand Down Expand Up @@ -136,6 +141,29 @@ func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult
return newChdbResult(res), nil
}

// QueryStreaming implements ChdbConn.
//
// It starts queryStr on the underlying connection and returns a stream whose
// chunks, encoded in formatStr, are read with GetNext. The caller owns the
// returned stream and must release it with Free (or Cancel).
//
// An error is returned when the connection pointer is nil or when chDB
// reports an error for the query. In the latter case the native handle is
// destroyed here, because the caller receives no stream through which it
// could release it.
func (c *connection) QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) {
	if c.conn == nil {
		return nil, fmt.Errorf("invalid connection")
	}

	rawConn := *c.conn

	res := queryConnStreaming(rawConn, queryStr, formatStr)
	if res == nil {
		// A nil handle is not reported as an error (behavior kept for
		// backward compatibility): the caller gets a stream that wraps the
		// nil handle.
		// NOTE(review): chdb.h does not document a NULL return for
		// query_conn_streaming; confirm whether this should become an error.
		return newStreamingResult(rawConn, res), nil
	}
	if s := streamingResultError(res); s != nil {
		// Copy the message into Go-managed memory first: it may be backed by
		// memory that belongs to the handle destroyed on the next line.
		msg := string([]byte(*s))
		// chdb.h requires chdb_destroy_result for every handle; without this
		// call the failed query's handle would leak.
		streamingResultDestroy(res)
		return nil, errors.New(msg)
	}

	return newStreamingResult(rawConn, res), nil
}

func (c *connection) Ready() bool {
if c.conn != nil {
deref := *c.conn
Expand Down
72 changes: 72 additions & 0 deletions chdb-purego/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package chdbpurego

import "errors"

// streamingResult is the ChdbStreamResult implementation built by
// newStreamingResult.
type streamingResult struct {
// curConn is the connection handle handed to the native fetch and cancel calls.
curConn *chdb_conn
// stream is the native streaming handle; Free sets it to nil after destroying it.
stream *chdb_streaming_result
// curChunk is the chunk most recently returned by GetNext, or nil. It is
// freed when the next chunk is requested and when the stream is freed.
curChunk ChdbResult
}

// newStreamingResult wraps the native streaming handle cRes, together with
// the connection it was created on, in a ChdbStreamResult.
//
// No chunk is fetched up front (the first GetNext call does that) and no
// finalizer is registered, so the caller has to release the stream with Free
// or Cancel.
func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStreamResult {
	return &streamingResult{
		stream:  cRes,
		curConn: conn,
	}
}

// Error implements ChdbStreamResult.
//
// It returns the error message that chDB recorded for the stream as a Go
// error, or nil when there is none. Once the stream has been released with
// Free or Cancel there is no native handle left to ask, so nil is returned
// instead of passing a nil handle to the native library.
func (c *streamingResult) Error() error {
	if c.stream == nil {
		return nil
	}
	if s := streamingResultError(c.stream); s != nil {
		return errors.New(*s)
	}
	return nil
}

// Free implements ChdbStreamResult.
//
// It cancels the streaming query, destroys the native stream handle and frees
// the chunk last returned by GetNext, if any. Free is idempotent: once the
// handle has been released, further calls (including a Free after Cancel) do
// not reach the native library again.
func (c *streamingResult) Free() {
	if c.stream != nil {
		streamingResultCancel(c.curConn, c.stream)
		streamingResultDestroy(c.stream)
		// Mark the handle as released so it is never handed to C again.
		c.stream = nil
	}
	if c.curChunk != nil {
		c.curChunk.Free()
		c.curChunk = nil
	}
}

// Cancel implements ChdbStreamResult.
//
// It simply delegates to Free, which cancels the native query, destroys the
// stream handle and frees the current chunk.
func (c *streamingResult) Cancel() {
c.Free()
}

// GetNext implements ChdbStreamResult.
//
// It frees the chunk returned by the previous call, if any, and fetches the
// next one from the native stream. The returned chunk is therefore only
// valid until the next GetNext, Free or Cancel call. GetNext returns nil when
// the native fetch yields no chunk, and also when the stream has already
// been released.
func (c *streamingResult) GetNext() ChdbResult {
	// Release the previous chunk before asking for a new one.
	if c.curChunk != nil {
		c.curChunk.Free()
		c.curChunk = nil
	}
	// After Free/Cancel there is no handle left; do not pass nil to C.
	if c.stream == nil {
		return nil
	}
	nextChunk := streamingResultNext(c.curConn, c.stream)
	if nextChunk == nil {
		return nil
	}
	c.curChunk = newChdbResult(nextChunk)
	return c.curChunk
}
23 changes: 22 additions & 1 deletion chdb-purego/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type local_result_v2 struct {
error_message *byte
}

// chdb_streaming_result is the Go-side layout of the C chdb_streaming_result
// handle used by the streaming query functions. For reference: https://github.com/chdb-io/chdb/blob/main/programs/local/chdb.h#L65
type chdb_streaming_result struct {
// internal_data matches the `void * internal_data` member of the C struct.
internal_data unsafe.Pointer
}

// clickhouse background server connection.for reference: https://github.com/chdb-io/chdb/blob/main/programs/local/chdb.h#L82
type chdb_conn struct {
server unsafe.Pointer
Expand All @@ -32,7 +37,6 @@ type chdb_conn struct {
}

type ChdbResult interface {
// Raw bytes result buffer, used for reading the result of clickhouse query
Buf() []byte
// String representation of the buffer
String() string
Expand All @@ -50,9 +54,26 @@ type ChdbResult interface {
Free()
}

// ChdbStreamResult is the handle for a streaming query: the result is read
// chunk by chunk with GetNext and the handle must be released with Free or
// Cancel once it is no longer needed.
type ChdbStreamResult interface {
// GetNext returns the next chunk of data from the stream.
// The chunk is a ChdbResult object that can be used to read the data.
// If there are no more chunks, it returns nil.
// The streamingResult implementation frees the previously returned chunk on
// every call, so a chunk must not be used after the next GetNext call.
GetNext() ChdbResult
// Error returns the error message if there was an error during the streaming process.
// It returns nil when no error was recorded.
Error() error
// Cancel cancels the streaming process and frees the underlying memory.
Cancel()
// Free frees the underlying memory and closes the stream.
Free()
}

type ChdbConn interface {
//Query executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr
Query(queryStr string, formatStr string) (result ChdbResult, err error)
// QueryStreaming executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr
// The result is a stream of data that can be read in chunks.
// This is useful for large datasets that cannot be loaded into memory all at once.
QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error)
//Ready returns a boolean indicating if the connections is successfully established.
Ready() bool
//Close the connection and free the underlying allocated memory
Expand Down
81 changes: 53 additions & 28 deletions chdb.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
#pragma once

#ifdef __cplusplus
# include <condition_variable>
# include <cstddef>
# include <cstdint>
# include <mutex>
# include <queue>
# include <string>
extern "C" {
#else
# include <stdbool.h>
Expand Down Expand Up @@ -55,26 +51,6 @@ CHDB_EXPORT void free_result(struct local_result * result);
CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv);
CHDB_EXPORT void free_result_v2(struct local_result_v2 * result);

#ifdef __cplusplus
// A query text paired with an output format name; used as the type of
// query_queue::current_query.
struct query_request
{
std::string query;
std::string format;
};

// State for passing one query and its result between threads: a mutex, two
// condition variables, a single query slot, a single result slot and status
// flags.
// NOTE(review): the code that waits on and signals these members is not part
// of this header; confirm the exact hand-off protocol there.
struct query_queue
{
std::mutex mutex;
std::condition_variable query_cv; // For query submission
std::condition_variable result_cv; // For query result retrieval
query_request current_query;
local_result_v2 * current_result = nullptr;
bool has_query = false;
bool shutdown = false;
bool cleanup_done = false;
};
#endif

/**
* Connection structure for chDB
* Contains server instance, connection state, and query processing queue
Expand All @@ -86,11 +62,15 @@ struct chdb_conn
void * queue; /* Query processing queue */
};

/**
 * Handle for a streaming query.
 * Returned by query_conn_streaming() and passed to chdb_streaming_result_error(),
 * chdb_streaming_fetch_result(), chdb_streaming_cancel_query() and
 * chdb_destroy_result(). The only member is an untyped pointer to the
 * implementation's private state.
 */
typedef struct {
void * internal_data;
} chdb_streaming_result;

/**
* Creates a new chDB connection.
* Only one active connection is allowed per process.
* Creating a new connection with different path requires closing existing connection.
*
*
* @param argc Number of command-line arguments
* @param argv Command-line arguments array (--path=<db_path> to specify database location)
* @return Pointer to connection pointer, or NULL on failure
Expand All @@ -101,15 +81,15 @@ CHDB_EXPORT struct chdb_conn ** connect_chdb(int argc, char ** argv);
/**
* Closes an existing chDB connection and cleans up resources.
* Thread-safe function that handles connection shutdown and cleanup.
*
*
* @param conn Pointer to connection pointer to close
*/
CHDB_EXPORT void close_conn(struct chdb_conn ** conn);

/**
* Executes a query on the given connection.
* Thread-safe function that handles query execution in a separate thread.
*
*
* @param conn Connection to execute query on
* @param query SQL query string to execute
* @param format Output format string (e.g., "CSV", default format)
Expand All @@ -118,6 +98,51 @@ CHDB_EXPORT void close_conn(struct chdb_conn ** conn);
*/
CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const char * query, const char * format);

/**
* Executes a streaming query on the given connection.
* @brief Initializes streaming query execution and returns result handle
* @param conn Connection to execute query on
* @param query SQL query string to execute
* @param format Output format string (e.g. "CSV", default format)
* @return Streaming result handle containing query state or error message
* @note Returns error result if connection is invalid or closed
*/
CHDB_EXPORT chdb_streaming_result * query_conn_streaming(struct chdb_conn * conn, const char * query, const char * format);

/**
* Retrieves error message from streaming result.
* @brief Gets error message associated with streaming query execution
* @param result Streaming result handle from query_conn_streaming()
* @return Null-terminated error message string, or NULL if no error occurred
*/
CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result);

/**
* Fetches next chunk of streaming results.
* @brief Iterates through streaming query results
* @param conn Active connection handle
* @param result Streaming result handle from query_conn_streaming()
* @return Materialized result chunk with data
* @note Returns empty result when stream ends
*/
CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_conn * conn, chdb_streaming_result * result);

/**
* Cancels ongoing streaming query.
* @brief Aborts streaming query execution and cleans up resources
* @param conn Active connection handle
* @param result Streaming result handle to cancel
*/
CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_streaming_result * result);

/**
* Releases resources associated with streaming result.
* @brief Destroys streaming result handle and frees allocated memory
* @param result Streaming result handle to destroy
* @warning Must be called even if query was finished or canceled
*/
CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result);

#ifdef __cplusplus
}
#endif
#endif
Loading