Skip to content

fix: impl and get affected rowcount from resp data #635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions bindings/python/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub struct BlockingDatabendCursor {
// buffer is used to store only the first row after execute
buffer: Vec<Row>,
schema: Option<SchemaRef>,
rowcount: i64,
}

impl BlockingDatabendCursor {
Expand All @@ -201,6 +202,7 @@ impl BlockingDatabendCursor {
rows: None,
buffer: Vec::new(),
schema: None,
rowcount: -1,
}
}
}
Expand All @@ -210,6 +212,7 @@ impl BlockingDatabendCursor {
self.rows = None;
self.buffer.clear();
self.schema = None;
self.rowcount = -1;
}
}

Expand Down Expand Up @@ -247,10 +250,9 @@ impl BlockingDatabendCursor {
}
}

/// Not supported currently
#[getter]
pub fn rowcount(&self, _py: Python) -> i64 {
-1
self.rowcount
}

pub fn close(&mut self, py: Python) -> PyResult<()> {
Expand All @@ -277,18 +279,40 @@ impl BlockingDatabendCursor {

self.reset();
let conn = self.conn.clone();
// fetch first row after execute
// then we could finish the query directly if there's no result
let params = to_sql_params(params);

// check if it is DML(INSERT, UPDATE, DELETE)
let sql_trimmed = operation.trim_start().to_lowercase();
let is_dml = sql_trimmed.starts_with("insert")
|| sql_trimmed.starts_with("update")
|| sql_trimmed.starts_with("delete")
|| sql_trimmed.starts_with("replace");

if is_dml {
let affected_rows = wait_for_future(py, async move {
conn.exec(&operation, params)
.await
.map_err(DriverError::new)
})?;
self.rowcount = affected_rows;
return Ok(py.None());
}

// for select, use query_iter
let (first, rows) = wait_for_future(py, async move {
let mut rows = conn.query_iter(&operation, params).await?;
let first = rows.next().await.transpose()?;
Ok::<_, databend_driver::Error>((first, rows))
})
.map_err(DriverError::new)?;

if let Some(first) = first {
self.buffer.push(Row::new(first));
self.rowcount = 1;
} else {
self.rowcount = 0;
}

self.rows = Some(Arc::new(Mutex::new(rows)));
self.set_schema(py);
Ok(py.None())
Expand Down Expand Up @@ -375,9 +399,14 @@ impl BlockingDatabendCursor {
for row in fetched {
result.push(Row::new(row.map_err(DriverError::new)?));
}

if self.rowcount == -1 {
self.rowcount = result.len() as i64;
}

Ok(result)
}
None => Ok(vec![]),
None => Ok(result),
}
}

Expand Down
52 changes: 52 additions & 0 deletions core/src/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,58 @@ impl Page {
}
self.stats = p.stats;
}

/// Extract the affected row count from a DML response page.
///
/// DML statements (INSERT/UPDATE/DELETE/REPLACE) report their result as a
/// single-row page whose first column name contains "number of rows"
/// (e.g. `number of rows inserted`). Returns `Ok(0)` for any page that does
/// not look like such a response (including an empty schema or empty data),
/// and an `Err` only when the cell is present but cannot be parsed.
pub fn affected_rows(&self) -> Result<i64, String> {
    // Delegate the schema shape check to `has_affected_rows` so the two
    // methods cannot drift apart (it also covers the empty-schema case).
    if !self.has_affected_rows() {
        return Ok(0);
    }

    if self.data.is_empty() || self.data[0].is_empty() {
        return Ok(0);
    }

    match &self.data[0][0] {
        Some(value_str) => self.parse_row_count_string(value_str),
        // A NULL cell carries no count; treat it as zero affected rows.
        None => Ok(0),
    }
}

fn parse_row_count_string(&self, value_str: &str) -> Result<i64, String> {
let trimmed = value_str.trim();

if trimmed.is_empty() {
return Ok(0);
}

if let Ok(count) = trimmed.parse::<i64>() {
return Ok(count);
}

if let Ok(count) = serde_json::from_str::<i64>(trimmed) {
return Ok(count);
}

let unquoted = trimmed.trim_matches('"');
if let Ok(count) = unquoted.parse::<i64>() {
return Ok(count);
}

Err(format!(
"failed to parse affected rows from: '{}'",
value_str
))
}

/// Whether this page reports a DML result.
///
/// When the SQL starts with `insert`, `delete`, or `update`, the response
/// schema's first column is named `number of rows inserted`,
/// `number of rows deleted`, or `number of rows updated` respectively.
pub fn has_affected_rows(&self) -> bool {
    self.schema
        .first()
        .map_or(false, |field| field.name.contains("number of rows"))
}
}

type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
Expand Down
5 changes: 4 additions & 1 deletion driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ impl IConnection for RestAPIConnection {
async fn exec(&self, sql: &str) -> Result<i64> {
info!("exec: {}", sql);
let page = self.client.query_all(sql).await?;
Ok(page.stats.progresses.write_progress.rows as i64)

let affected_rows = page.affected_rows().map_err(Error::InvalidResponse)?;
Copy link
Member

@everpcpc everpcpc Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

affected_rows and other helper functions could be located not in core but also in this impl. Then we could take advantages of query_iter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we cannot take advantage of query_iter when we parse the affected rows, because we need to query_all and use the affected rows from the final page info.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could easily handle response in query_iter in driver rather than query_all in core.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could easily handle response in query_iter in driver rather than query_all in core.

That makes sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we could make use of the value parser in driver.


Ok(affected_rows)
}

async fn kill_query(&self, query_id: &str) -> Result<()> {
Expand Down
Loading