Skip to content

Enhance error handling and simplify Result type usage in the library #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ jobs:
- uses: actions/checkout@v3
- name: Fetch library
run: |
wget https://github.com/chdb-io/chdb/releases/latest/download/linux-x86_64-libchdb.tar.gz
#wget https://github.com/chdb-io/chdb/releases/latest/download/linux-x86_64-libchdb.tar.gz
#Fix libchdb version to v2.1.1 until libchdb v3 based chdb-rust refactor
wget https://github.com/chdb-io/chdb/releases/download/v2.1.1/linux-x86_64-libchdb.tar.gz
tar -xzf linux-x86_64-libchdb.tar.gz
sudo mv libchdb.so /usr/lib/libchdb.so
sudo ldconfig
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ thiserror = "1"

[build-dependencies]
bindgen = "0.70.1"

[dev-dependencies]
tempdir = "0.3.7"
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::ffi::NulError;
use std::string::FromUtf8Error;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("An unknown error has occurred")]
Unknown,
#[error("No result")]
NoResult,
#[error("Invalid data: {0}")]
InvalidData(String),
#[error("Invalid path")]
Expand All @@ -15,7 +18,9 @@ pub enum Error {
#[error("Insufficient dir permissions")]
InsufficientPermissions,
#[error("Non UTF-8 sequence: {0}")]
NonUtf8Sequence(String),
NonUtf8Sequence(FromUtf8Error),
#[error("{0}")]
QueryError(String),
}

pub type Result<T, Err = Error> = std::result::Result<T, Err>;
17 changes: 10 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use std::ffi::{c_char, CString};

use crate::arg::Arg;
use crate::error::Error;
use crate::error::Result;
use crate::query_result::QueryResult;

pub fn execute(query: &str, query_args: Option<&[Arg]>) -> Result<Option<QueryResult>, Error> {
pub fn execute(query: &str, query_args: Option<&[Arg]>) -> Result<QueryResult> {
let mut argv = Vec::with_capacity(query_args.as_ref().map_or(0, |v| v.len()) + 2);
argv.push(arg_clickhouse()?.into_raw());

Expand All @@ -33,26 +34,28 @@ pub fn execute(query: &str, query_args: Option<&[Arg]>) -> Result<Option<QueryRe
call_chdb(argv)
}

fn call_chdb(mut argv: Vec<*mut c_char>) -> Result<Option<QueryResult>, Error> {
fn call_chdb(mut argv: Vec<*mut c_char>) -> Result<QueryResult> {
let argc = argv.len() as i32;
let argv = argv.as_mut_ptr();
let result_ptr = unsafe { bindings::query_stable_v2(argc, argv) };

if result_ptr.is_null() {
return Ok(None);
return Err(Error::NoResult);
}
let result = QueryResult::new(result_ptr);
let result = result.check_error()?;

Ok(Some(QueryResult(result_ptr).check_error()?))
Ok(result)
}

fn arg_clickhouse() -> Result<CString, Error> {
fn arg_clickhouse() -> Result<CString> {
Ok(CString::new("clickhouse")?)
}

fn arg_data_path(value: &str) -> Result<CString, Error> {
fn arg_data_path(value: &str) -> Result<CString> {
Ok(CString::new(format!("--path={}", value))?)
}

fn arg_query(value: &str) -> Result<CString, Error> {
fn arg_query(value: &str) -> Result<CString> {
Ok(CString::new(format!("--query={}", value))?)
}
43 changes: 28 additions & 15 deletions src/query_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ use std::time::Duration;

use crate::bindings;
use crate::error::Error;

use crate::error::Result;
#[derive(Clone)]
pub struct QueryResult(pub(crate) *mut bindings::local_result_v2);
pub struct QueryResult {
inner: *mut bindings::local_result_v2,
}

impl QueryResult {
pub fn data_utf8(&self) -> Result<String, Error> {
String::from_utf8(self.data_ref().to_vec())
.map_err(|e| Error::NonUtf8Sequence(e.to_string()))
pub(crate) fn new(inner: *mut bindings::local_result_v2) -> Self {
Self { inner }
}
pub fn data_utf8(&self) -> Result<String> {
let buf = self.data_ref();

pub fn data_utf8_lossy<'a>(&'a self) -> Cow<'a, str> {
String::from_utf8(buf.to_vec()).map_err(Error::NonUtf8Sequence)
}

pub fn data_utf8_lossy(&self) -> Cow<str> {
String::from_utf8_lossy(self.data_ref())
}

Expand All @@ -24,30 +30,37 @@ impl QueryResult {
}

pub fn data_ref(&self) -> &[u8] {
let buf = unsafe { (*self.0).buf };
let len = unsafe { (*self.0).len };
let inner = self.inner;
let buf = unsafe { (*inner).buf };
let len = unsafe { (*inner).len };
let bytes: &[u8] = unsafe { slice::from_raw_parts(buf as *const u8, len) };
bytes
}

pub fn rows_read(&self) -> u64 {
(unsafe { *self.0 }).rows_read
let inner = self.inner;
unsafe { *inner }.rows_read
}

pub fn bytes_read(&self) -> u64 {
unsafe { (*self.0).bytes_read }
let inner = self.inner;
unsafe { *inner }.bytes_read
}

pub fn elapsed(&self) -> Duration {
let elapsed = unsafe { (*self.0).elapsed };
let elapsed = unsafe { (*self.inner).elapsed };
Duration::from_secs_f64(elapsed)
}

pub(crate) fn check_error(self) -> Result<Self, Error> {
let err_ptr = unsafe { (*self.0).error_message };
pub(crate) fn check_error(self) -> Result<Self> {
self.check_error_ref()?;
Ok(self)
}
pub(crate) fn check_error_ref(&self) -> Result<()> {
let err_ptr = unsafe { (*self.inner).error_message };

if err_ptr.is_null() {
return Ok(self);
return Ok(());
}

Err(Error::QueryError(unsafe {
Expand All @@ -58,6 +71,6 @@ impl QueryResult {

impl Drop for QueryResult {
fn drop(&mut self) {
unsafe { bindings::free_result_v2(self.0) };
unsafe { bindings::free_result_v2(self.inner) };
}
}
6 changes: 1 addition & 5 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ impl<'a> Default for SessionBuilder<'a> {
}

impl Session {
pub fn execute(
&self,
query: &str,
query_args: Option<&[Arg]>,
) -> Result<Option<QueryResult>, Error> {
pub fn execute(&self, query: &str, query_args: Option<&[Arg]>) -> Result<QueryResult, Error> {
let mut argv = Vec::with_capacity(
self.default_args.len() + query_args.as_ref().map_or(0, |v| v.len()) + 1,
);
Expand Down
52 changes: 24 additions & 28 deletions tests/examples.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,65 @@
use chdb_rust::arg::Arg;
use chdb_rust::error::Result;
use chdb_rust::execute;
use chdb_rust::format::InputFormat;
use chdb_rust::format::OutputFormat;
use chdb_rust::log_level::LogLevel;
use chdb_rust::session::SessionBuilder;

#[test]
fn stateful() {
fn test_stateful() -> Result<()> {
//
// Create session.
//

let tmp = tempdir::TempDir::new("chdb-rust")?;
let session = SessionBuilder::new()
.with_data_path("/tmp/chdb")
.with_data_path(tmp.path())
.with_arg(Arg::LogLevel(LogLevel::Debug))
.with_arg(Arg::Custom("priority".into(), Some("1".into())))
.with_auto_cleanup(true)
.build()
.unwrap();
.build()?;

//
// Create database.
//

session
.execute("CREATE DATABASE demo; USE demo", Some(&[Arg::MultiQuery]))
.unwrap();
session.execute("CREATE DATABASE demo; USE demo", Some(&[Arg::MultiQuery]))?;

//
// Create table.
//

session
.execute(
"CREATE TABLE logs (id UInt64, msg String) ENGINE = MergeTree ORDER BY id",
None,
)
.unwrap();
session.execute(
"CREATE TABLE logs (id UInt64, msg String) ENGINE = MergeTree() ORDER BY id",
None,
)?;

//
// Insert into table.
//

session
.execute("INSERT INTO logs (id, msg) VALUES (1, 'test')", None)
.unwrap();
session.execute("INSERT INTO logs (id, msg) VALUES (1, 'test')", None)?;

//
// Select from table.
//
let len = session.execute(
"SELECT COUNT(*) FROM logs",
Some(&[Arg::OutputFormat(OutputFormat::JSONEachRow)]),
)?;

let result = session
.execute(
"SELECT * FROM logs",
Some(&[Arg::OutputFormat(OutputFormat::JSONEachRow)]),
)
.unwrap()
.unwrap();
assert_eq!(len.data_utf8_lossy(), "{\"COUNT()\":1}\n");

let result = session.execute(
"SELECT * FROM logs",
Some(&[Arg::OutputFormat(OutputFormat::JSONEachRow)]),
)?;
assert_eq!(result.data_utf8_lossy(), "{\"id\":1,\"msg\":\"test\"}\n");
Ok(())
}

#[test]
fn stateless() {
fn test_stateless() -> Result<()> {
let query = format!(
"SELECT * FROM file('tests/logs.csv', {})",
InputFormat::CSV.as_str()
Expand All @@ -71,9 +68,8 @@ fn stateless() {
let result = execute(
&query,
Some(&[Arg::OutputFormat(OutputFormat::JSONEachRow)]),
)
.unwrap()
.unwrap();
)?;

assert_eq!(result.data_utf8_lossy(), "{\"id\":1,\"msg\":\"test\"}\n");
Ok(())
}