diff --git a/example/txn.cpp b/example/txn.cpp index eca8420..f6f99cd 100644 --- a/example/txn.cpp +++ b/example/txn.cpp @@ -4,7 +4,9 @@ #include int main() { + tikv_client::Logger::init(); auto client = tikv_client::TransactionClient({"127.0.0.1:2379"}); + auto client2 = tikv_client::TransactionClient({"127.0.0.1:2379"}); auto txn = client.begin(); txn.put("k1", "v2"); diff --git a/include/tikv_client.h b/include/tikv_client.h index 7f52684..ba72c47 100644 --- a/include/tikv_client.h +++ b/include/tikv_client.h @@ -9,6 +9,10 @@ namespace tikv_client { +class Logger { +public: + static void init(); +}; struct KvPair final { std::string key; std::string value; diff --git a/src/lib.rs b/src/lib.rs index 495c754..3fa13c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ use std::ops; use anyhow::Result; use cxx::{CxxString, CxxVector}; -use futures::executor::block_on; use tokio::{ runtime::Runtime, time::{timeout, Duration}, @@ -42,6 +41,7 @@ mod ffi { type Transaction; type RawKVClient; + fn init_logger(); fn raw_client_new(pd_endpoints: &CxxVector) -> Result>; fn raw_get(client: &RawKVClient, key: &CxxString, timeout_ms: u64) @@ -135,8 +135,8 @@ mod ffi { } } -#[repr(transparent)] struct TransactionClient { + rt: tokio::runtime::Runtime, inner: tikv_client::TransactionClient, } @@ -145,17 +145,19 @@ struct RawKVClient { inner: tikv_client::RawClient, } -#[repr(transparent)] struct Transaction { + rt: tokio::runtime::Handle, inner: tikv_client::Transaction, } -fn raw_client_new(pd_endpoints: &CxxVector) -> Result> { +fn init_logger() { env_logger::builder() .filter_level(log::LevelFilter::Info) .init(); - let runtime = Runtime::new().unwrap(); +} +fn raw_client_new(pd_endpoints: &CxxVector) -> Result> { + let runtime = Runtime::new().unwrap(); let pd_endpoints = pd_endpoints .iter() .map(|str| str.to_str().map(ToOwned::to_owned)) @@ -168,27 +170,29 @@ fn raw_client_new(pd_endpoints: &CxxVector) -> Result) -> Result> { - env_logger::init(); - + let runtime = Runtime::new().unwrap(); let pd_endpoints = pd_endpoints .iter() .map(|str| str.to_str().map(ToOwned::to_owned)) .collect::, _>>()?; Ok(Box::new(TransactionClient { - inner: block_on(tikv_client::TransactionClient::new(pd_endpoints))?, + inner: runtime.block_on(tikv_client::TransactionClient::new(pd_endpoints))?, + rt: runtime, })) } fn transaction_client_begin(client: &TransactionClient) -> Result> { Ok(Box::new(Transaction { - inner: block_on(client.inner.begin_optimistic())?, + rt: client.rt.handle().clone(), + inner: client.rt.block_on(client.inner.begin_optimistic())?, })) } fn transaction_client_begin_pessimistic(client: &TransactionClient) -> Result> { Ok(Box::new(Transaction { - inner: block_on(client.inner.begin_pessimistic())?, + rt: client.rt.handle().clone(), + inner: client.rt.block_on(client.inner.begin_pessimistic())?, })) } @@ -292,7 +296,10 @@ fn raw_batch_put(cli: &RawKVClient, pairs: &CxxVector, timeout_ms: u64) } fn transaction_get(transaction: &mut Transaction, key: &CxxString) -> Result { - match block_on(transaction.inner.get(key.as_bytes().to_vec()))? { + match transaction + .rt + .block_on(transaction.inner.get(key.as_bytes().to_vec()))? + { Some(value) => Ok(OptionalValue { is_none: false, value, @@ -308,7 +315,10 @@ fn transaction_get_for_update( transaction: &mut Transaction, key: &CxxString, ) -> Result { - match block_on(transaction.inner.get_for_update(key.as_bytes().to_vec()))? { + match transaction + .rt + .block_on(transaction.inner.get_for_update(key.as_bytes().to_vec()))? + { Some(value) => Ok(OptionalValue { is_none: false, value, @@ -325,7 +335,9 @@ fn transaction_batch_get( keys: &CxxVector, ) -> Result> { let keys = keys.iter().map(|key| key.as_bytes().to_vec()); - let kv_pairs = block_on(transaction.inner.batch_get(keys))? + let kv_pairs = transaction + .rt + .block_on(transaction.inner.batch_get(keys))? .map(|tikv_client::KvPair(key, value)| KvPair { key: key.into(), value, @@ -358,7 +370,9 @@ fn transaction_scan( limit: u32, ) -> Result> { let range = to_bound_range(start, start_bound, end, end_bound); - let kv_pairs = block_on(transaction.inner.scan(range, limit))? + let kv_pairs = transaction + .rt + .block_on(transaction.inner.scan(range, limit))? .map(|tikv_client::KvPair(key, value)| KvPair { key: key.into(), value, @@ -376,14 +390,16 @@ fn transaction_scan_keys( limit: u32, ) -> Result> { let range = to_bound_range(start, start_bound, end, end_bound); - let keys = block_on(transaction.inner.scan_keys(range, limit))? + let keys = transaction + .rt + .block_on(transaction.inner.scan_keys(range, limit))? .map(|key| Key { key: key.into() }) .collect(); Ok(keys) } fn transaction_put(transaction: &mut Transaction, key: &CxxString, val: &CxxString) -> Result<()> { - block_on( + transaction.rt.block_on( transaction .inner .put(key.as_bytes().to_vec(), val.as_bytes().to_vec()), @@ -392,12 +408,14 @@ fn transaction_put(transaction: &mut Transaction, key: &CxxString, val: &CxxStri } fn transaction_delete(transaction: &mut Transaction, key: &CxxString) -> Result<()> { - block_on(transaction.inner.delete(key.as_bytes().to_vec()))?; + transaction + .rt + .block_on(transaction.inner.delete(key.as_bytes().to_vec()))?; Ok(()) } fn transaction_commit(transaction: &mut Transaction) -> Result<()> { - block_on(transaction.inner.commit())?; + transaction.rt.block_on(transaction.inner.commit())?; Ok(()) } diff --git a/src/tikv_client.cpp b/src/tikv_client.cpp index 4d4a49a..b7590f7 100644 --- a/src/tikv_client.cpp +++ b/src/tikv_client.cpp @@ -7,6 +7,10 @@ using ::rust::cxxbridge1::Box; namespace tikv_client { +void Logger::init() { + tikv_client_glue::init_logger(); +} + KvPair::KvPair(std::string &&key, std::string &&value) : key(std::move(key)) , value(std::move(value))