Skip to content

Commit e09c1a2

Browse files
authored
Merge pull request #17 from Smityz/support-rawkv
support RawKV
2 parents a2f9fe7 + 6795c9b commit e09c1a2

File tree

8 files changed

+275
-13
lines changed

8 files changed

+275
-13
lines changed

Makefile

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@ pre-build: directories target/tikv_client_glue.cc include/tikv_client_glue.h
1717
clean:
1818
cargo clean
1919

20-
run-example: target/tikv-example
21-
RUST_LOG=debug $(cur_makefile_path)/target/tikv-example
20+
run-raw-example: target/tikv-raw-example
21+
RUST_LOG=debug $(cur_makefile_path)/target/tikv-raw-example
2222

23-
target/tikv-example: target/debug/libtikv_client.a example/main.cpp
24-
c++ $(cur_makefile_path)/example/main.cpp -o $(cur_makefile_path)/target/tikv-example -std=c++17 -g -I$(cur_makefile_path)/include -L$(cur_makefile_path)/target/debug -ltikv_client -lpthread -ldl -lssl -lcrypto
23+
run-txn-example: target/tikv-txn-example
24+
RUST_LOG=debug $(cur_makefile_path)/target/tikv-txn-example
2525

26+
target/tikv-raw-example: target/debug/libtikv_client.a example/raw.cpp
27+
c++ $(cur_makefile_path)/example/raw.cpp -o $(cur_makefile_path)/target/tikv-raw-example -std=c++17 -g -I$(cur_makefile_path)/include -L$(cur_makefile_path)/target/debug -ltikv_client -lpthread -ldl -lssl -lcrypto
28+
29+
target/tikv-txn-example: target/debug/libtikv_client.a example/txn.cpp
30+
c++ $(cur_makefile_path)/example/txn.cpp -o $(cur_makefile_path)/target/tikv-txn-example -std=c++17 -g -I$(cur_makefile_path)/include -L$(cur_makefile_path)/target/debug -ltikv_client -lpthread -ldl -lssl -lcrypto
2631

2732
target/tikv_client_glue.cc: src/lib.rs
2833
cxxbridge $(cur_makefile_path)/src/lib.rs > $(cur_makefile_path)/target/tikv_client_glue.cc

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,8 @@ make release
2828

2929
```bash
3030
tiup playground nightly
31-
make run-example
31+
# run rawkv example
32+
make run-raw-example
33+
# run txnkv example
34+
make run-txn-example
3235
```

example/raw.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
#include "tikv_client.h"
4+
#include <iostream>
5+
6+
int main() {
7+
auto client = tikv_client::RawKVClient({"127.0.0.1:2379"});
8+
9+
const std::uint32_t kTimeoutMs = 10;
10+
client.put("k1", "v1", kTimeoutMs);
11+
12+
auto val = client.get("k1",kTimeoutMs);
13+
if (val) {
14+
std::cout << "get key: \n(k1:" << *val << ")" << std::endl;
15+
} else {
16+
std::cout << "key not found" << std::endl;
17+
}
18+
19+
client.batch_put({{"k2","v2"},{"k3","v3"},{"k4","v4"},{"k5","v5"}}, kTimeoutMs);
20+
21+
const std::uint32_t kLimit = 20;
22+
// scan [k1,k6), limit 20, timeout 10ms
23+
auto kv_pairs = client.scan("k1","k6", kLimit ,kTimeoutMs);
24+
std::cout<<"scan[\"k1\",\"k6\"):"<<std::endl;
25+
for (auto iter = kv_pairs.begin(); iter != kv_pairs.end(); ++iter) {
26+
std::cout << "(" << iter->key << ": " << iter->value << ") ";
27+
}
28+
std::cout << std::endl;
29+
30+
// delete [k3,k5), so [k1,k6) should be [k1,k3) + [k5,k6)
31+
std::cout<<"scan[\"k1\",\"k6\") after delete:"<<std::endl;
32+
client.remove_range("k3","k5",kTimeoutMs);
33+
kv_pairs = client.scan("k1","k6", kLimit ,kTimeoutMs);
34+
for (auto iter = kv_pairs.begin(); iter != kv_pairs.end(); ++iter) {
35+
std::cout << "(" << iter->key << ": " << iter->value << ") ";
36+
}
37+
std::cout << std::endl;
38+
39+
return 0;
40+
}
File renamed without changes.

include/tikv_client.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ struct KvPair final {
1414
std::string value;
1515

1616
KvPair(std::string &&key, std::string &&value);
17+
ffi::KvPair to_ffi();
1718
};
1819

1920
class Transaction {
@@ -42,6 +43,20 @@ class TransactionClient {
4243
::rust::cxxbridge1::Box<tikv_client_glue::TransactionClient> _client;
4344
};
4445

45-
}
46+
class RawKVClient {
47+
public:
48+
RawKVClient(const std::vector<std::string> &pd_endpoints);
49+
std::optional<std::string> get(const std::string &key,const std::uint32_t timeout);
50+
void put(const std::string &key, const std::string &value, const std::uint32_t timeout);
51+
void batch_put(const std::vector<KvPair> &kvs, const std::uint32_t timeout);
52+
void remove(const std::string &key, const std::uint32_t timeout);
53+
void remove_range(const std::string &start_key, const std::string &end_key, const std::uint32_t timeout);
54+
std::vector<KvPair> scan(const std::string &startKey, const std::string &endKey, std::uint32_t limit, const std::uint32_t timeout);
55+
56+
private:
57+
::rust::cxxbridge1::Box<tikv_client_glue::RawKVClient> _client;
58+
};
59+
60+
} // namespace tikv_client
4661

4762
#endif //_TIKV_CLIENT_H_

include/tikv_client_glue.h

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -573,12 +573,15 @@ Vec<T>::Vec(unsafe_bitcopy_t, const Vec &bits) noexcept : repr(bits.repr) {}
573573
} // namespace rust
574574

575575
struct Key;
576-
struct KvPair;
577576
struct OptionalValue;
578577
enum class Bound : ::std::uint8_t;
578+
namespace ffi {
579+
struct KvPair;
580+
}
579581
namespace tikv_client_glue {
580582
struct TransactionClient;
581583
struct Transaction;
584+
struct RawKVClient;
582585
}
583586

584587
#ifndef CXXBRIDGE1_STRUCT_Key
@@ -590,15 +593,17 @@ struct Key final {
590593
};
591594
#endif // CXXBRIDGE1_STRUCT_Key
592595

593-
#ifndef CXXBRIDGE1_STRUCT_KvPair
594-
#define CXXBRIDGE1_STRUCT_KvPair
596+
namespace ffi {
597+
#ifndef CXXBRIDGE1_STRUCT_ffi$KvPair
598+
#define CXXBRIDGE1_STRUCT_ffi$KvPair
595599
struct KvPair final {
596600
::rust::Vec<::std::uint8_t> key;
597601
::rust::Vec<::std::uint8_t> value;
598602

599603
using IsRelocatable = ::std::true_type;
600604
};
601-
#endif // CXXBRIDGE1_STRUCT_KvPair
605+
#endif // CXXBRIDGE1_STRUCT_ffi$KvPair
606+
} // namespace ffi
602607

603608
#ifndef CXXBRIDGE1_STRUCT_OptionalValue
604609
#define CXXBRIDGE1_STRUCT_OptionalValue
@@ -620,6 +625,20 @@ enum class Bound : ::std::uint8_t {
620625
#endif // CXXBRIDGE1_ENUM_Bound
621626

622627
namespace tikv_client_glue {
628+
::rust::Box<::tikv_client_glue::RawKVClient> raw_client_new(const ::std::vector<::std::string> &pd_endpoints);
629+
630+
::OptionalValue raw_get(const ::tikv_client_glue::RawKVClient &client, const ::std::string &key, ::std::uint32_t timeout_ms);
631+
632+
void raw_put(const ::tikv_client_glue::RawKVClient &cli, const ::std::string &key, const ::std::string &val, ::std::uint32_t timeout_ms);
633+
634+
::rust::Vec<::ffi::KvPair> raw_scan(const ::tikv_client_glue::RawKVClient &cli, const ::std::string &start, const ::std::string &end, ::std::uint32_t limit, ::std::uint32_t timeout_ms);
635+
636+
void raw_delete(const ::tikv_client_glue::RawKVClient &cli, const ::std::string &key, ::std::uint32_t timeout_ms);
637+
638+
void raw_delete_range(const ::tikv_client_glue::RawKVClient &cli, const ::std::string &startKey, const ::std::string &endKey, ::std::uint32_t timeout_ms);
639+
640+
void raw_batch_put(const ::tikv_client_glue::RawKVClient &cli, const ::std::vector<::ffi::KvPair> &pairs, ::std::uint32_t timeout_ms);
641+
623642
::rust::Box<::tikv_client_glue::TransactionClient> transaction_client_new(const ::std::vector<::std::string> &pd_endpoints);
624643

625644
::rust::Box<::tikv_client_glue::Transaction> transaction_client_begin(const ::tikv_client_glue::TransactionClient &client);
@@ -630,11 +649,11 @@ ::OptionalValue transaction_get(const ::tikv_client_glue::Transaction &transacti
630649

631650
::OptionalValue transaction_get_for_update(::tikv_client_glue::Transaction &transaction, const ::std::string &key);
632651

633-
::rust::Vec<::KvPair> transaction_batch_get(::tikv_client_glue::Transaction &transaction, const ::std::vector<::std::string> &keys);
652+
::rust::Vec<::ffi::KvPair> transaction_batch_get(::tikv_client_glue::Transaction &transaction, const ::std::vector<::std::string> &keys);
634653

635-
::rust::Vec<::KvPair> transaction_batch_get_for_update(::tikv_client_glue::Transaction &transaction, const ::std::vector<::std::string> &keys);
654+
::rust::Vec<::ffi::KvPair> transaction_batch_get_for_update(::tikv_client_glue::Transaction &transaction, const ::std::vector<::std::string> &keys);
636655

637-
::rust::Vec<::KvPair> transaction_scan(::tikv_client_glue::Transaction &transaction, const ::std::string &start, ::Bound start_bound, const ::std::string &end, ::Bound end_bound, ::std::uint32_t limit);
656+
::rust::Vec<::ffi::KvPair> transaction_scan(::tikv_client_glue::Transaction &transaction, const ::std::string &start, ::Bound start_bound, const ::std::string &end, ::Bound end_bound, ::std::uint32_t limit);
638657

639658
::rust::Vec<::Key> transaction_scan_keys(::tikv_client_glue::Transaction &transaction, const ::std::string &start, ::Bound start_bound, const ::std::string &end, ::Bound end_bound, ::std::uint32_t limit);
640659

src/lib.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod ffi {
1515
key: Vec<u8>,
1616
}
1717

18+
#[namespace = "ffi"]
1819
struct KvPair {
1920
key: Vec<u8>,
2021
value: Vec<u8>,
@@ -35,6 +36,42 @@ mod ffi {
3536
extern "Rust" {
3637
type TransactionClient;
3738
type Transaction;
39+
type RawKVClient;
40+
41+
fn raw_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<RawKVClient>>;
42+
43+
fn raw_get(client: &RawKVClient, key: &CxxString, timeout_ms: u32)
44+
-> Result<OptionalValue>;
45+
46+
fn raw_put(
47+
cli: &RawKVClient,
48+
key: &CxxString,
49+
val: &CxxString,
50+
timeout_ms: u32,
51+
) -> Result<()>;
52+
53+
fn raw_scan(
54+
cli: &RawKVClient,
55+
start: &CxxString,
56+
end: &CxxString,
57+
limit: u32,
58+
timeout_ms: u32,
59+
) -> Result<Vec<KvPair>>;
60+
61+
fn raw_delete(cli: &RawKVClient, key: &CxxString, timeout_ms: u32) -> Result<()>;
62+
63+
fn raw_delete_range(
64+
cli: &RawKVClient,
65+
startKey: &CxxString,
66+
endKey: &CxxString,
67+
timeout_ms: u32,
68+
) -> Result<()>;
69+
70+
fn raw_batch_put(
71+
cli: &RawKVClient,
72+
pairs: &CxxVector<KvPair>,
73+
timeout_ms: u32,
74+
) -> Result<()>;
3875

3976
fn transaction_client_new(
4077
pd_endpoints: &CxxVector<CxxString>,
@@ -98,11 +135,28 @@ struct TransactionClient {
98135
inner: tikv_client::TransactionClient,
99136
}
100137

138+
struct RawKVClient {
139+
inner: tikv_client::RawClient,
140+
}
141+
101142
#[repr(transparent)]
102143
struct Transaction {
103144
inner: tikv_client::Transaction,
104145
}
105146

147+
fn raw_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<RawKVClient>> {
148+
env_logger::init();
149+
150+
let pd_endpoints = pd_endpoints
151+
.iter()
152+
.map(|str| str.to_str().map(ToOwned::to_owned))
153+
.collect::<std::result::Result<Vec<_>, _>>()?;
154+
155+
Ok(Box::new(RawKVClient {
156+
inner: block_on(tikv_client::RawClient::new(pd_endpoints))?,
157+
}))
158+
}
159+
106160
fn transaction_client_new(pd_endpoints: &CxxVector<CxxString>) -> Result<Box<TransactionClient>> {
107161
env_logger::init();
108162

@@ -128,6 +182,73 @@ fn transaction_client_begin_pessimistic(client: &TransactionClient) -> Result<Bo
128182
}))
129183
}
130184

185+
fn raw_get(cli: &RawKVClient, key: &CxxString, timeout_ms: u32) -> Result<OptionalValue> {
186+
match block_on(cli.inner.get(key.as_bytes().to_vec()))? {
187+
Some(value) => Ok(OptionalValue {
188+
is_none: false,
189+
value,
190+
}),
191+
None => Ok(OptionalValue {
192+
is_none: true,
193+
value: Vec::new(),
194+
}),
195+
}
196+
}
197+
198+
// TODO(smityz): implement timeout
199+
fn raw_put(cli: &RawKVClient, key: &CxxString, val: &CxxString, timeout_ms: u32) -> Result<()> {
200+
block_on(
201+
cli.inner
202+
.put(key.as_bytes().to_vec(), val.as_bytes().to_vec()),
203+
)?;
204+
Ok(())
205+
}
206+
207+
fn raw_scan(
208+
cli: &RawKVClient,
209+
start: &CxxString,
210+
end: &CxxString,
211+
limit: u32,
212+
timeout_ms: u32,
213+
) -> Result<Vec<KvPair>> {
214+
let rg = to_bound_range(start, Bound::Included, end, Bound::Excluded);
215+
let pairs = block_on(cli.inner.scan(rg, limit))?
216+
.into_iter()
217+
.map(|tikv_client::KvPair(key, value)| KvPair {
218+
key: key.into(),
219+
value,
220+
})
221+
.collect();
222+
Ok(pairs)
223+
}
224+
225+
fn raw_delete(cli: &RawKVClient, key: &CxxString, timeout_ms: u32) -> Result<()> {
226+
block_on(cli.inner.delete(key.as_bytes().to_vec()))?;
227+
Ok(())
228+
}
229+
230+
fn raw_delete_range(
231+
cli: &RawKVClient,
232+
start_key: &CxxString,
233+
end_key: &CxxString,
234+
timeout_ms: u32,
235+
) -> Result<()> {
236+
let rg = to_bound_range(start_key, Bound::Included, end_key, Bound::Excluded);
237+
block_on(cli.inner.delete_range(rg))?;
238+
Ok(())
239+
}
240+
241+
fn raw_batch_put(cli: &RawKVClient, pairs: &CxxVector<KvPair>, timeout_ms: u32) -> Result<()> {
242+
let tikv_pairs: Vec<tikv_client::KvPair> = pairs
243+
.iter()
244+
.map(|KvPair { key, value }| -> tikv_client::KvPair {
245+
tikv_client::KvPair(key.to_vec().into(), value.to_vec())
246+
})
247+
.collect();
248+
block_on(cli.inner.batch_put(tikv_pairs))?;
249+
Ok(())
250+
}
251+
131252
fn transaction_get(transaction: &Transaction, key: &CxxString) -> Result<OptionalValue> {
132253
match block_on(transaction.inner.get(key.as_bytes().to_vec()))? {
133254
Some(value) => Ok(OptionalValue {

src/tikv_client.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,68 @@ KvPair::KvPair(std::string &&key, std::string &&value)
1212
, value(std::move(value))
1313
{}
1414

15+
ffi::KvPair KvPair::to_ffi() {
16+
ffi::KvPair f_pair;
17+
f_pair.key.reserve(key.size());
18+
for (const auto &c : this->key) {
19+
f_pair.key.emplace_back(static_cast<std::uint8_t>(c));
20+
}
21+
f_pair.value.reserve(value.size());
22+
for (const auto &c : this->value) {
23+
f_pair.value.emplace_back(static_cast<std::uint8_t>(c));
24+
}
25+
return f_pair;
26+
}
27+
1528
TransactionClient::TransactionClient(const std::vector<std::string> &pd_endpoints):
1629
_client(tikv_client_glue::transaction_client_new(pd_endpoints)) {}
1730

31+
RawKVClient::RawKVClient(const std::vector<std::string> &pd_endpoints):
32+
_client(tikv_client_glue::raw_client_new(pd_endpoints)) {}
33+
34+
std::optional<std::string> RawKVClient::get(const std::string &key, const std::uint32_t timeout) {
35+
auto val = tikv_client_glue::raw_get(*_client,key,timeout);
36+
if (val.is_none) {
37+
return std::nullopt;
38+
} else {
39+
return std::string{val.value.begin(), val.value.end()};
40+
}
41+
}
42+
43+
void RawKVClient::put(const std::string &key, const std::string &value, const std::uint32_t timeout) {
44+
tikv_client_glue::raw_put(*_client,key,value,timeout);
45+
}
46+
47+
void RawKVClient::batch_put(const std::vector<KvPair> &kv_pairs, const std::uint32_t timeout) {
48+
std::vector<ffi::KvPair> pairs;
49+
pairs.reserve(kv_pairs.size());
50+
for (auto pair: kv_pairs) {
51+
pairs.emplace_back(pair.to_ffi());
52+
}
53+
tikv_client_glue::raw_batch_put(*_client,pairs,timeout);
54+
}
55+
56+
std::vector<KvPair> RawKVClient::scan(const std::string &startKey, const std::string &endKey, std::uint32_t limit, const std::uint32_t timeout){
57+
auto kv_pairs = tikv_client_glue::raw_scan(*_client,startKey,endKey,limit,timeout);
58+
std::vector<KvPair> result;
59+
result.reserve(kv_pairs.size());
60+
for (auto iter = kv_pairs.begin(); iter != kv_pairs.end(); ++iter) {
61+
result.emplace_back(
62+
std::string{(iter->key).begin(), (iter->key).end()},
63+
std::string{(iter->value).begin(), (iter->value).end()}
64+
);
65+
}
66+
return result;
67+
}
68+
69+
void RawKVClient::remove(const std::string &key, const std::uint32_t timeout) {
70+
tikv_client_glue::raw_delete(*_client,key,timeout);
71+
}
72+
73+
void RawKVClient::remove_range(const std::string &start_key,const std::string &end_key, const std::uint32_t timeout) {
74+
tikv_client_glue::raw_delete_range(*_client,start_key,end_key,timeout);
75+
}
76+
1877
Transaction TransactionClient::begin() {
1978
return Transaction(transaction_client_begin(*_client));
2079
}

0 commit comments

Comments
 (0)