Skip to content

feat: support parameterized execute for sqlalchemy #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 140 additions & 88 deletions taos-ws-py/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use pyo3::{
prelude::*,
types::{PyDict, PySequence, PyString, PyTuple},
types::{PyDict, PySequence, PyTuple},
};
use taos::{
sync::{Fetchable, Queryable},
Expand Down Expand Up @@ -66,6 +66,56 @@ impl Cursor {
}
}

fn sql_template_fill(
operation: &str,
py_args: Option<&PyTuple>,
parameters: Option<&PyDict>,
) -> PyResult<String> {
Python::with_gil(|py| {
let local = PyDict::new(py);
local.set_item("t", operation)?;
let args = py_args.unwrap_or_else(|| PyTuple::new(py, [()]));
local.set_item("a", args)?;
if let Some(parameters) = parameters {
local.set_item("p", parameters)?;
if operation.contains("{") {
let sql = py.eval("t.format(*a, **p)", None, Some(local))?;
sql.extract()
} else if operation.contains("%(") {
let sql = py.eval("t % p", None, Some(local))?;
sql.extract()
} else if operation.contains("%") {
let sql = py.eval("t % p", None, Some(local))?;
sql.extract()
} else {
Ok(operation.to_string())
}
} else if operation.contains("{") {
let sql = py.eval(
"t.format(**a[0]) if isinstance(a[0], dict) else t.format(*a)",
None,
Some(local),
)?;
sql.extract()
} else if operation.contains("%(") {
let sql = py.eval(
"t % a[0] if isinstance(a[0], dict) else t % a",
None,
Some(local),
)?;
sql.extract()
} else if operation.contains("%") {
let sql = py.eval(
"t % (a[0]) if isinstance(a[0], dict) else t % a",
None,
Some(local),
)?;
sql.extract()
} else {
Ok(operation.to_string())
}
})
}
#[pymethods]
impl Cursor {
/// This read-only attribute is a sequence of 7-item sequences.
Expand Down Expand Up @@ -129,31 +179,68 @@ impl Cursor {
#[args(py_args = "*", parameters = "**")]
pub fn execute(
&mut self,
operation: &PyString,
operation: &str,
py_args: &PyTuple,
parameters: Option<&PyDict>,
) -> PyResult<usize> {
let sql = Python::with_gil(|py| {
let sql: String = if let Some(parameters) = parameters {
let local = PyDict::new(py);
local.set_item("parameters", parameters)?;
local.set_item("operation", operation)?;
local.set_item("args", py_args)?;
let sql = py.eval("operation.format(*args, **parameters)", None, Some(local))?;
sql.extract()?
} else {
let local = PyDict::new(py);
local.set_item("operation", operation)?;
local.set_item("args", py_args)?;
let sql = py.eval("operation.format(*args)", None, Some(local))?;
sql.extract()?
};
Ok::<_, PyErr>(sql)
})?;
let result_set = self
.inner()?
.query(sql)
.map_err(|err| OperationalError::new_err(err.to_string()))?;
let result_set =
sql_template_fill(operation, Some(py_args), parameters).and_then(|sql| {
self.inner()?
.query(sql)
.map_err(|err| OperationalError::new_err(err.to_string()))
})?;
// let sql: &str = if let Some(parameters) = parameters {
// let local = PyDict::new(py);
// local.set_item("p", parameters)?;
// local.set_item("t", operation)?;
// local.set_item("a", py_args)?;
// if operation.contains("{") {
// let sql = py.eval("t.format(*a, **p)", None, Some(local))?;
// dbg!(&sql);
// sql.extract()?
// } else if operation.contains("%(") {
// let sql = py.eval("t % p", None, Some(local))?;
// dbg!(&sql);
// sql.extract()?
// } else if operation.contains("%") {
// let local = PyDict::new(py);
// local.set_item("t", operation)?;
// local.set_item("a", py_args)?;
// local.set_item("p", parameters)?;
// let sql = py.eval("t % p", None, Some(local))?;
// dbg!(&sql);
// sql.extract()?
// } else {
// operation
// }
// } else {
// let local = PyDict::new(py);
// local.set_item("t", operation)?;
// local.set_item("a", py_args)?;
// if operation.contains("{") {
// let sql = py.eval("t.format(**a[0])", None, Some(local))?;
// dbg!(&sql);
// sql.extract()?
// } else if operation.contains("%(") {
// let sql = py.eval("t % a[0]", None, Some(local))?;
// dbg!(&sql);
// sql.extract()?
// } else if operation.contains("%") {
// let local = PyDict::new(py);
// local.set_item("t", operation)?;
// local.set_item("a", py_args)?;
// let sql = py.eval("t % (a[0])", None, Some(local))?;
// dbg!(&sql);
// sql.extract()?
// } else {
// operation
// }
// };

// self.inner()?
// .query(sql)
// .map_err(|err| OperationalError::new_err(err.to_string()))
// })?;
let affected_rows = result_set.affected_rows();
self.result_set.replace(result_set);
self.row_count = affected_rows as _;
Expand All @@ -163,28 +250,12 @@ impl Cursor {
#[args(py_args = "*", parameters = "**")]
pub fn execute_with_req_id(
&mut self,
operation: &PyString,
operation: &str,
py_args: &PyTuple,
parameters: Option<&PyDict>,
req_id: u64,
) -> PyResult<usize> {
let sql = Python::with_gil(|py| {
let sql: String = if let Some(parameters) = parameters {
let local = PyDict::new(py);
local.set_item("parameters", parameters)?;
local.set_item("operation", operation)?;
local.set_item("args", py_args)?;
let sql = py.eval("operation.format(*args, **parameters)", None, Some(local))?;
sql.extract()?
} else {
let local = PyDict::new(py);
local.set_item("operation", operation)?;
local.set_item("args", py_args)?;
let sql = py.eval("operation.format(*args)", None, Some(local))?;
sql.extract()?
};
Ok::<_, PyErr>(sql)
})?;
let sql = sql_template_fill(operation, Some(py_args), parameters)?;
let result_set = self
.inner()?
.query_with_req_id(sql, req_id)
Expand All @@ -198,32 +269,22 @@ impl Cursor {
#[args(py_args = "*", parameters = "**")]
pub fn execute_many(
&mut self,
operation: &PyString,
operation: &str,
seq_of_parameters: &PySequence,
) -> PyResult<usize> {
let sql = Python::with_gil(|py| {
let vec: Vec<_> = seq_of_parameters
.iter()?
.map(|row| -> PyResult<String> {
// let params = row.extract().unwrap();
let row = row?;
if row.is_instance_of::<PyDict>()? {
let local = PyDict::new(py);
local.set_item("args", row)?;
local.set_item("operation", operation)?;
let sql = py.eval("operation.format(**args)", None, Some(local))?;
sql.extract()
} else {
let local = PyDict::new(py);
local.set_item("args", row)?;
local.set_item("operation", operation)?;
let sql = py.eval("operation.format(*args)", None, Some(local))?;
sql.extract()
}
})
.try_collect()?;
Ok::<_, PyErr>(vec)
})?;
let sql: Vec<_> = seq_of_parameters
.iter()?
.map(|row| -> PyResult<String> {
let row = row?;
if row.is_instance_of::<PyDict>()? {
sql_template_fill(operation, None, row.downcast::<PyDict>().ok())
} else if row.is_instance_of::<PyTuple>()? {
sql_template_fill(operation, Some(row.downcast::<PyTuple>()?), None)
} else {
Ok(operation.to_string())
}
})
.try_collect()?;
let affected_rows = self
.inner()?
.exec_many(sql)
Expand All @@ -235,32 +296,23 @@ impl Cursor {
#[args(py_args = "*", parameters = "**")]
pub fn execute_many_with_req_id(
&mut self,
operation: &PyString,
operation: &str,
seq_of_parameters: &PySequence,
req_id: u64,
) -> PyResult<usize> {
let sql = Python::with_gil(|py| {
let vec: Vec<_> = seq_of_parameters
.iter()?
.map(|row| -> PyResult<String> {
let row = row?;
if row.is_instance_of::<PyDict>()? {
let local = PyDict::new(py);
local.set_item("args", row)?;
local.set_item("operation", operation)?;
let sql = py.eval("operation.format(**args)", None, Some(local))?;
sql.extract()
} else {
let local = PyDict::new(py);
local.set_item("args", row)?;
local.set_item("operation", operation)?;
let sql = py.eval("operation.format(*args)", None, Some(local))?;
sql.extract()
}
})
.try_collect()?;
Ok::<_, PyErr>(vec)
})?;
let sql: Vec<_> = seq_of_parameters
.iter()?
.map(|row| -> PyResult<String> {
let row = row?;
if row.is_instance_of::<PyDict>()? {
sql_template_fill(operation, None, row.downcast::<PyDict>().ok())
} else if row.is_instance_of::<PyTuple>()? {
sql_template_fill(operation, Some(row.downcast::<PyTuple>()?), None)
} else {
Ok(operation.to_string())
}
})
.try_collect()?;
let affected_rows = sql
.into_iter()
.map(|sql| {
Expand Down
32 changes: 30 additions & 2 deletions taos-ws-py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,37 @@ impl Connection {
}
}

pub fn execute(&self, sql: &str) -> PyResult<i32> {
#[args(py_args = "*", parameters = "**")]
pub fn execute(
&mut self,
operation: &PyString,
py_args: &PyTuple,
parameters: Option<&PyDict>,
) -> PyResult<usize> {
dbg!(&operation, &py_args, &parameters);
let sql = Python::with_gil(|py| {
let sql: String = if let Some(parameters) = parameters {
let local = PyDict::new(py);
local.set_item("parameters", parameters)?;
local.set_item("operation", operation)?;
local.set_item("args", py_args)?;
dbg!(&operation);
dbg!(&parameters);
let sql = py.eval("operation.format(*args, **parameters)", None, Some(local))?;
dbg!(sql.extract::<String>()?);
let sql = py.eval("operation % (parameters)", None, Some(local))?;
dbg!(sql.extract()?)
} else {
let local = PyDict::new(py);
local.set_item("operation", operation)?;
local.set_item("args", py_args)?;
let sql = py.eval("operation.format(*args)", None, Some(local))?;
sql.extract()?
};
Ok::<_, PyErr>(sql)
})?;
match self.current_cursor()?.query(sql) {
Ok(rs) => Ok(rs.affected_rows()),
Ok(rs) => Ok(rs.affected_rows() as _),
Err(err) => Err(QueryError::new_err(err.to_string())),
}
}
Expand Down
2 changes: 1 addition & 1 deletion taos/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def subscribe(self, restart: bool, topic: str, sql: str, interval: int,
return TaosSubscription(sub, callback is not None)

def statement(self, sql=None):
# type: (str | None) -> TaosStmt|None
# type: (str | None) -> TaosStmt
if self._conn is None:
return None
stmt = taos_stmt_init(self._conn)
Expand Down
Loading
Loading