graceyangfan/timeplus-rust-client

Timeplus Rust Client

A lightweight, elegant Rust client for Timeplus with minimal dependencies and maximum performance.

✨ Features

  • 🚀 Lightweight: Minimal dependencies (only reqwest + serde_json)
  • 📊 Stream Management: Create, insert, query, and manage streams
  • 🐍 Python UDF Support: Create and manage Python User-Defined Functions
  • 🌊 Real-time Streaming: Callback-based streaming with real-time data processing
  • 🔧 Advanced SQL: Support for aggregations, functions, and complex queries
  • 🛡️ Type Safe: Full Rust type system support with comprehensive error handling
  • 📖 Well Documented: Extensive documentation and examples

🚀 Quick Start

Prerequisites

  1. Docker (for running Timeplus)
  2. Rust 1.70+

Setup Timeplus Server

# Pull and run Timeplus Enterprise
docker run -d \
  --name timeplus-server \
  -p 8000:8000 \
  -p 8123:8123 \
  -p 8463:8463 \
  -p 3218:3218 \
  -v /tmp/timeplus_data:/timeplus/data \
  docker.timeplus.com/timeplus/timeplus-enterprise:2.7.9

# Wait for server to start (about 30 seconds)
sleep 30

# Verify server is running
curl http://localhost:8123
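
The fixed `sleep 30` above is only an estimate of the startup time. As an alternative, a program could poll the port until it accepts TCP connections. The sketch below uses only the standard library; `wait_for_port` is a hypothetical helper and not part of timeplus-client, and an open port only shows that something is listening, not that Timeplus is fully ready.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: returns true once `addr` (for example "localhost:8123")
/// accepts a TCP connection, trying at most `attempts` times with `delay`
/// between failed tries.
fn wait_for_port(addr: &str, attempts: u32, delay: Duration) -> bool {
    for _ in 0..attempts {
        // Resolve on every attempt; a failed resolution counts as "not reachable yet".
        let reachable = addr
            .to_socket_addrs()
            .map(|mut addrs| {
                addrs.any(|a| TcpStream::connect_timeout(&a, Duration::from_secs(1)).is_ok())
            })
            .unwrap_or(false);
        if reachable {
            return true;
        }
        sleep(delay);
    }
    false
}
```

For example, `wait_for_port("localhost:8123", 30, Duration::from_secs(1))` would retry up to 30 times, one second apart, before giving up.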

Add to Your Project

[dependencies]
timeplus-client = "0.1.0"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }

Basic Usage

use timeplus_client::{TimeplusClient, callback::PrintCallback};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client
    let client = TimeplusClient::new(
        "http://localhost:3218", 
        "proton", 
        "timeplus@t+"
    )?;

    // Create a stream
    client.streams().create(
        "my_stream", 
        "id uint32, name string, value float64"
    ).await?;

    // Insert data
    let data = vec![
        ("id", 1.into()),
        ("name", "Alice".into()),
        ("value", 10.5.into()),
    ];
    client.streams().insert_data("my_stream", &data).await?;

    // Query historical data
    let result = client.streams().query("my_stream", Some(10)).await?;
    println!("Found {} records", result.row_count());

    // Stream real-time data with callback
    let callback = Arc::new(PrintCallback::new("MyStream"));
    let _handle = client.stream(
        "SELECT * FROM my_stream", 
        callback
    ).await?;

    Ok(())
}
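
The `.into()` calls in the insert step rely on Rust's `From`/`Into` conversions to turn plain literals into one common value type. The crate's actual value type is not shown in this README, so the sketch below uses a hypothetical `Value` enum purely to illustrate the mechanism.

```rust
/// Hypothetical stand-in for the client's value type (an assumption, not the crate's API).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Float(f64),
    Text(String),
}

impl From<i32> for Value {
    fn from(v: i32) -> Self {
        Value::Int(i64::from(v))
    }
}

impl From<f64> for Value {
    fn from(v: f64) -> Self {
        Value::Float(v)
    }
}

impl From<&str> for Value {
    fn from(v: &str) -> Self {
        Value::Text(v.to_string())
    }
}

/// Builds the same row as the Basic Usage example, with the target type spelled out.
fn sample_row() -> Vec<(&'static str, Value)> {
    vec![
        ("id", 1.into()),         // integer literal -> Value::Int
        ("name", "Alice".into()), // string slice    -> Value::Text
        ("value", 10.5.into()),   // float literal   -> Value::Float
    ]
}
```

Each `.into()` picks the matching `From` implementation from the literal's type, which is why integers, strings, and floats can share one list.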

📖 Documentation

Stream Management

use std::collections::HashMap;

// Create stream
client.streams().create("events", "id uint32, message string").await?;

// Insert single record
client.streams().insert_data("events", &[
    ("id", 1.into()),
    ("message", "Hello".into())
]).await?;

// Insert batch
let records = vec![
    HashMap::from([("id".to_string(), 1.into()), ("message".to_string(), "A".into())]),
    HashMap::from([("id".to_string(), 2.into()), ("message".to_string(), "B".into())]),
];
client.streams().insert_batch("events", &records).await?;

// Query data
let result = client.streams().query("events", Some(100)).await?;

// Get statistics
let stats = client.streams().stats("events").await?;
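
One way to prepare a batch of row maps for sending is to flatten it into a shared column list plus one row of values per record. How `insert_batch` handles this internally is not documented here; the sketch below is only one possible approach, with `to_columnar` as a hypothetical helper and string values used for brevity.

```rust
use std::collections::{BTreeSet, HashMap};

/// Column names plus one row of optional values per record.
type Columnar = (Vec<String>, Vec<Vec<Option<String>>>);

/// Hypothetical helper (not part of timeplus-client).
fn to_columnar(records: &[HashMap<String, String>]) -> Columnar {
    // Sorted union of all keys, so the column order is deterministic
    // (HashMap iteration order is not).
    let columns: Vec<String> = records
        .iter()
        .flat_map(|r| r.keys().cloned())
        .collect::<BTreeSet<String>>()
        .into_iter()
        .collect();

    // One row per record; a key missing from a record becomes None.
    let rows: Vec<Vec<Option<String>>> = records
        .iter()
        .map(|r| columns.iter().map(|c| r.get(c).cloned()).collect())
        .collect();

    (columns, rows)
}
```

Sorting the column names keeps the output stable across runs, which makes such a payload easier to test and to compare.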

Python UDF Management

// Create a Python UDF
let code = r#"
def multiply_by_factor(values, factor):
    return [v * factor for v in values]
"#;

client.udfs().create(
    "multiply_by_factor",
    "values array(float64), factor float64", 
    "array(float64)",
    code
).await?;

// Test the UDF
let result = client.udfs().test("multiply_by_factor", "[1.0, 2.0, 3.0], 2.0").await?;

// Use in queries
let query_result = client.query(
    "SELECT multiply_by_factor([1.0, 2.0, 3.0], 2.5) as result"
).await?;

Streaming with Callbacks

use std::sync::Arc;
use timeplus_client::callback::{CountingCallback, FilterCallback, AggregationCallback};

// Counting callback
let counter = Arc::new(CountingCallback::new("EventCounter"));
client.stream("SELECT * FROM events", counter.clone()).await?;

// Filter callback
let filter = Arc::new(FilterCallback::new("HighValue", |data| {
    data.get("value").and_then(|v| v.as_f64()).unwrap_or(0.0) > 100.0
}));
client.stream("SELECT * FROM events", filter).await?;

// Aggregation callback
let aggregator = Arc::new(AggregationCallback::new("ValueStats", "value"));
client.stream("SELECT * FROM events", aggregator).await?;
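
The callbacks above are wrapped in `Arc` so that the streaming task and the calling code can share one callback instance; that is why `counter.clone()` is passed while `counter` stays available for reading results. The sketch below illustrates the pattern with the standard library only. `RowCallback`, `Counting`, `Filtering`, and `run_stream` are hypothetical names, and the crate's real trait and types may differ.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Simplified row type for this sketch: column name -> numeric value.
type Row = HashMap<String, f64>;

/// Hypothetical callback trait; shared across threads, hence Send + Sync.
trait RowCallback: Send + Sync {
    fn on_row(&self, row: &Row);
}

/// Counts every row it sees.
struct Counting {
    seen: AtomicUsize,
}

impl RowCallback for Counting {
    fn on_row(&self, _row: &Row) {
        self.seen.fetch_add(1, Ordering::Relaxed);
    }
}

/// Counts only the rows accepted by a predicate.
struct Filtering<F> {
    predicate: F,
    matched: AtomicUsize,
}

impl<F> RowCallback for Filtering<F>
where
    F: Fn(&Row) -> bool + Send + Sync,
{
    fn on_row(&self, row: &Row) {
        if (self.predicate)(row) {
            self.matched.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Stand-in for a streaming query: delivers rows to the callback on a worker thread.
fn run_stream(rows: Vec<Row>, callback: Arc<dyn RowCallback>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for row in &rows {
            callback.on_row(row);
        }
    })
}

/// Feeds three rows to both callbacks; returns (rows seen, rows with value > 100).
fn demo() -> (usize, usize) {
    let rows: Vec<Row> = [50.0, 150.0, 99.0]
        .iter()
        .map(|v| HashMap::from([("value".to_string(), *v)]))
        .collect();

    let counter = Arc::new(Counting { seen: AtomicUsize::new(0) });
    let filter = Arc::new(Filtering {
        predicate: |row: &Row| row.get("value").copied().unwrap_or(0.0) > 100.0,
        matched: AtomicUsize::new(0),
    });

    // The clones move into the worker threads; the originals stay here to read the counts.
    let h1 = run_stream(rows.clone(), counter.clone());
    let h2 = run_stream(rows, filter.clone());
    h1.join().unwrap();
    h2.join().unwrap();

    (
        counter.seen.load(Ordering::Relaxed),
        filter.matched.load(Ordering::Relaxed),
    )
}
```

Atomic counters let the callbacks update state through a shared `&self`, so no `Mutex` is needed for simple tallies.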

🔧 Configuration

Timeplus Server Ports

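| Port | Type | Purpose | Default Query Mode |
|------|------|---------|--------------------|
| 8123 | HTTP | Batch queries and DDL | Batch |
| 3218 | HTTP | Streaming queries | Streaming |
| 8463 | TCP | Native client (streaming) | Streaming |
| 7587 | TCP | Native client (batch) | Batch |
| 8000 | HTTP | Web UI | - |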
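
The two HTTP ports listed above differ in their default query mode, so the base URL a program uses depends on whether it wants batch or streaming behaviour. The sketch below encodes that choice; `QueryMode`, `http_port`, and `http_base_url` are hypothetical names used for illustration and are not part of the crate.

```rust
/// Default query mode of the two HTTP query ports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueryMode {
    Batch,
    Streaming,
}

/// Maps a query mode to the HTTP port listed for it.
fn http_port(mode: QueryMode) -> u16 {
    match mode {
        QueryMode::Batch => 8123,     // batch queries and DDL
        QueryMode::Streaming => 3218, // streaming queries
    }
}

/// Builds a base URL such as "http://localhost:3218" for the given mode.
fn http_base_url(host: &str, mode: QueryMode) -> String {
    format!("http://{}:{}", host, http_port(mode))
}
```

With these definitions, `http_base_url("localhost", QueryMode::Streaming)` yields the same address the examples in this README pass to the client.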

Client Configuration

use timeplus_client::{TimeplusClient, TimeplusConfig};

let config = TimeplusConfig::new("http://localhost:3218", "proton", "timeplus@t+")
    .with_timeout(60); // 60-second timeout

let client = TimeplusClient::with_config(config)?;
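
`with_timeout` reads like a consuming builder method: it takes the config by value, changes one field, and returns it so that calls can be chained. The sketch below shows that pattern with a hypothetical `SketchConfig` type; the field names and the 30-second default are assumptions, not the crate's actual definition.

```rust
use std::time::Duration;

/// Hypothetical config type used only to illustrate the builder pattern.
#[derive(Debug, Clone, PartialEq)]
struct SketchConfig {
    base_url: String,
    username: String,
    password: String,
    timeout: Duration,
}

impl SketchConfig {
    /// Creates a config with an assumed default timeout of 30 seconds.
    fn new(base_url: &str, username: &str, password: &str) -> Self {
        SketchConfig {
            base_url: base_url.to_string(),
            username: username.to_string(),
            password: password.to_string(),
            timeout: Duration::from_secs(30),
        }
    }

    /// Consumes the config and returns it with the timeout replaced (in seconds).
    fn with_timeout(mut self, secs: u64) -> Self {
        self.timeout = Duration::from_secs(secs);
        self
    }
}
```

Taking `self` by value keeps the chain free of intermediate mutable bindings, at the cost of requiring a `clone()` when the original config is still needed.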

🧪 Examples

See the examples/ directory for complete examples.

🚀 Running Examples

# Start Timeplus server first
docker run -d --name timeplus-server \
  -p 8000:8000 -p 8123:8123 -p 8463:8463 -p 3218:3218 \
  -v /tmp/timeplus_data:/timeplus/data \
  docker.timeplus.com/timeplus/timeplus-enterprise:2.7.9

# Run examples
cargo run --example basic_usage
cargo run --example streaming_callbacks
cargo run --example python_udfs

🧪 Testing

# Run all tests
cargo test

# Run integration tests (requires running Timeplus server)
cargo test --test integration_tests

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Timeplus for the amazing streaming database
  • Reqwest for HTTP client functionality
  • The Rust community for its excellent async ecosystem

📞 Support
