diff --git a/taos-ws-py/src/cursor.rs b/taos-ws-py/src/cursor.rs index fe237507..533aad1f 100644 --- a/taos-ws-py/src/cursor.rs +++ b/taos-ws-py/src/cursor.rs @@ -1,6 +1,6 @@ use pyo3::{ prelude::*, - types::{PyDict, PySequence, PyString, PyTuple}, + types::{PyDict, PySequence, PyTuple}, }; use taos::{ sync::{Fetchable, Queryable}, @@ -66,6 +66,56 @@ impl Cursor { } } +fn sql_template_fill( + operation: &str, + py_args: Option<&PyTuple>, + parameters: Option<&PyDict>, +) -> PyResult { + Python::with_gil(|py| { + let local = PyDict::new(py); + local.set_item("t", operation)?; + let args = py_args.unwrap_or_else(|| PyTuple::new(py, [()])); + local.set_item("a", args)?; + if let Some(parameters) = parameters { + local.set_item("p", parameters)?; + if operation.contains("{") { + let sql = py.eval("t.format(*a, **p)", None, Some(local))?; + sql.extract() + } else if operation.contains("%(") { + let sql = py.eval("t % p", None, Some(local))?; + sql.extract() + } else if operation.contains("%") { + let sql = py.eval("t % p", None, Some(local))?; + sql.extract() + } else { + Ok(operation.to_string()) + } + } else if operation.contains("{") { + let sql = py.eval( + "t.format(**a[0]) if isinstance(a[0], dict) else t.format(*a)", + None, + Some(local), + )?; + sql.extract() + } else if operation.contains("%(") { + let sql = py.eval( + "t % a[0] if isinstance(a[0], dict) else t % a", + None, + Some(local), + )?; + sql.extract() + } else if operation.contains("%") { + let sql = py.eval( + "t % (a[0]) if isinstance(a[0], dict) else t % a", + None, + Some(local), + )?; + sql.extract() + } else { + Ok(operation.to_string()) + } + }) +} #[pymethods] impl Cursor { /// This read-only attribute is a sequence of 7-item sequences. @@ -129,31 +179,68 @@ impl Cursor { #[args(py_args = "*", parameters = "**")] pub fn execute( &mut self, - operation: &PyString, + operation: &str, py_args: &PyTuple, parameters: Option<&PyDict>, ) -> PyResult { - let sql = Python::with_gil(|py| { - let sql: String = if let Some(parameters) = parameters { - let local = PyDict::new(py); - local.set_item("parameters", parameters)?; - local.set_item("operation", operation)?; - local.set_item("args", py_args)?; - let sql = py.eval("operation.format(*args, **parameters)", None, Some(local))?; - sql.extract()? - } else { - let local = PyDict::new(py); - local.set_item("operation", operation)?; - local.set_item("args", py_args)?; - let sql = py.eval("operation.format(*args)", None, Some(local))?; - sql.extract()? - }; - Ok::<_, PyErr>(sql) - })?; - let result_set = self - .inner()? - .query(sql) - .map_err(|err| OperationalError::new_err(err.to_string()))?; + let result_set = + sql_template_fill(operation, Some(py_args), parameters).and_then(|sql| { + self.inner()? + .query(sql) + .map_err(|err| OperationalError::new_err(err.to_string())) + })?; + // let sql: &str = if let Some(parameters) = parameters { + // let local = PyDict::new(py); + // local.set_item("p", parameters)?; + // local.set_item("t", operation)?; + // local.set_item("a", py_args)?; + // if operation.contains("{") { + // let sql = py.eval("t.format(*a, **p)", None, Some(local))?; + // dbg!(&sql); + // sql.extract()? + // } else if operation.contains("%(") { + // let sql = py.eval("t % p", None, Some(local))?; + // dbg!(&sql); + // sql.extract()? + // } else if operation.contains("%") { + // let local = PyDict::new(py); + // local.set_item("t", operation)?; + // local.set_item("a", py_args)?; + // local.set_item("p", parameters)?; + // let sql = py.eval("t % p", None, Some(local))?; + // dbg!(&sql); + // sql.extract()? + // } else { + // operation + // } + // } else { + // let local = PyDict::new(py); + // local.set_item("t", operation)?; + // local.set_item("a", py_args)?; + // if operation.contains("{") { + // let sql = py.eval("t.format(**a[0])", None, Some(local))?; + // dbg!(&sql); + // sql.extract()? + // } else if operation.contains("%(") { + // let sql = py.eval("t % a[0]", None, Some(local))?; + // dbg!(&sql); + // sql.extract()? + // } else if operation.contains("%") { + // let local = PyDict::new(py); + // local.set_item("t", operation)?; + // local.set_item("a", py_args)?; + // let sql = py.eval("t % (a[0])", None, Some(local))?; + // dbg!(&sql); + // sql.extract()? + // } else { + // operation + // } + // }; + + // self.inner()? + // .query(sql) + // .map_err(|err| OperationalError::new_err(err.to_string())) + // })?; let affected_rows = result_set.affected_rows(); self.result_set.replace(result_set); self.row_count = affected_rows as _; @@ -163,28 +250,12 @@ impl Cursor { #[args(py_args = "*", parameters = "**")] pub fn execute_with_req_id( &mut self, - operation: &PyString, + operation: &str, py_args: &PyTuple, parameters: Option<&PyDict>, req_id: u64, ) -> PyResult { - let sql = Python::with_gil(|py| { - let sql: String = if let Some(parameters) = parameters { - let local = PyDict::new(py); - local.set_item("parameters", parameters)?; - local.set_item("operation", operation)?; - local.set_item("args", py_args)?; - let sql = py.eval("operation.format(*args, **parameters)", None, Some(local))?; - sql.extract()? - } else { - let local = PyDict::new(py); - local.set_item("operation", operation)?; - local.set_item("args", py_args)?; - let sql = py.eval("operation.format(*args)", None, Some(local))?; - sql.extract()? - }; - Ok::<_, PyErr>(sql) - })?; + let sql = sql_template_fill(operation, Some(py_args), parameters)?; let result_set = self .inner()? .query_with_req_id(sql, req_id) @@ -198,32 +269,22 @@ impl Cursor { #[args(py_args = "*", parameters = "**")] pub fn execute_many( &mut self, - operation: &PyString, + operation: &str, seq_of_parameters: &PySequence, ) -> PyResult { - let sql = Python::with_gil(|py| { - let vec: Vec<_> = seq_of_parameters - .iter()? - .map(|row| -> PyResult { - // let params = row.extract().unwrap(); - let row = row?; - if row.is_instance_of::()? { - let local = PyDict::new(py); - local.set_item("args", row)?; - local.set_item("operation", operation)?; - let sql = py.eval("operation.format(**args)", None, Some(local))?; - sql.extract() - } else { - let local = PyDict::new(py); - local.set_item("args", row)?; - local.set_item("operation", operation)?; - let sql = py.eval("operation.format(*args)", None, Some(local))?; - sql.extract() - } - }) - .try_collect()?; - Ok::<_, PyErr>(vec) - })?; + let sql: Vec<_> = seq_of_parameters + .iter()? + .map(|row| -> PyResult { + let row = row?; + if row.is_instance_of::()? { + sql_template_fill(operation, None, row.downcast::().ok()) + } else if row.is_instance_of::()? { + sql_template_fill(operation, Some(row.downcast::()?), None) + } else { + Ok(operation.to_string()) + } + }) + .try_collect()?; let affected_rows = self .inner()? .exec_many(sql) @@ -235,32 +296,23 @@ impl Cursor { #[args(py_args = "*", parameters = "**")] pub fn execute_many_with_req_id( &mut self, - operation: &PyString, + operation: &str, seq_of_parameters: &PySequence, req_id: u64, ) -> PyResult { - let sql = Python::with_gil(|py| { - let vec: Vec<_> = seq_of_parameters - .iter()? - .map(|row| -> PyResult { - let row = row?; - if row.is_instance_of::()? { - let local = PyDict::new(py); - local.set_item("args", row)?; - local.set_item("operation", operation)?; - let sql = py.eval("operation.format(**args)", None, Some(local))?; - sql.extract() - } else { - let local = PyDict::new(py); - local.set_item("args", row)?; - local.set_item("operation", operation)?; - let sql = py.eval("operation.format(*args)", None, Some(local))?; - sql.extract() - } - }) - .try_collect()?; - Ok::<_, PyErr>(vec) - })?; + let sql: Vec<_> = seq_of_parameters + .iter()? + .map(|row| -> PyResult { + let row = row?; + if row.is_instance_of::()? { + sql_template_fill(operation, None, row.downcast::().ok()) + } else if row.is_instance_of::()? { + sql_template_fill(operation, Some(row.downcast::()?), None) + } else { + Ok(operation.to_string()) + } + }) + .try_collect()?; let affected_rows = sql .into_iter() .map(|sql| { diff --git a/taos-ws-py/src/lib.rs b/taos-ws-py/src/lib.rs index a7438795..07f5c1bf 100644 --- a/taos-ws-py/src/lib.rs +++ b/taos-ws-py/src/lib.rs @@ -146,9 +146,37 @@ impl Connection { } } - pub fn execute(&self, sql: &str) -> PyResult { + #[args(py_args = "*", parameters = "**")] + pub fn execute( + &mut self, + operation: &PyString, + py_args: &PyTuple, + parameters: Option<&PyDict>, + ) -> PyResult { + dbg!(&operation, &py_args, ¶meters); + let sql = Python::with_gil(|py| { + let sql: String = if let Some(parameters) = parameters { + let local = PyDict::new(py); + local.set_item("parameters", parameters)?; + local.set_item("operation", operation)?; + local.set_item("args", py_args)?; + dbg!(&operation); + dbg!(¶meters); + let sql = py.eval("operation.format(*args, **parameters)", None, Some(local))?; + dbg!(sql.extract::()?); + let sql = py.eval("operation % (parameters)", None, Some(local))?; + dbg!(sql.extract()?) + } else { + let local = PyDict::new(py); + local.set_item("operation", operation)?; + local.set_item("args", py_args)?; + let sql = py.eval("operation.format(*args)", None, Some(local))?; + sql.extract()? + }; + Ok::<_, PyErr>(sql) + })?; match self.current_cursor()?.query(sql) { - Ok(rs) => Ok(rs.affected_rows()), + Ok(rs) => Ok(rs.affected_rows() as _), Err(err) => Err(QueryError::new_err(err.to_string())), } } diff --git a/taos/connection.py b/taos/connection.py index 2656ffa4..7dde5fb7 100644 --- a/taos/connection.py +++ b/taos/connection.py @@ -151,7 +151,7 @@ def subscribe(self, restart: bool, topic: str, sql: str, interval: int, return TaosSubscription(sub, callback is not None) def statement(self, sql=None): - # type: (str | None) -> TaosStmt|None + # type: (str | None) -> TaosStmt if self._conn is None: return None stmt = taos_stmt_init(self._conn) diff --git a/taos/cursor.py b/taos/cursor.py index c0dca55e..e273ea12 100644 --- a/taos/cursor.py +++ b/taos/cursor.py @@ -2,6 +2,7 @@ from typing import Optional from taos.cinterface import * +from taos.connection import TaosConnection from taos.error import * from taos.constants import FieldType @@ -27,7 +28,12 @@ class TaosCursor(object): .execute*() produced (for DQL statements like SELECT) or affected """ - def __init__(self, connection=None, decode_binary=True): + def __init__(self, connection: TaosConnection = None, decode_binary=True): + """ + Initialize the cursor with a database connection. + :param connection: The database connection to use. + :param decode_binary: If True, decode binary data to string. + """ self._description = [] self._rowcount = -1 self._connection = None @@ -57,7 +63,9 @@ def _taos_next(self): raise OperationalError("Invalid use of fetch iterator") if self._block_rows <= self._block_iter: - block, self._block_rows = taos_fetch_row(self._result, self._fields, decode_binary=self.decode_binary) + block, self._block_rows = taos_fetch_row( + self._result, self._fields, decode_binary=self.decode_binary + ) if self._block_rows == 0: raise StopIteration self._block = list(map(tuple, zip(*block))) @@ -107,6 +115,7 @@ def close(self): return True def execute(self, operation, params=None, req_id: Optional[int] = None): + # type: (str, dict | tuple | None, Optional[int]) -> int | c_void_p """Prepare and execute a database operation (query or command).""" if not operation: return None @@ -119,8 +128,7 @@ def execute(self, operation, params=None, req_id: Optional[int] = None): stmt = operation if params is not None: - pass - + stmt = stmt % (params) # global querySeqNum # querySeqNum += 1 # localSeqNum = querySeqNum # avoid race condition @@ -157,9 +165,9 @@ def execute_many(self, operation, data_list, req_id: Optional[int] = None): # print(f'execute: {sql.format(**line)}') affected_rows += self.execute(sql.format(**line), req_id=req_id) elif isinstance(line, list): - sql += f' {tuple(line)} ' + sql += f" {tuple(line)} " elif isinstance(line, tuple): - sql += f' {line} ' + sql += f" {line} " if flag: # print(f'execute_many: {sql}') affected_rows += self.execute(sql, req_id=req_id) @@ -232,7 +240,9 @@ def fetchall_row(self): buffer = [[] for i in range(len(self._fields))] self._rowcount = 0 while True: - block, num_of_rows = taos_fetch_row(self._result, self._fields, decode_binary=self.decode_binary) + block, num_of_rows = taos_fetch_row( + self._result, self._fields, decode_binary=self.decode_binary + ) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) @@ -246,12 +256,17 @@ def fetchall_row(self): def fetchall(self): if self._result is None: raise OperationalError("Invalid use of fetchall") - fields = self._fields if self._fields is not None else taos_fetch_fields( - self._result) + fields = ( + self._fields + if self._fields is not None + else taos_fetch_fields(self._result) + ) buffer = [[] for i in range(len(fields))] self._rowcount = 0 while True: - block, num_of_rows = taos_fetch_block(self._result, self._fields, decode_binary=self.decode_binary) + block, num_of_rows = taos_fetch_block( + self._result, self._fields, decode_binary=self.decode_binary + ) errno = taos_errno(self._result) if errno != 0: raise ProgrammingError(taos_errstr(self._result), errno) @@ -294,7 +309,8 @@ def _handle_result(self): self._description = [] for ele in self._fields: self._description.append( - (ele["name"], ele["type"], None, None, None, None, False)) + (ele["name"], ele["type"], None, None, None, None, False) + ) return self._result