Skip to content

Commit 642b60f

Browse files
marvin-hansenjovezhongzliang-min
authored
Resolved issue #6: Stream query in release mode. (#7)
* Updated Readme. Signed-off-by: Marvin Hansen <marvin.hansen@gmail.com> * Enabled relase-plz Signed-off-by: Marvin Hansen <marvin.hansen@gmail.com> * Added client without compression to resolve issue with handling streaming data in release mode. Added new method fetch_stream to ,well, fetch data from a continuous query of a stream. Signed-off-by: Marvin Hansen <marvin.hansen@gmail.com> * Fixed Docstring test in fetch_stream. Signed-off-by: Marvin Hansen <marvin.hansen@gmail.com> * Update src/lib/stream.rs Co-authored-by: Gimi Liang <gimi.liang@timeplus.io> * Update src/lib/mod.rs Co-authored-by: Gimi Liang <gimi.liang@timeplus.io> --------- Signed-off-by: Marvin Hansen <marvin.hansen@gmail.com> Co-authored-by: Jove Zhong <jove@timeplus.io> Co-authored-by: Gimi Liang <gimi.liang@timeplus.io>
1 parent e3289ed commit 642b60f

File tree

2 files changed

+64
-2
lines changed

2 files changed

+64
-2
lines changed

src/lib/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@ mod getters;
77
mod insert;
88
pub mod prelude;
99
mod query;
10+
mod stream;
1011

11-
use clickhouse::Client;
12+
use clickhouse::{Client, Compression};
1213

1314
#[derive(Clone)]
1415
pub struct ProtonClient {
1516
client: Client,
17+
// A separate client without compression is currently necessary to enable
18+
// streaming queries in release builds. See issue #6 on Github:
19+
// https://github.com/timeplus-io/proton-rust-client/issues/6
20+
client_without_compression: Client,
1621
url: String,
1722
}
1823

@@ -36,7 +41,13 @@ impl ProtonClient {
3641
/// ```
3742
pub fn new(url: &str) -> Self {
3843
let client = Client::default().with_url(url.to_string());
44+
let client_without_compression =
45+
Client::with_compression(client.clone(), Compression::None);
3946
let url = url.to_string();
40-
Self { client, url }
47+
Self {
48+
client,
49+
client_without_compression,
50+
url,
51+
}
4152
}
4253
}

src/lib/stream.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use crate::prelude::{ProtonClient, ProtonClientError, Result};
2+
use clickhouse::query::RowCursor;
3+
use clickhouse::Row;
4+
use serde::Deserialize;
5+
6+
impl ProtonClient {
7+
///
8+
/// Executes a streaming query, returning a [`RowCursor`] to obtain results
9+
/// as they become available from the stream. The key difference compared to fetch is that,
10+
/// for streaming query, the returned result is a unbounded stream. Also,
11+
/// a fetch_stream query will keep running continuously returning fresh data
12+
/// until the application terminates..
13+
///
14+
/// # Example
15+
///
16+
/// ```no_run
17+
/// use proton_client::ProtonClient;
18+
/// use proton_client::prelude::Result;
19+
///
20+
/// async fn example() -> Result<()> {
21+
///
22+
/// #[derive(Debug, clickhouse::Row, serde::Deserialize)]
23+
/// struct MyRow {
24+
/// no: u32,
25+
/// name: String,
26+
/// }
27+
///
28+
/// let client = ProtonClient::new("http://localhost:3218");
29+
///
30+
/// let mut cursor = client
31+
/// .fetch_stream::<MyRow>("SELECT ?fields from (test_stream) WHERE no BETWEEN 500 AND 504")
32+
/// .await
33+
/// .expect("[main/fetch]: Failed to fetch stream data");
34+
///
35+
/// while let Some(MyRow { name, no }) = cursor.next().await.expect("[main/fetch]: Failed to fetch data") {
36+
/// println!("{name}: {no}");
37+
/// }
38+
/// # Ok(()) }
39+
/// ```
40+
pub async fn fetch_stream<T>(&self, query: &str) -> Result<RowCursor<T>>
41+
where
42+
T: Row + for<'b> Deserialize<'b>,
43+
{
44+
// Here we use the client without compression. For details, see:
45+
// https://github.com/timeplus-io/proton-rust-client/issues/6
46+
match self.client_without_compression.query(query).fetch::<T>() {
47+
Ok(cursor) => Ok(cursor),
48+
Err(e) => Err(ProtonClientError::FetchFailed(e.to_string())),
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)