From 3f941b4f724e46e0e4a8e00a094b2d7a14646114 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 11 Jun 2025 15:49:03 +0800 Subject: [PATCH 1/8] fix: impl and get rowcount from resp data --- bindings/python/src/blocking.rs | 39 ++++++++++++++++++++++---- core/src/pages.rs | 49 +++++++++++++++++++++++++++++++++ driver/src/rest_api.rs | 7 ++++- 3 files changed, 89 insertions(+), 6 deletions(-) diff --git a/bindings/python/src/blocking.rs b/bindings/python/src/blocking.rs index dab536e9..ea790e8a 100644 --- a/bindings/python/src/blocking.rs +++ b/bindings/python/src/blocking.rs @@ -192,6 +192,7 @@ pub struct BlockingDatabendCursor { // buffer is used to store only the first row after execute buffer: Vec, schema: Option, + rowcount: i64, } impl BlockingDatabendCursor { @@ -201,6 +202,7 @@ impl BlockingDatabendCursor { rows: None, buffer: Vec::new(), schema: None, + rowcount: -1, } } } @@ -210,6 +212,7 @@ impl BlockingDatabendCursor { self.rows = None; self.buffer.clear(); self.schema = None; + self.rowcount = -1; } } @@ -247,10 +250,9 @@ impl BlockingDatabendCursor { } } - /// Not supported currently #[getter] pub fn rowcount(&self, _py: Python) -> i64 { - -1 + self.rowcount } pub fn close(&mut self, py: Python) -> PyResult<()> { @@ -277,18 +279,40 @@ impl BlockingDatabendCursor { self.reset(); let conn = self.conn.clone(); - // fetch first row after execute - // then we could finish the query directly if there's no result let params = to_sql_params(params); + + // check if it is DML(INSERT, UPDATE, DELETE) + let sql_trimmed = operation.trim_start().to_lowercase(); + let is_dml = sql_trimmed.starts_with("insert") + || sql_trimmed.starts_with("update") + || sql_trimmed.starts_with("delete") + || sql_trimmed.starts_with("replace"); + + if is_dml { + let affected_rows = wait_for_future(py, async move { + conn.exec(&operation, params) + .await + .map_err(DriverError::new) + })?; + self.rowcount = affected_rows; + return Ok(py.None()); + } + + // for select, use query_iter let (first, rows) = wait_for_future(py, async move { let mut rows = conn.query_iter(&operation, params).await?; let first = rows.next().await.transpose()?; Ok::<_, databend_driver::Error>((first, rows)) }) .map_err(DriverError::new)?; + if let Some(first) = first { self.buffer.push(Row::new(first)); + self.rowcount = 1; + } else { + self.rowcount = 0; } + self.rows = Some(Arc::new(Mutex::new(rows))); self.set_schema(py); Ok(py.None()) @@ -375,9 +399,14 @@ impl BlockingDatabendCursor { for row in fetched { result.push(Row::new(row.map_err(DriverError::new)?)); } + + if self.rowcount == -1 { + self.rowcount = result.len() as i64; + } + Ok(result) } - None => Ok(vec![]), + None => Ok(result), } } diff --git a/core/src/pages.rs b/core/src/pages.rs index 074996ad..13cf37b7 100644 --- a/core/src/pages.rs +++ b/core/src/pages.rs @@ -47,6 +47,55 @@ impl Page { } self.stats = p.stats; } + + pub fn affected_rows(&self) -> Result> { + if self.schema.is_empty() { + return Ok(0); + } + + let first_field = &self.schema[0]; + + if !first_field.name.contains("number of rows") { + return Ok(0); + } + + if self.data.is_empty() || self.data[0].is_empty() { + return Ok(0); + } + + match &self.data[0][0] { + Some(value_str) => self.parse_row_count_string(value_str), + None => Ok(0), + } + } + + fn parse_row_count_string(&self, value_str: &str) -> Result> { + let trimmed = value_str.trim(); + + if trimmed.is_empty() { + return Ok(0); + } + + if let Ok(count) = trimmed.parse::() { + return Ok(count); + } + + if let Ok(count) = serde_json::from_str::(trimmed) { + return Ok(count); + } + + let unquoted = trimmed.trim_matches('"'); + if let Ok(count) = unquoted.parse::() { + return Ok(count); + } + + Err(format!("failed to parse affected rows from: '{}'", value_str).into()) + } + + ///the schema can be `number of rows inserted`, `number of rows deleted`, `number of rows updated` when sql start with `insert`, `delete`, `update` + pub fn has_affected_rows(&self) -> bool { + !self.schema.is_empty() && self.schema[0].name.contains("number of rows") + } } type PageFut = Pin> + Send>>; diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index b6e7a5c3..dfb1bef5 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -65,7 +65,12 @@ impl IConnection for RestAPIConnection { async fn exec(&self, sql: &str) -> Result { info!("exec: {}", sql); let page = self.client.query_all(sql).await?; - Ok(page.stats.progresses.write_progress.rows as i64) + + let affected_rows = page + .affected_rows() + .map_err(|e| anyhow::anyhow!("Failed to parse affected rows: {}", e))?; + + Ok(affected_rows) } async fn kill_query(&self, query_id: &str) -> Result<()> { From 310b51439e7d47f3ba1075e9e91967ff94a7ff35 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 11 Jun 2025 16:11:09 +0800 Subject: [PATCH 2/8] z --- core/src/pages.rs | 9 ++++++--- driver/src/rest_api.rs | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/pages.rs b/core/src/pages.rs index 13cf37b7..7cc8bea7 100644 --- a/core/src/pages.rs +++ b/core/src/pages.rs @@ -48,7 +48,7 @@ impl Page { self.stats = p.stats; } - pub fn affected_rows(&self) -> Result> { + pub fn affected_rows(&self) -> Result { if self.schema.is_empty() { return Ok(0); } @@ -69,7 +69,7 @@ impl Page { } } - fn parse_row_count_string(&self, value_str: &str) -> Result> { + fn parse_row_count_string(&self, value_str: &str) -> Result { let trimmed = value_str.trim(); if trimmed.is_empty() { @@ -89,7 +89,10 @@ impl Page { return Ok(count); } - Err(format!("failed to parse affected rows from: '{}'", value_str).into()) + Err(format!( + "failed to parse affected rows from: '{}'", + value_str + )) } ///the schema can be `number of rows inserted`, `number of rows deleted`, `number of rows updated` when sql start with `insert`, `delete`, `update` diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index dfb1bef5..ec2a0d6b 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -68,7 +68,7 @@ impl IConnection for RestAPIConnection { let affected_rows = page .affected_rows() - .map_err(|e| anyhow::anyhow!("Failed to parse affected rows: {}", e))?; + .map_err(|e| Error::InvalidResponse(e))?; Ok(affected_rows) } From f88ef88a3570aab0bf86b0baaaca0ee79535d167 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 11 Jun 2025 16:17:42 +0800 Subject: [PATCH 3/8] z --- driver/src/rest_api.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index ec2a0d6b..be3e5f30 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -66,9 +66,7 @@ impl IConnection for RestAPIConnection { info!("exec: {}", sql); let page = self.client.query_all(sql).await?; - let affected_rows = page - .affected_rows() - .map_err(|e| Error::InvalidResponse(e))?; + let affected_rows = page.affected_rows().map_err(Error::InvalidResponse)?; Ok(affected_rows) } From e629133f2e19359fd6cd058cadcbbf86037d1fda Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 14 Jun 2025 16:49:39 +0800 Subject: [PATCH 4/8] impl affect_rows in rest_api.rs and use query_iter --- bindings/python/src/blocking.rs | 3 +- core/src/pages.rs | 52 --------------------------- driver/src/rest_api.rs | 62 ++++++++++++++++++++++++++++++--- 3 files changed, 59 insertions(+), 58 deletions(-) diff --git a/bindings/python/src/blocking.rs b/bindings/python/src/blocking.rs index ea790e8a..efb2951a 100644 --- a/bindings/python/src/blocking.rs +++ b/bindings/python/src/blocking.rs @@ -294,11 +294,12 @@ impl BlockingDatabendCursor { .await .map_err(DriverError::new) })?; + self.rowcount = affected_rows; return Ok(py.None()); } - // for select, use query_iter + // For SELECT, use query_iter as before let (first, rows) = wait_for_future(py, async move { let mut rows = conn.query_iter(&operation, params).await?; let first = rows.next().await.transpose()?; diff --git a/core/src/pages.rs b/core/src/pages.rs index 7cc8bea7..074996ad 100644 --- a/core/src/pages.rs +++ b/core/src/pages.rs @@ -47,58 +47,6 @@ impl Page { } self.stats = p.stats; } - - pub fn affected_rows(&self) -> Result { - if self.schema.is_empty() { - return Ok(0); - } - - let first_field = &self.schema[0]; - - if !first_field.name.contains("number of rows") { - return Ok(0); - } - - if self.data.is_empty() || self.data[0].is_empty() { - return Ok(0); - } - - match &self.data[0][0] { - Some(value_str) => self.parse_row_count_string(value_str), - None => Ok(0), - } - } - - fn parse_row_count_string(&self, value_str: &str) -> Result { - let trimmed = value_str.trim(); - - if trimmed.is_empty() { - return Ok(0); - } - - if let Ok(count) = trimmed.parse::() { - return Ok(count); - } - - if let Ok(count) = serde_json::from_str::(trimmed) { - return Ok(count); - } - - let unquoted = trimmed.trim_matches('"'); - if let Ok(count) = unquoted.parse::() { - return Ok(count); - } - - Err(format!( - "failed to parse affected rows from: '{}'", - value_str - )) - } - - ///the schema can be `number of rows inserted`, `number of rows deleted`, `number of rows updated` when sql start with `insert`, `delete`, `update` - pub fn has_affected_rows(&self) -> bool { - !self.schema.is_empty() && self.schema[0].name.contains("number of rows") - } } type PageFut = Pin> + Send>>; diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index be3e5f30..f9add7ca 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -24,6 +24,7 @@ use log::info; use tokio::fs::File; use tokio::io::BufReader; use tokio_stream::Stream; +use tokio_stream::StreamExt; use databend_client::APIClient; use databend_client::Pages; @@ -64,11 +65,8 @@ impl IConnection for RestAPIConnection { async fn exec(&self, sql: &str) -> Result { info!("exec: {}", sql); - let page = self.client.query_all(sql).await?; - - let affected_rows = page.affected_rows().map_err(Error::InvalidResponse)?; - - Ok(affected_rows) + // Use the new affected_rows method that internally uses query_iter + self.calculate_affected_rows_from_iter(sql).await } async fn kill_query(&self, query_id: &str) -> Result<()> { @@ -200,6 +198,60 @@ impl<'o> RestAPIConnection { fn default_copy_options() -> BTreeMap<&'o str, &'o str> { vec![("purge", "true")].into_iter().collect() } + + fn parse_row_count_string(value_str: &str) -> Result { + let trimmed = value_str.trim(); + + if trimmed.is_empty() { + return Ok(0); + } + + if let Ok(count) = trimmed.parse::() { + return Ok(count); + } + + if let Ok(count) = serde_json::from_str::(trimmed) { + return Ok(count); + } + + let unquoted = trimmed.trim_matches('"'); + if let Ok(count) = unquoted.parse::() { + return Ok(count); + } + + Err(format!( + "failed to parse affected rows from: '{}'", + value_str + )) + } + + async fn calculate_affected_rows_from_iter(&self, sql: &str) -> Result { + let mut rows = IConnection::query_iter(self, sql).await?; + let mut count = 0i64; + + // Get the first row to check if it has affected rows info + if let Some(first_row) = rows.next().await { + let row = first_row?; + let schema = row.schema(); + + // Check if this is an affected rows response + if !schema.fields().is_empty() && schema.fields()[0].name.contains("number of rows") { + let values = row.values(); + if !values.is_empty() { + let value = &values[0]; + let s: String = value.clone().try_into().map_err(|e| { + Error::InvalidResponse(format!("Failed to convert value to string: {}", e)) + })?; + count = Self::parse_row_count_string(&s).map_err(Error::InvalidResponse)?; + } + } else { + // If it's not affected rows info, count normally + count = -1; + } + } + + Ok(count) + } } pub struct RestAPIRows { From 59a444c65270dd9639dda0de57213113f887a52e Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 14 Jun 2025 17:42:41 +0800 Subject: [PATCH 5/8] Revert "impl affect_rows in rest_api.rs and use query_iter" This reverts commit e629133f2e19359fd6cd058cadcbbf86037d1fda. --- bindings/python/src/blocking.rs | 3 +- core/src/pages.rs | 52 +++++++++++++++++++++++++++ driver/src/rest_api.rs | 62 +++------------------------------ 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/bindings/python/src/blocking.rs b/bindings/python/src/blocking.rs index efb2951a..ea790e8a 100644 --- a/bindings/python/src/blocking.rs +++ b/bindings/python/src/blocking.rs @@ -294,12 +294,11 @@ impl BlockingDatabendCursor { .await .map_err(DriverError::new) })?; - self.rowcount = affected_rows; return Ok(py.None()); } - // For SELECT, use query_iter as before + // for select, use query_iter let (first, rows) = wait_for_future(py, async move { let mut rows = conn.query_iter(&operation, params).await?; let first = rows.next().await.transpose()?; diff --git a/core/src/pages.rs b/core/src/pages.rs index 074996ad..7cc8bea7 100644 --- a/core/src/pages.rs +++ b/core/src/pages.rs @@ -47,6 +47,58 @@ impl Page { } self.stats = p.stats; } + + pub fn affected_rows(&self) -> Result { + if self.schema.is_empty() { + return Ok(0); + } + + let first_field = &self.schema[0]; + + if !first_field.name.contains("number of rows") { + return Ok(0); + } + + if self.data.is_empty() || self.data[0].is_empty() { + return Ok(0); + } + + match &self.data[0][0] { + Some(value_str) => self.parse_row_count_string(value_str), + None => Ok(0), + } + } + + fn parse_row_count_string(&self, value_str: &str) -> Result { + let trimmed = value_str.trim(); + + if trimmed.is_empty() { + return Ok(0); + } + + if let Ok(count) = trimmed.parse::() { + return Ok(count); + } + + if let Ok(count) = serde_json::from_str::(trimmed) { + return Ok(count); + } + + let unquoted = trimmed.trim_matches('"'); + if let Ok(count) = unquoted.parse::() { + return Ok(count); + } + + Err(format!( + "failed to parse affected rows from: '{}'", + value_str + )) + } + + ///the schema can be `number of rows inserted`, `number of rows deleted`, `number of rows updated` when sql start with `insert`, `delete`, `update` + pub fn has_affected_rows(&self) -> bool { + !self.schema.is_empty() && self.schema[0].name.contains("number of rows") + } } type PageFut = Pin> + Send>>; diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index f9add7ca..be3e5f30 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -24,7 +24,6 @@ use log::info; use tokio::fs::File; use tokio::io::BufReader; use tokio_stream::Stream; -use tokio_stream::StreamExt; use databend_client::APIClient; use databend_client::Pages; @@ -65,8 +64,11 @@ impl IConnection for RestAPIConnection { async fn exec(&self, sql: &str) -> Result { info!("exec: {}", sql); - // Use the new affected_rows method that internally uses query_iter - self.calculate_affected_rows_from_iter(sql).await + let page = self.client.query_all(sql).await?; + + let affected_rows = page.affected_rows().map_err(Error::InvalidResponse)?; + + Ok(affected_rows) } async fn kill_query(&self, query_id: &str) -> Result<()> { @@ -198,60 +200,6 @@ impl<'o> RestAPIConnection { fn default_copy_options() -> BTreeMap<&'o str, &'o str> { vec![("purge", "true")].into_iter().collect() } - - fn parse_row_count_string(value_str: &str) -> Result { - let trimmed = value_str.trim(); - - if trimmed.is_empty() { - return Ok(0); - } - - if let Ok(count) = trimmed.parse::() { - return Ok(count); - } - - if let Ok(count) = serde_json::from_str::(trimmed) { - return Ok(count); - } - - let unquoted = trimmed.trim_matches('"'); - if let Ok(count) = unquoted.parse::() { - return Ok(count); - } - - Err(format!( - "failed to parse affected rows from: '{}'", - value_str - )) - } - - async fn calculate_affected_rows_from_iter(&self, sql: &str) -> Result { - let mut rows = IConnection::query_iter(self, sql).await?; - let mut count = 0i64; - - // Get the first row to check if it has affected rows info - if let Some(first_row) = rows.next().await { - let row = first_row?; - let schema = row.schema(); - - // Check if this is an affected rows response - if !schema.fields().is_empty() && schema.fields()[0].name.contains("number of rows") { - let values = row.values(); - if !values.is_empty() { - let value = &values[0]; - let s: String = value.clone().try_into().map_err(|e| { - Error::InvalidResponse(format!("Failed to convert value to string: {}", e)) - })?; - count = Self::parse_row_count_string(&s).map_err(Error::InvalidResponse)?; - } - } else { - // If it's not affected rows info, count normally - count = -1; - } - } - - Ok(count) - } } pub struct RestAPIRows { From 81e1b143a30ed05526ac60c2a6fa0d8ab6586d98 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 14 Jun 2025 23:07:56 +0800 Subject: [PATCH 6/8] impl it in driver --- core/src/pages.rs | 52 ------------------------------------------ driver/src/rest_api.rs | 50 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 55 deletions(-) diff --git a/core/src/pages.rs b/core/src/pages.rs index 7cc8bea7..074996ad 100644 --- a/core/src/pages.rs +++ b/core/src/pages.rs @@ -47,58 +47,6 @@ impl Page { } self.stats = p.stats; } - - pub fn affected_rows(&self) -> Result { - if self.schema.is_empty() { - return Ok(0); - } - - let first_field = &self.schema[0]; - - if !first_field.name.contains("number of rows") { - return Ok(0); - } - - if self.data.is_empty() || self.data[0].is_empty() { - return Ok(0); - } - - match &self.data[0][0] { - Some(value_str) => self.parse_row_count_string(value_str), - None => Ok(0), - } - } - - fn parse_row_count_string(&self, value_str: &str) -> Result { - let trimmed = value_str.trim(); - - if trimmed.is_empty() { - return Ok(0); - } - - if let Ok(count) = trimmed.parse::() { - return Ok(count); - } - - if let Ok(count) = serde_json::from_str::(trimmed) { - return Ok(count); - } - - let unquoted = trimmed.trim_matches('"'); - if let Ok(count) = unquoted.parse::() { - return Ok(count); - } - - Err(format!( - "failed to parse affected rows from: '{}'", - value_str - )) - } - - ///the schema can be `number of rows inserted`, `number of rows deleted`, `number of rows updated` when sql start with `insert`, `delete`, `update` - pub fn has_affected_rows(&self) -> bool { - !self.schema.is_empty() && self.schema[0].name.contains("number of rows") - } } type PageFut = Pin> + Send>>; diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index be3e5f30..5a4e6b6d 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -65,9 +65,7 @@ impl IConnection for RestAPIConnection { async fn exec(&self, sql: &str) -> Result { info!("exec: {}", sql); let page = self.client.query_all(sql).await?; - - let affected_rows = page.affected_rows().map_err(Error::InvalidResponse)?; - + let affected_rows = parse_affected_rows_from_page(&page)?; Ok(affected_rows) } @@ -290,3 +288,49 @@ impl FromRowStats for RawRowWithStats { Ok(RawRowWithStats::Row(RawRow::new(rows, row))) } } + +fn parse_affected_rows_from_page(page: &databend_client::Page) -> Result { + if page.schema.is_empty() { + return Ok(0); + } + + let first_field = &page.schema[0]; + if !first_field.name.contains("number of rows") { + return Ok(0); + } + + if page.data.is_empty() || page.data[0].is_empty() { + return Ok(0); + } + + match &page.data[0][0] { + Some(value_str) => parse_row_count_string(value_str).map_err(Error::InvalidResponse), + None => Ok(0), + } +} + +fn parse_row_count_string(value_str: &str) -> Result { + let trimmed = value_str.trim(); + + if trimmed.is_empty() { + return Ok(0); + } + + if let Ok(count) = trimmed.parse::() { + return Ok(count); + } + + if let Ok(count) = serde_json::from_str::(trimmed) { + return Ok(count); + } + + let unquoted = trimmed.trim_matches('"'); + if let Ok(count) = unquoted.parse::() { + return Ok(count); + } + + Err(format!( + "failed to parse affected rows from: '{}'", + value_str + )) +} From 3d4494bc571b30ffdd7470762295fbf262952ee2 Mon Sep 17 00:00:00 2001 From: hantmac Date: Mon, 23 Jun 2025 16:50:44 +0800 Subject: [PATCH 7/8] z --- driver/src/rest_api.rs | 115 ++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 52 deletions(-) diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 5a4e6b6d..030600bf 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -64,9 +64,7 @@ impl IConnection for RestAPIConnection { async fn exec(&self, sql: &str) -> Result { info!("exec: {}", sql); - let page = self.client.query_all(sql).await?; - let affected_rows = parse_affected_rows_from_page(&page)?; - Ok(affected_rows) + self.calculate_affected_rows_from_iter(sql).await } async fn kill_query(&self, query_id: &str) -> Result<()> { @@ -178,13 +176,13 @@ impl IConnection for RestAPIConnection { } } -impl<'o> RestAPIConnection { +impl RestAPIConnection { pub async fn try_create(dsn: &str, name: String) -> Result { let client = APIClient::new(dsn, Some(name)).await?; Ok(Self { client }) } - fn default_file_format_options() -> BTreeMap<&'o str, &'o str> { + fn default_file_format_options() -> BTreeMap<&'static str, &'static str> { vec![ ("type", "CSV"), ("field_delimiter", ","), @@ -195,9 +193,68 @@ impl<'o> RestAPIConnection { .collect() } - fn default_copy_options() -> BTreeMap<&'o str, &'o str> { + fn default_copy_options() -> BTreeMap<&'static str, &'static str> { vec![("purge", "true")].into_iter().collect() } + fn parse_row_count_string(value_str: &str) -> Result { + let trimmed = value_str.trim(); + + if trimmed.is_empty() { + return Ok(0); + } + + if let Ok(count) = trimmed.parse::() { + return Ok(count); + } + + if let Ok(count) = serde_json::from_str::(trimmed) { + return Ok(count); + } + + let unquoted = trimmed.trim_matches('"'); + if let Ok(count) = unquoted.parse::() { + return Ok(count); + } + + Err(format!( + "failed to parse affected rows from: '{}'", + value_str + )) + } + + async fn calculate_affected_rows_from_iter(&self, sql: &str) -> Result { + let mut rows = IConnection::query_iter(self, sql).await?; + let mut count = 0i64; + + use tokio_stream::StreamExt; + // Get the first row to check if it has affected rows info + if let Some(first_row) = rows.next().await { + let row = first_row?; + let schema = row.schema(); + + // Check if this is an affected rows response + if !schema.fields().is_empty() && schema.fields()[0].name.contains("number of rows") { + let values = row.values(); + if !values.is_empty() { + let value = &values[0]; + let s: String = value.clone().try_into().map_err(|e| { + Error::InvalidResponse(format!("Failed to convert value to string: {}", e)) + })?; + count = Self::parse_row_count_string(&s).map_err(Error::InvalidResponse)?; + } + } else { + // If it's not affected rows info, count normally + count = 1; + // Continue counting the rest + while let Some(row_result) = rows.next().await { + row_result?; + count += 1; + } + } + } + + Ok(count) + } } pub struct RestAPIRows { @@ -288,49 +345,3 @@ impl FromRowStats for RawRowWithStats { Ok(RawRowWithStats::Row(RawRow::new(rows, row))) } } - -fn parse_affected_rows_from_page(page: &databend_client::Page) -> Result { - if page.schema.is_empty() { - return Ok(0); - } - - let first_field = &page.schema[0]; - if !first_field.name.contains("number of rows") { - return Ok(0); - } - - if page.data.is_empty() || page.data[0].is_empty() { - return Ok(0); - } - - match &page.data[0][0] { - Some(value_str) => parse_row_count_string(value_str).map_err(Error::InvalidResponse), - None => Ok(0), - } -} - -fn parse_row_count_string(value_str: &str) -> Result { - let trimmed = value_str.trim(); - - if trimmed.is_empty() { - return Ok(0); - } - - if let Ok(count) = trimmed.parse::() { - return Ok(count); - } - - if let Ok(count) = serde_json::from_str::(trimmed) { - return Ok(count); - } - - let unquoted = trimmed.trim_matches('"'); - if let Ok(count) = unquoted.parse::() { - return Ok(count); - } - - Err(format!( - "failed to parse affected rows from: '{}'", - value_str - )) -} From 027fc173e39eba59a132a964c61a6a172aafb618 Mon Sep 17 00:00:00 2001 From: hantmac Date: Fri, 27 Jun 2025 11:39:51 +0800 Subject: [PATCH 8/8] f --- driver/src/rest_api.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 030600bf..87e25e56 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -176,13 +176,13 @@ impl IConnection for RestAPIConnection { } } -impl RestAPIConnection { +impl<'o> RestAPIConnection { pub async fn try_create(dsn: &str, name: String) -> Result { let client = APIClient::new(dsn, Some(name)).await?; Ok(Self { client }) } - fn default_file_format_options() -> BTreeMap<&'static str, &'static str> { + fn default_file_format_options() -> BTreeMap<&'o str, &'o str> { vec![ ("type", "CSV"), ("field_delimiter", ","), @@ -193,7 +193,7 @@ impl RestAPIConnection { .collect() } - fn default_copy_options() -> BTreeMap<&'static str, &'static str> { + fn default_copy_options() -> BTreeMap<&'o str, &'o str> { vec![("purge", "true")].into_iter().collect() } fn parse_row_count_string(value_str: &str) -> Result {