From 45cc9796a52c1edc58260e24c51d88f8bfc84d1d Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 6 Mar 2024 23:27:52 +0800 Subject: [PATCH 01/10] feat: add transaction related method --- driver/src/conn.rs | 4 +++ driver/src/flight_sql.rs | 17 ++++++++- driver/src/rest_api.rs | 21 ++++++++++-- driver/tests/driver/main.rs | 1 + driver/tests/driver/transaction.rs | 55 ++++++++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 driver/tests/driver/transaction.rs diff --git a/driver/src/conn.rs b/driver/src/conn.rs index f7a58e544..016e6914c 100644 --- a/driver/src/conn.rs +++ b/driver/src/conn.rs @@ -190,6 +190,10 @@ pub trait Connection: DynClone + Send + Sync { )) } + async fn begin(&self) -> Result<()>; + async fn commit(&self) -> Result<()>; + async fn rollback(&self) -> Result<()>; + async fn get_files(&self, stage: &str, local_file: &str) -> Result { let mut total_count: usize = 0; let mut total_size: usize = 0; diff --git a/driver/src/flight_sql.rs b/driver/src/flight_sql.rs index db62361cb..7c0e9d9cd 100644 --- a/driver/src/flight_sql.rs +++ b/driver/src/flight_sql.rs @@ -143,6 +143,21 @@ impl Connection for FlightSQLConnection { "STREAM LOAD unavailable for FlightSQL".to_string(), )) } + + async fn begin(&self) -> Result<()> { + self.exec("BEGIN").await.unwrap(); + Ok(()) + } + + async fn commit(&self) -> Result<()> { + self.exec("COMMIT").await.unwrap(); + Ok(()) + } + + async fn rollback(&self) -> Result<()> { + self.exec("ROLLBACK").await.unwrap(); + Ok(()) + } } impl FlightSQLConnection { @@ -273,7 +288,7 @@ impl Args { return Err(Error::BadArgument(format!( "Invalid value for sslmode: {}", v.as_ref() - ))) + ))); } }, "tls_ca_file" => args.tls_ca_file = Some(v.to_string()), diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 4a5084260..ec892d07e 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -187,6 +187,21 @@ impl Connection for RestAPIConnection { let stats = self.load_data(sql, reader, size, None, None).await?; Ok(stats) } + + async fn begin(&self) -> Result<()> { + self.exec("BEGIN").await.unwrap(); + Ok(()) + } + + async fn commit(&self) -> Result<()> { + self.exec("COMMIT").await.unwrap(); + Ok(()) + } + + async fn rollback(&self) -> Result<()> { + self.exec("ROLLBACK").await.unwrap(); + Ok(()) + } } impl<'o> RestAPIConnection { @@ -219,8 +234,8 @@ impl<'o> RestAPIConnection { ("record_delimiter", "\n"), ("skip_header", "0"), ] - .into_iter() - .collect() + .into_iter() + .collect() } fn default_copy_options() -> BTreeMap<&'o str, &'o str> { @@ -228,7 +243,7 @@ impl<'o> RestAPIConnection { } } -type PageFut = Pin> + Send>>; +type PageFut = Pin> + Send>>; pub struct RestAPIRows { client: APIClient, diff --git a/driver/tests/driver/main.rs b/driver/tests/driver/main.rs index e9b6b9617..e834b5916 100644 --- a/driver/tests/driver/main.rs +++ b/driver/tests/driver/main.rs @@ -19,3 +19,4 @@ mod load; mod select_iter; mod select_simple; mod session; +mod transaction; diff --git a/driver/tests/driver/transaction.rs b/driver/tests/driver/transaction.rs new file mode 100644 index 000000000..af1ea77c6 --- /dev/null +++ b/driver/tests/driver/transaction.rs @@ -0,0 +1,55 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_driver::Client; + +use crate::common::DEFAULT_DSN; + + +#[tokio::test] +async fn test_commit() { + let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN); + let client = Client::new(dsn.to_string()); + let conn = client.get_conn().await.unwrap(); + + conn.exec("CREATE OR REPLACE TABLE t(c int);").await.unwrap(); + conn.begin().await.unwrap(); + conn.exec("INSERT INTO t VALUES(1);").await.unwrap(); + let row = conn.query_row("SELECT * FROM t").await.unwrap(); + let row = row.unwrap(); + let (val, ): (i32, ) = row.try_into().unwrap(); + assert_eq!(val, 1); + conn.commit().await.unwrap(); +} + +#[tokio::test] +async fn test_rollback() { + let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN); + let client = Client::new(dsn.to_string()); + let conn = client.get_conn().await.unwrap(); + + conn.exec("CREATE OR REPLACE TABLE t(c int);").await.unwrap(); + conn.begin().await.unwrap(); + conn.exec("INSERT INTO t VALUES(1);").await.unwrap(); + let row = conn.query_row("SELECT * FROM t").await.unwrap(); + let row = row.unwrap(); + let (val, ): (i32, ) = row.try_into().unwrap(); + assert_eq!(val, 1); + conn.rollback().await.unwrap(); + let row = conn.query_row("SELECT * FROM t").await.unwrap(); + let row = row.unwrap(); + let (val, ): (Option, ) = row.try_into().unwrap(); + assert_eq!(val, None) +} + From 8a5f7f5225492b5af3615972516edfc89b8e5aa8 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 6 Mar 2024 23:31:06 +0800 Subject: [PATCH 02/10] fmt --- driver/src/rest_api.rs | 6 +++--- driver/tests/driver/transaction.rs | 16 +++++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index ec892d07e..9895abfea 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -234,8 +234,8 @@ impl<'o> RestAPIConnection { ("record_delimiter", "\n"), ("skip_header", "0"), ] - .into_iter() - .collect() + .into_iter() + .collect() } fn default_copy_options() -> BTreeMap<&'o str, &'o str> { @@ -243,7 +243,7 @@ impl<'o> RestAPIConnection { } } -type PageFut = Pin> + Send>>; +type PageFut = Pin> + Send>>; pub struct RestAPIRows { client: APIClient, diff --git a/driver/tests/driver/transaction.rs b/driver/tests/driver/transaction.rs index af1ea77c6..f5633029e 100644 --- a/driver/tests/driver/transaction.rs +++ b/driver/tests/driver/transaction.rs @@ -16,19 +16,20 @@ use databend_driver::Client; use crate::common::DEFAULT_DSN; - #[tokio::test] async fn test_commit() { let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN); let client = Client::new(dsn.to_string()); let conn = client.get_conn().await.unwrap(); - conn.exec("CREATE OR REPLACE TABLE t(c int);").await.unwrap(); + conn.exec("CREATE OR REPLACE TABLE t(c int);") + .await + .unwrap(); conn.begin().await.unwrap(); conn.exec("INSERT INTO t VALUES(1);").await.unwrap(); let row = conn.query_row("SELECT * FROM t").await.unwrap(); let row = row.unwrap(); - let (val, ): (i32, ) = row.try_into().unwrap(); + let (val,): (i32,) = row.try_into().unwrap(); assert_eq!(val, 1); conn.commit().await.unwrap(); } @@ -39,17 +40,18 @@ async fn test_rollback() { let client = Client::new(dsn.to_string()); let conn = client.get_conn().await.unwrap(); - conn.exec("CREATE OR REPLACE TABLE t(c int);").await.unwrap(); + conn.exec("CREATE OR REPLACE TABLE t(c int);") + .await + .unwrap(); conn.begin().await.unwrap(); conn.exec("INSERT INTO t VALUES(1);").await.unwrap(); let row = conn.query_row("SELECT * FROM t").await.unwrap(); let row = row.unwrap(); - let (val, ): (i32, ) = row.try_into().unwrap(); + let (val,): (i32,) = row.try_into().unwrap(); assert_eq!(val, 1); conn.rollback().await.unwrap(); let row = conn.query_row("SELECT * FROM t").await.unwrap(); let row = row.unwrap(); - let (val, ): (Option, ) = row.try_into().unwrap(); + let (val,): (Option,) = row.try_into().unwrap(); assert_eq!(val, None) } - From 966309d67108e4ba3dd071a0a94af6aeb1dafec7 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 12 Mar 2024 10:35:56 +0800 Subject: [PATCH 03/10] fix test --- driver/tests/driver/transaction.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/driver/tests/driver/transaction.rs b/driver/tests/driver/transaction.rs index f5633029e..dd4838fa1 100644 --- a/driver/tests/driver/transaction.rs +++ b/driver/tests/driver/transaction.rs @@ -32,6 +32,9 @@ async fn test_commit() { let (val,): (i32,) = row.try_into().unwrap(); assert_eq!(val, 1); conn.commit().await.unwrap(); + let row = conn.query_row("select 1").await.unwrap(); + let row = row.unwrap(); + println!("{:?}",row); } #[tokio::test] @@ -40,6 +43,7 @@ async fn test_rollback() { let client = Client::new(dsn.to_string()); let conn = client.get_conn().await.unwrap(); + conn.exec("CREATE OR REPLACE TABLE t(c int);") .await .unwrap(); @@ -49,9 +53,6 @@ async fn test_rollback() { let row = row.unwrap(); let (val,): (i32,) = row.try_into().unwrap(); assert_eq!(val, 1); + conn.rollback().await.unwrap(); - let row = conn.query_row("SELECT * FROM t").await.unwrap(); - let row = row.unwrap(); - let (val,): (Option,) = row.try_into().unwrap(); - assert_eq!(val, None) } From 92adc366b8645249dfce810129f0655dc60b8faf Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 12 Mar 2024 10:38:04 +0800 Subject: [PATCH 04/10] fix check --- driver/tests/driver/transaction.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/driver/tests/driver/transaction.rs b/driver/tests/driver/transaction.rs index dd4838fa1..b971e1b10 100644 --- a/driver/tests/driver/transaction.rs +++ b/driver/tests/driver/transaction.rs @@ -32,9 +32,9 @@ async fn test_commit() { let (val,): (i32,) = row.try_into().unwrap(); assert_eq!(val, 1); conn.commit().await.unwrap(); - let row = conn.query_row("select 1").await.unwrap(); + let row = conn.query_row("select 1").await.unwrap(); let row = row.unwrap(); - println!("{:?}",row); + println!("{:?}", row); } #[tokio::test] @@ -43,7 +43,6 @@ async fn test_rollback() { let client = Client::new(dsn.to_string()); let conn = client.get_conn().await.unwrap(); - conn.exec("CREATE OR REPLACE TABLE t(c int);") .await .unwrap(); From 17da5f4b8a59e12d38187dbff72a25a9d79e3312 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 12 Mar 2024 10:47:20 +0800 Subject: [PATCH 05/10] fix check --- sql/src/value.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/src/value.rs b/sql/src/value.rs index 741705d12..efa71d0c0 100644 --- a/sql/src/value.rs +++ b/sql/src/value.rs @@ -138,6 +138,7 @@ impl Value { impl TryFrom<(&DataType, &str)> for Value { type Error = Error; + #[allow(deprecated)] fn try_from((t, v): (&DataType, &str)) -> Result { match t { DataType::Null => Ok(Self::Null), From 67d52bf5144543e27faf6cf672c5fd564d69694f Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 12 Mar 2024 10:54:11 +0800 Subject: [PATCH 06/10] fix --- sql/src/value.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/src/value.rs b/sql/src/value.rs index efa71d0c0..89629cb80 100644 --- a/sql/src/value.rs +++ b/sql/src/value.rs @@ -521,6 +521,7 @@ impl_try_from_number_value!(f64); impl TryFrom for NaiveDateTime { type Error = Error; + #[allow(deprecated)] fn try_from(val: Value) -> Result { match val { Value::Timestamp(i) => { @@ -616,6 +617,7 @@ impl std::fmt::Display for Value { } // Compatible with Databend, inner values of nested types are quoted. +#[allow(deprecated)] fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result { match val { Value::Null => write!(f, "NULL"), From 60a1b79e646aebc96acb1dd5f8d5b1e2deabd4bf Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 19 Mar 2024 14:46:39 +0800 Subject: [PATCH 07/10] fix --- driver/src/conn.rs | 17 ++++++++++++++--- driver/src/flight_sql.rs | 15 --------------- driver/src/rest_api.rs | 15 --------------- sql/src/value.rs | 3 --- 4 files changed, 14 insertions(+), 36 deletions(-) diff --git a/driver/src/conn.rs b/driver/src/conn.rs index 016e6914c..641e9e75e 100644 --- a/driver/src/conn.rs +++ b/driver/src/conn.rs @@ -190,9 +190,20 @@ pub trait Connection: DynClone + Send + Sync { )) } - async fn begin(&self) -> Result<()>; - async fn commit(&self) -> Result<()>; - async fn rollback(&self) -> Result<()>; + async fn begin(&self) -> Result<()> { + let _ = self.exec("BEGIN").await; + Ok(()) + } + + async fn commit(&self) -> Result<()> { + let _ = self.exec("COMMIT").await; + Ok(()) + } + + async fn rollback(&self) -> Result<()> { + let _ = self.exec("ROLLBACK").await; + Ok(()) + } async fn get_files(&self, stage: &str, local_file: &str) -> Result { let mut total_count: usize = 0; diff --git a/driver/src/flight_sql.rs b/driver/src/flight_sql.rs index 7c0e9d9cd..48a34251e 100644 --- a/driver/src/flight_sql.rs +++ b/driver/src/flight_sql.rs @@ -143,21 +143,6 @@ impl Connection for FlightSQLConnection { "STREAM LOAD unavailable for FlightSQL".to_string(), )) } - - async fn begin(&self) -> Result<()> { - self.exec("BEGIN").await.unwrap(); - Ok(()) - } - - async fn commit(&self) -> Result<()> { - self.exec("COMMIT").await.unwrap(); - Ok(()) - } - - async fn rollback(&self) -> Result<()> { - self.exec("ROLLBACK").await.unwrap(); - Ok(()) - } } impl FlightSQLConnection { diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 639e0b3a1..8b0f23d50 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -187,21 +187,6 @@ impl Connection for RestAPIConnection { let stats = self.load_data(sql, reader, size, None, None).await?; Ok(stats) } - - async fn begin(&self) -> Result<()> { - self.exec("BEGIN").await.unwrap(); - Ok(()) - } - - async fn commit(&self) -> Result<()> { - self.exec("COMMIT").await.unwrap(); - Ok(()) - } - - async fn rollback(&self) -> Result<()> { - self.exec("ROLLBACK").await.unwrap(); - Ok(()) - } } impl<'o> RestAPIConnection { diff --git a/sql/src/value.rs b/sql/src/value.rs index f7d00ac2d..7f8ea27e5 100644 --- a/sql/src/value.rs +++ b/sql/src/value.rs @@ -138,7 +138,6 @@ impl Value { impl TryFrom<(&DataType, &str)> for Value { type Error = Error; - #[allow(deprecated)] fn try_from((t, v): (&DataType, &str)) -> Result { match t { DataType::Null => Ok(Self::Null), @@ -522,7 +521,6 @@ impl_try_from_number_value!(f64); impl TryFrom for NaiveDateTime { type Error = Error; - #[allow(deprecated)] fn try_from(val: Value) -> Result { match val { Value::Timestamp(i) => { @@ -618,7 +616,6 @@ impl std::fmt::Display for Value { } // Compatible with Databend, inner values of nested types are quoted. -#[allow(deprecated)] fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result { match val { Value::Null => write!(f, "NULL"), From 60329f68ef32df3663889cb9eacb36fe3076f476 Mon Sep 17 00:00:00 2001 From: hantmac Date: Thu, 21 Mar 2024 11:08:51 +0800 Subject: [PATCH 08/10] fix handle error --- driver/src/conn.rs | 6 +++--- driver/tests/driver/common/mod.rs | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/driver/src/conn.rs b/driver/src/conn.rs index 641e9e75e..41df3e338 100644 --- a/driver/src/conn.rs +++ b/driver/src/conn.rs @@ -191,17 +191,17 @@ pub trait Connection: DynClone + Send + Sync { } async fn begin(&self) -> Result<()> { - let _ = self.exec("BEGIN").await; + self.exec("BEGIN").await?; Ok(()) } async fn commit(&self) -> Result<()> { - let _ = self.exec("COMMIT").await; + self.exec("COMMIT").await?; Ok(()) } async fn rollback(&self) -> Result<()> { - let _ = self.exec("ROLLBACK").await; + self.exec("ROLLBACK").await?; Ok(()) } diff --git a/driver/tests/driver/common/mod.rs b/driver/tests/driver/common/mod.rs index 3e236be46..e7e066b06 100644 --- a/driver/tests/driver/common/mod.rs +++ b/driver/tests/driver/common/mod.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub static DEFAULT_DSN: &str = "databend://root:@localhost:8000/default?sslmode=disable"; +pub static DEFAULT_DSN: &str = + "databend://databend:databend@localhost:8000/default?sslmode=disable"; From ffcab1940e3a558077b691db517c2f292c544e6f Mon Sep 17 00:00:00 2001 From: hantmac Date: Thu, 21 Mar 2024 11:18:26 +0800 Subject: [PATCH 09/10] txn test --- driver/tests/driver/transaction.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/driver/tests/driver/transaction.rs b/driver/tests/driver/transaction.rs index b971e1b10..1fc7bdaf1 100644 --- a/driver/tests/driver/transaction.rs +++ b/driver/tests/driver/transaction.rs @@ -32,9 +32,10 @@ async fn test_commit() { let (val,): (i32,) = row.try_into().unwrap(); assert_eq!(val, 1); conn.commit().await.unwrap(); - let row = conn.query_row("select 1").await.unwrap(); + let row = conn.query_row("SELECT * FROM t").await.unwrap(); let row = row.unwrap(); - println!("{:?}", row); + let (val,): (i32,) = row.try_into().unwrap(); + assert_eq!(val, 1); } #[tokio::test] @@ -54,4 +55,6 @@ async fn test_rollback() { assert_eq!(val, 1); conn.rollback().await.unwrap(); + let row = conn.query_row("SELECT * FROM t").await.unwrap(); + assert!(row.is_none()); } From a3df040aa72bc316fcd45eefb32e7f886dc021ad Mon Sep 17 00:00:00 2001 From: hantmac Date: Thu, 21 Mar 2024 14:34:06 +0800 Subject: [PATCH 10/10] for debug --- driver/tests/driver/transaction.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/driver/tests/driver/transaction.rs b/driver/tests/driver/transaction.rs index 1fc7bdaf1..8dac4613e 100644 --- a/driver/tests/driver/transaction.rs +++ b/driver/tests/driver/transaction.rs @@ -55,6 +55,9 @@ async fn test_rollback() { assert_eq!(val, 1); conn.rollback().await.unwrap(); + + let client = Client::new(dsn.to_string()); + let conn = client.get_conn().await.unwrap(); let row = conn.query_row("SELECT * FROM t").await.unwrap(); assert!(row.is_none()); }