Skip to content

Use tokio runtime to run functions in TransactionClient and Transaction #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <tikv/tikv_client.h>

int main() {
tikv_client::Logger::init();
auto client = tikv_client::TransactionClient({"127.0.0.1:2379"});
auto client2 = tikv_client::TransactionClient({"127.0.0.1:2379"});
auto txn = client.begin();

txn.put("k1", "v2");
Expand Down
4 changes: 4 additions & 0 deletions include/tikv_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

namespace tikv_client {

class Logger {
public:
static void init();
};
struct KvPair final {
std::string key;
std::string value;
Expand Down
54 changes: 36 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::ops;

use anyhow::Result;
use cxx::{CxxString, CxxVector};
use futures::executor::block_on;
use tokio::{
runtime::Runtime,
time::{timeout, Duration},
Expand Down Expand Up @@ -42,6 +41,7 @@ mod ffi {
type Transaction;
type RawKVClient;

fn init_logger();
fn raw_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<RawKVClient>>;

fn raw_get(client: &RawKVClient, key: &CxxString, timeout_ms: u64)
Expand Down Expand Up @@ -135,8 +135,8 @@ mod ffi {
}
}

#[repr(transparent)]
struct TransactionClient {
rt: tokio::runtime::Runtime,
inner: tikv_client::TransactionClient,
}

Expand All @@ -145,17 +145,19 @@ struct RawKVClient {
inner: tikv_client::RawClient,
}

#[repr(transparent)]
struct Transaction {
rt: tokio::runtime::Handle,
inner: tikv_client::Transaction,
}

fn raw_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<RawKVClient>> {
fn init_logger() {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
let runtime = Runtime::new().unwrap();
}

fn raw_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<RawKVClient>> {
let runtime = Runtime::new().unwrap();
let pd_endpoints = pd_endpoints
.iter()
.map(|str| str.to_str().map(ToOwned::to_owned))
Expand All @@ -168,27 +170,29 @@ fn raw_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<RawKVClient
}

fn transaction_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<TransactionClient>> {
env_logger::init();

let runtime = Runtime::new().unwrap();
let pd_endpoints = pd_endpoints
.iter()
.map(|str| str.to_str().map(ToOwned::to_owned))
.collect::<std::result::Result<Vec<_>, _>>()?;

Ok(Box::new(TransactionClient {
inner: block_on(tikv_client::TransactionClient::new(pd_endpoints))?,
inner: runtime.block_on(tikv_client::TransactionClient::new(pd_endpoints))?,
rt: runtime,
}))
}

fn transaction_client_begin(client: &TransactionClient) -> Result<Box<Transaction>> {
Ok(Box::new(Transaction {
inner: block_on(client.inner.begin_optimistic())?,
rt: client.rt.handle().clone(),
inner: client.rt.block_on(client.inner.begin_optimistic())?,
}))
}

fn transaction_client_begin_pessimistic(client: &TransactionClient) -> Result<Box<Transaction>> {
Ok(Box::new(Transaction {
inner: block_on(client.inner.begin_pessimistic())?,
rt: client.rt.handle().clone(),
inner: client.rt.block_on(client.inner.begin_pessimistic())?,
}))
}

Expand Down Expand Up @@ -292,7 +296,10 @@ fn raw_batch_put(cli: &RawKVClient, pairs: &CxxVector<KvPair>, timeout_ms: u64)
}

fn transaction_get(transaction: &mut Transaction, key: &CxxString) -> Result<OptionalValue> {
match block_on(transaction.inner.get(key.as_bytes().to_vec()))? {
match transaction
.rt
.block_on(transaction.inner.get(key.as_bytes().to_vec()))?
{
Some(value) => Ok(OptionalValue {
is_none: false,
value,
Expand All @@ -308,7 +315,10 @@ fn transaction_get_for_update(
transaction: &mut Transaction,
key: &CxxString,
) -> Result<OptionalValue> {
match block_on(transaction.inner.get_for_update(key.as_bytes().to_vec()))? {
match transaction
.rt
.block_on(transaction.inner.get_for_update(key.as_bytes().to_vec()))?
{
Some(value) => Ok(OptionalValue {
is_none: false,
value,
Expand All @@ -325,7 +335,9 @@ fn transaction_batch_get(
keys: &CxxVector<CxxString>,
) -> Result<Vec<KvPair>> {
let keys = keys.iter().map(|key| key.as_bytes().to_vec());
let kv_pairs = block_on(transaction.inner.batch_get(keys))?
let kv_pairs = transaction
.rt
.block_on(transaction.inner.batch_get(keys))?
.map(|tikv_client::KvPair(key, value)| KvPair {
key: key.into(),
value,
Expand Down Expand Up @@ -358,7 +370,9 @@ fn transaction_scan(
limit: u32,
) -> Result<Vec<KvPair>> {
let range = to_bound_range(start, start_bound, end, end_bound);
let kv_pairs = block_on(transaction.inner.scan(range, limit))?
let kv_pairs = transaction
.rt
.block_on(transaction.inner.scan(range, limit))?
.map(|tikv_client::KvPair(key, value)| KvPair {
key: key.into(),
value,
Expand All @@ -376,14 +390,16 @@ fn transaction_scan_keys(
limit: u32,
) -> Result<Vec<Key>> {
let range = to_bound_range(start, start_bound, end, end_bound);
let keys = block_on(transaction.inner.scan_keys(range, limit))?
let keys = transaction
.rt
.block_on(transaction.inner.scan_keys(range, limit))?
.map(|key| Key { key: key.into() })
.collect();
Ok(keys)
}

fn transaction_put(transaction: &mut Transaction, key: &CxxString, val: &CxxString) -> Result<()> {
block_on(
transaction.rt.block_on(
transaction
.inner
.put(key.as_bytes().to_vec(), val.as_bytes().to_vec()),
Expand All @@ -392,12 +408,14 @@ fn transaction_put(transaction: &mut Transaction, key: &CxxString, val: &CxxStri
}

fn transaction_delete(transaction: &mut Transaction, key: &CxxString) -> Result<()> {
block_on(transaction.inner.delete(key.as_bytes().to_vec()))?;
transaction
.rt
.block_on(transaction.inner.delete(key.as_bytes().to_vec()))?;
Ok(())
}

fn transaction_commit(transaction: &mut Transaction) -> Result<()> {
block_on(transaction.inner.commit())?;
transaction.rt.block_on(transaction.inner.commit())?;
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions src/tikv_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ using ::rust::cxxbridge1::Box;

namespace tikv_client {

void Logger::init() {
tikv_client_glue::init_logger();
}

KvPair::KvPair(std::string &&key, std::string &&value)
: key(std::move(key))
, value(std::move(value))
Expand Down