Skip to content

feat(ws): support blob type #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/taos-ws-py-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ jobs:
TDENGINE_URL: localhost:6041
WS_CLOUD_URL: ${{ secrets.WS_CLOUD_URL }}
WS_CLOUD_TOKEN: ${{ secrets.WS_CLOUD_TOKEN }}
TEST_TD_3360: "true"
run: |
pip3 install pytest
pytest ./taos-ws-py/tests/
54 changes: 22 additions & 32 deletions taos-ws-py/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions taos-ws-py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ version = "0.17.3"
features = ["extension-module", "anyhow", "chrono", "abi3-py37"]

[target.'cfg(windows)'.dependencies]
taos = { git = "https://github.com/taosdata/taos-connector-rust.git", branch = "main", default-features = false, features = [
taos = { git = "https://github.com/taosdata/taos-connector-rust.git", branch = "feat/TD-35106", default-features = false, features = [
"optin",
"ws-rustls",
"ws-rustls-aws-lc-crypto-provider",
] }
[target.'cfg(unix)'.dependencies]
taos = { git = "https://github.com/taosdata/taos-connector-rust.git", branch = "main", default-features = false, features = [
taos = { git = "https://github.com/taosdata/taos-connector-rust.git", branch = "feat/TD-35106", default-features = false, features = [
"optin",
"ws-rustls",
"ws-rustls-ring-crypto-provider",
Expand Down
3 changes: 1 addition & 2 deletions taos-ws-py/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub fn to_py_datetime(dt: chrono::NaiveDateTime, py: Python) -> PyResult<PyObjec
Ok(datetime.call(args, None)?.into_py(py))
}

// pub fn datetime_to_py(t: chrono py: Python)
pub unsafe fn get_row_of_block_unchecked(py: Python, block: &RawBlock, index: usize) -> PyObject {
let mut vec = Vec::new();
for i in 0..block.ncols() {
Expand Down Expand Up @@ -58,7 +57,7 @@ pub unsafe fn get_row_of_block_unchecked(py: Python, block: &RawBlock, index: us
BorrowedValue::Geometry(v) => v.into_py(py),
BorrowedValue::Decimal64(v) => v.to_string().into_py(py),
BorrowedValue::Decimal(v) => v.to_string().into_py(py),
BorrowedValue::Blob(_) => todo!(),
BorrowedValue::Blob(v) => v.into_py(py),
BorrowedValue::MediumBlob(_) => todo!(),
};
vec.push(val);
Expand Down
2 changes: 1 addition & 1 deletion taos-ws-py/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl Consumer {
Ok(topics)
}

///
/// Poll for a message with an optional timeout.
pub fn poll(&mut self, timeout: Option<f64>) -> PyResult<Option<Message>> {
let timeout = if let Some(timeout) = timeout {
Timeout::Duration(Duration::from_secs_f64(timeout))
Expand Down
1 change: 0 additions & 1 deletion taos-ws-py/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ impl Cursor {
let vec: Vec<_> = seq_of_parameters
.iter()?
.map(|row| -> PyResult<String> {
// let params = row.extract().unwrap();
let row = row?;
if row.is_instance_of::<PyDict>()? {
let local = PyDict::new(py);
Expand Down
14 changes: 14 additions & 0 deletions taos-ws-py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl TaosResult {
BorrowedValue::Json(j) => std::str::from_utf8(&j).unwrap().into_py(py),
BorrowedValue::VarBinary(v) => v.as_ref().into_py(py),
BorrowedValue::Geometry(v) => v.as_ref().into_py(py),
BorrowedValue::Blob(v) => v.as_ref().into_py(py),
_ => Option::<()>::None.into_py(py),
};
vec.push(value);
Expand Down Expand Up @@ -991,6 +992,18 @@ fn geometry_to_column(values: Vec<Option<Vec<u8>>>) -> PyColumnView {
}
}

#[pyfunction]
fn blob_to_column(values: Vec<Option<Vec<u8>>>) -> PyColumnView {
PyColumnView {
_inner: ColumnView::from_blob_bytes::<
Vec<u8>,
Option<Vec<u8>>,
std::vec::IntoIter<Option<Vec<u8>>>,
Vec<Option<Vec<u8>>>,
>(values),
}
}

#[pymodule]
fn taosws(py: Python<'_>, m: &PyModule) -> PyResult<()> {
if std::env::var("RUST_LOG").is_ok() {
Expand Down Expand Up @@ -1051,6 +1064,7 @@ fn taosws(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(binary_to_column, m)?)?;
m.add_function(wrap_pyfunction!(varbinary_to_column, m)?)?;
m.add_function(wrap_pyfunction!(geometry_to_column, m)?)?;
m.add_function(wrap_pyfunction!(blob_to_column, m)?)?;
m.add_function(wrap_pyfunction!(stmt2_bind_param_view, m)?)?;

m.add("apilevel", API_LEVEL)?;
Expand Down
149 changes: 149 additions & 0 deletions taos-ws-py/tests/test_blob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import datetime
import time
import taosws
from taosws import Consumer
import os


def test_blob_sql():
value = os.getenv("TEST_TD_3360")
if value is not None:
return

conn = taosws.connect("ws://localhost:6041")
cursor = conn.cursor()

try:
cursor.execute("drop database if exists test_1753269319"),
cursor.execute("create database test_1753269319"),
cursor.execute("use test_1753269319"),
cursor.execute("create table t0(ts timestamp, c1 blob)"),
cursor.execute("insert into t0 values(1752218982761, null)"),
cursor.execute("insert into t0 values(1752218982762, '')"),
cursor.execute("insert into t0 values(1752218982763, 'hello')"),
cursor.execute("insert into t0 values(1752218982764, '\\x12345678')"),

cursor.execute("select * from t0")
rows = cursor.fetchall()

assert len(rows) == 4

assert rows[0][0].timestamp() * 1000 == 1752218982761
assert rows[1][0].timestamp() * 1000 == 1752218982762
assert rows[2][0].timestamp() * 1000 == 1752218982763
assert rows[3][0].timestamp() * 1000 == 1752218982764

assert rows[0][1] is None
assert rows[1][1] == b""
assert rows[2][1] == b"hello"
assert rows[3][1] == b"\x124Vx"

finally:
cursor.execute("drop database test_1753269319")
conn.close()


def test_blob_stmt2():
value = os.getenv("TEST_TD_3360")
if value is not None:
return

conn = taosws.connect("ws://localhost:6041")
try:
conn.execute("drop database if exists test_1753269333"),
conn.execute("create database test_1753269333"),
conn.execute("use test_1753269333"),
conn.execute("create table t0 (ts timestamp, c1 blob)"),

test_timestamps = [1726803356466, 1726803356467, 1726803356468, 1726803356469]
test_blobs = [None, b"", b"hello", b"\x124Vx"]

stmt2 = conn.stmt2_statement()
stmt2.prepare("insert into t0 values (?, ?)")

param = taosws.stmt2_bind_param_view(
table_name="",
tags=None,
columns=[
taosws.millis_timestamps_to_column(test_timestamps),
taosws.blob_to_column(test_blobs),
],
)
stmt2.bind([param])

affected_rows = stmt2.execute()
assert affected_rows == 4

stmt2.prepare("select * from t0 where ts > ?")

param = taosws.stmt2_bind_param_view(
table_name="",
tags=None,
columns=[
taosws.millis_timestamps_to_column([1726803356465]),
],
)
stmt2.bind([param])
stmt2.execute()

result = stmt2.result_set()
expected_results = list(zip(test_timestamps, test_blobs))

actual_results = []
for row in result:
dt = datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S.%f %z")
timestamp = int(dt.timestamp() * 1000)
actual_results.append((timestamp, row[1]))

assert actual_results == expected_results

finally:
conn.execute("drop database test_1753269333")
conn.close()


def test_blob_tmq():
value = os.getenv("TEST_TD_3360")
if value is not None:
return

conn = taosws.connect("ws://localhost:6041")
try:
conn.execute("drop topic if exists topic_1753270984"),
conn.execute("drop database if exists test_1753270984"),
conn.execute("create database test_1753270984"),
conn.execute("create topic topic_1753270984 as database test_1753270984"),
conn.execute("use test_1753270984"),
conn.execute("create table t0 (ts timestamp, c1 int, c2 blob)"),

num = 100

sql = "insert into t0 values "
for i in range(num):
ts = 1726803356466 + i
sql += f"({ts}, {i}, 'blob_{i}'), "
conn.execute(sql)

consumer = Consumer(dsn="ws://localhost:6041?group.id=10&auto.offset.reset=earliest")
consumer.subscribe(["topic_1753270984"])

cnt = 0

while 1:
message = consumer.poll(timeout=5.0)
if message:
for block in message:
cnt += block.nrows()
consumer.commit(message)
else:
break

assert cnt == num

consumer.unsubscribe()

finally:
time.sleep(3)
conn.execute("drop topic topic_1753270984")
conn.execute("drop database test_1753270984")
conn.close()
Loading
Loading