A lightweight, elegant Rust client for Timeplus with minimal dependencies and maximum performance.
- Lightweight: Minimal dependencies (only reqwest + serde_json)
- Stream Management: Create, insert, query, and manage streams
- Python UDF Support: Create and manage Python User-Defined Functions
- Real-time Streaming: Callback-based streaming with real-time data processing
- Advanced SQL: Support for aggregations, functions, and complex queries
- Type Safe: Full Rust type system support with comprehensive error handling
- Well Documented: Extensive documentation and examples
- Install Docker (for running Timeplus)
- Rust 1.70+
# Pull and run Timeplus Enterprise
docker run -d \
--name timeplus-server \
-p 8000:8000 \
-p 8123:8123 \
-p 8463:8463 \
-p 3218:3218 \
-v /tmp/timeplus_data:/timeplus/data \
docker.timeplus.com/timeplus/timeplus-enterprise:2.7.9
# Wait for server to start (about 30 seconds)
sleep 30
# Verify server is running
curl http://localhost:8123
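Rather than relying on a fixed `sleep 30`, a program can poll the port until it accepts connections. The following is a minimal sketch using only the standard library. `wait_for_port` is a hypothetical helper, not part of timeplus-client, and an open port only shows that something is listening, not that Timeplus has finished initializing.

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `addr` until a TCP connection succeeds or `deadline` elapses.
/// Returns true if the port accepted a connection in time.
/// Hypothetical helper for illustration; not part of timeplus-client.
fn wait_for_port(addr: SocketAddr, deadline: Duration) -> bool {
    let start = Instant::now();
    while start.elapsed() < deadline {
        // Each attempt is bounded so a silent host cannot stall the loop.
        if TcpStream::connect_timeout(&addr, Duration::from_millis(500)).is_ok() {
            return true;
        }
        // Brief pause between attempts to avoid a busy loop.
        sleep(Duration::from_millis(250));
    }
    false
}
```

For example, `wait_for_port("127.0.0.1:8123".parse().unwrap(), Duration::from_secs(60))` would wait up to a minute for the batch HTTP port published by the docker command above.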
[dependencies]
timeplus-client = "0.1.0"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
use timeplus_client::{TimeplusClient, callback::PrintCallback};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client
    let client = TimeplusClient::new(
        "http://localhost:3218",
        "proton",
        "timeplus@t+"
    )?;
    // Create a stream
    client.streams().create(
        "my_stream",
        "id uint32, name string, value float64"
    ).await?;
    // Insert data
    let data = vec![
        ("id", 1.into()),
        ("name", "Alice".into()),
        ("value", 10.5.into()),
    ];
    client.streams().insert_data("my_stream", &data).await?;
    // Query historical data
    let result = client.streams().query("my_stream", Some(10)).await?;
    println!("Found {} records", result.row_count());
    // Stream real-time data with callback
    let callback = Arc::new(PrintCallback::new("MyStream"));
    let _handle = client.stream(
        "SELECT * FROM my_stream",
        callback
    ).await?;
    Ok(())
}
use std::collections::HashMap;
// Create stream
client.streams().create("events", "id uint32, message string").await?;
// Insert single record
client.streams().insert_data("events", &[
    ("id", 1.into()),
    ("message", "Hello".into())
]).await?;
// Insert batch
let records = vec![
    HashMap::from([("id".to_string(), 1.into()), ("message".to_string(), "A".into())]),
    HashMap::from([("id".to_string(), 2.into()), ("message".to_string(), "B".into())]),
];
client.streams().insert_batch("events", &records).await?;
// Query data
let result = client.streams().query("events", Some(100)).await?;
// Get statistics
let stats = client.streams().stats("events").await?;
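The second argument to `create` in the snippets above is a comma-separated list of `name type` pairs. When column definitions live in code, that string can be assembled rather than typed by hand. This is a small sketch; `column_list` is a hypothetical helper and not part of timeplus-client.

```rust
/// Joins (name, type) pairs into a comma-separated column list such as
/// "id uint32, message string".
/// Hypothetical helper for illustration; not part of timeplus-client.
fn column_list(columns: &[(&str, &str)]) -> String {
    columns
        .iter()
        .map(|(name, ty)| format!("{name} {ty}"))
        .collect::<Vec<_>>()
        .join(", ")
}
```

Calling `column_list(&[("id", "uint32"), ("message", "string")])` yields the same string that is passed literally when creating the events stream.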
// Create a Python UDF
let code = r#"
def multiply_by_factor(values, factor):
    return [v * factor for v in values]
"#;
client.udfs().create(
    "multiply_by_factor",
    "values array(float64), factor float64",
    "array(float64)",
    code
).await?;
// Test the UDF
let result = client.udfs().test("multiply_by_factor", "[1.0, 2.0, 3.0], 2.0").await?;
// Use in queries
let query_result = client.query(
    "SELECT multiply_by_factor([1.0, 2.0, 3.0], 2.5) as result"
).await?;
use timeplus_client::callback::{CountingCallback, FilterCallback, AggregationCallback};
use std::sync::Arc;
// Counting callback
let counter = Arc::new(CountingCallback::new("EventCounter"));
client.stream("SELECT * FROM events", counter.clone()).await?;
// Filter callback
let filter = Arc::new(FilterCallback::new("HighValue", |data| {
    data.get("value").and_then(|v| v.as_f64()).unwrap_or(0.0) > 100.0
}));
client.stream("SELECT * FROM events", filter).await?;
// Aggregation callback
let aggregator = Arc::new(AggregationCallback::new("ValueStats", "value"));
client.stream("SELECT * FROM events", aggregator).await?;
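To make the callback idea concrete, here is a conceptual sketch of how counting and filtering callbacks can compose. Everything in it is hypothetical: the `RowCallback` trait, the `Counter` and `Filter` types, and the `dispatch` function are illustrations and are not the crate's actual API. Rows are simplified to numeric columns.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// One streamed row, simplified here to numeric columns only.
type Row = HashMap<String, f64>;

/// Hypothetical callback trait; the crate's real trait may differ.
trait RowCallback: Send + Sync {
    fn on_row(&self, row: &Row);
}

/// Counts every row it receives.
struct Counter {
    seen: AtomicUsize,
}

impl RowCallback for Counter {
    fn on_row(&self, _row: &Row) {
        self.seen.fetch_add(1, Ordering::Relaxed);
    }
}

/// Forwards only the rows that satisfy `predicate` to `inner`.
struct Filter<F> {
    predicate: F,
    inner: Arc<dyn RowCallback>,
}

impl<F> RowCallback for Filter<F>
where
    F: Fn(&Row) -> bool + Send + Sync,
{
    fn on_row(&self, row: &Row) {
        if (self.predicate)(row) {
            self.inner.on_row(row);
        }
    }
}

/// Stand-in for the streaming loop: delivers each row to the callback.
fn dispatch(rows: &[Row], callback: &dyn RowCallback) {
    for row in rows {
        callback.on_row(row);
    }
}
```

Sharing callbacks through `Arc` and requiring `Send + Sync` mirrors how the snippets above pass `Arc`-wrapped callbacks to `client.stream`, which lets a callback be read after the stream has started.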
Port | Type | Purpose | Default Query Mode |
---|---|---|---|
8123 | HTTP | Batch queries and DDL | Batch |
3218 | HTTP | Streaming queries | Streaming |
8463 | TCP | Native client (streaming) | Streaming |
7587 | TCP | Native client (batch) | Batch |
8000 | HTTP | Web UI | - |
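The two HTTP rows of the table can be captured in code so that callers pick a port by query mode rather than by number. This is only a sketch: `QueryMode`, `http_port`, and `base_url` are hypothetical names and are not part of timeplus-client.

```rust
/// The two default query modes listed in the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum QueryMode {
    Batch,
    Streaming,
}

/// Returns the HTTP port whose default mode matches `mode`.
fn http_port(mode: QueryMode) -> u16 {
    match mode {
        QueryMode::Batch => 8123,
        QueryMode::Streaming => 3218,
    }
}

/// Builds a base URL for `host`.
/// Hypothetical helper for illustration; not part of timeplus-client.
fn base_url(host: &str, mode: QueryMode) -> String {
    format!("http://{host}:{}", http_port(mode))
}
```

With these definitions, `base_url("localhost", QueryMode::Streaming)` produces the same address the client examples in this README connect to.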
use timeplus_client::{TimeplusClient, TimeplusConfig};
let config = TimeplusConfig::new("http://localhost:3218", "proton", "timeplus@t+")
    .with_timeout(60); // 60-second timeout
let client = TimeplusClient::with_config(config)?;
See the examples/ directory for complete examples:
- basic_usage.rs: Basic stream operations
- streaming_callbacks.rs: Real-time streaming
- python_udfs.rs: Python UDF management
- advanced_queries.rs: Complex SQL queries
# Start Timeplus server first (skip if the timeplus-server container is already running)
docker run -d --name timeplus-server \
-p 8000:8000 -p 8123:8123 -p 8463:8463 -p 3218:3218 \
-v /tmp/timeplus_data:/timeplus/data \
docker.timeplus.com/timeplus/timeplus-enterprise:2.7.9
# Run examples
cargo run --example basic_usage
cargo run --example streaming_callbacks
cargo run --example python_udfs
# Run all tests
cargo test
# Run integration tests (requires running Timeplus server)
cargo test --test integration_tests
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Timeplus for the amazing streaming database
- Reqwest for HTTP client functionality
- The Rust community for its excellent async ecosystem
- Documentation
- Issue Tracker
- Discussions