Skip to content

Commit 56bddc2

Browse files
authored
Merge pull request #72 from nrc/mods
Split raw and transaction into smaller modules
2 parents 11b0e1b + b578f63 commit 56bddc2

File tree

8 files changed

+1217
-1220
lines changed

8 files changed

+1217
-1220
lines changed

src/raw.rs

Lines changed: 0 additions & 871 deletions
This file was deleted.

src/raw/client.rs

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
use super::{BatchDelete, BatchGet, BatchPut, BatchScan, Delete, DeleteRange, Get, Put, Scan};
4+
use crate::{rpc::RpcClient, BoundRange, Config, Key, KvPair, Result, Value};
5+
6+
use futures::prelude::*;
7+
use futures::task::{Context, Poll};
8+
use std::{pin::Pin, sync::Arc, u32};
9+
10+
/// The TiKV raw [`Client`](Client) is used to issue requests to the TiKV server and PD cluster.
11+
pub struct Client {
12+
rpc: Arc<RpcClient>,
13+
}
14+
15+
impl Client {
16+
/// Create a new [`Client`](Client) once the [`Connect`](Connect) resolves.
17+
///
18+
/// ```rust,no_run
19+
/// # #![feature(async_await)]
20+
/// # use tikv_client::{Config, raw::Client};
21+
/// # use futures::prelude::*;
22+
/// # futures::executor::block_on(async {
23+
/// let connect = Client::connect(Config::default());
24+
/// let client = connect.await.unwrap();
25+
/// # });
26+
/// ```
27+
pub fn connect(config: Config) -> Connect {
28+
Connect::new(config)
29+
}
30+
31+
#[inline]
32+
fn rpc(&self) -> Arc<RpcClient> {
33+
Arc::clone(&self.rpc)
34+
}
35+
36+
/// Create a new [`Get`](Get) request.
37+
///
38+
/// Once resolved this request will result in the fetching of the value associated with the
39+
/// given key.
40+
///
41+
/// ```rust,no_run
42+
/// # #![feature(async_await)]
43+
/// # use tikv_client::{Value, Config, raw::Client};
44+
/// # use futures::prelude::*;
45+
/// # futures::executor::block_on(async {
46+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
47+
/// # let connected_client = connecting_client.await.unwrap();
48+
/// let key = "TiKV";
49+
/// let req = connected_client.get(key);
50+
/// let result: Option<Value> = req.await.unwrap();
51+
/// # });
52+
/// ```
53+
pub fn get(&self, key: impl Into<Key>) -> Get {
54+
Get::new(self.rpc(), key.into())
55+
}
56+
57+
/// Create a new [`BatchGet`](BatchGet) request.
58+
///
59+
/// Once resolved this request will result in the fetching of the values associated with the
60+
/// given keys.
61+
///
62+
/// ```rust,no_run
63+
/// # #![feature(async_await)]
64+
/// # use tikv_client::{KvPair, Config, raw::Client};
65+
/// # use futures::prelude::*;
66+
/// # futures::executor::block_on(async {
67+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
68+
/// # let connected_client = connecting_client.await.unwrap();
69+
/// let keys = vec!["TiKV", "TiDB"];
70+
/// let req = connected_client.batch_get(keys);
71+
/// let result: Vec<KvPair> = req.await.unwrap();
72+
/// # });
73+
/// ```
74+
pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
75+
BatchGet::new(self.rpc(), keys.into_iter().map(Into::into).collect())
76+
}
77+
78+
/// Create a new [`Put`](Put) request.
79+
///
80+
/// Once resolved this request will result in the setting of the value associated with the given key.
81+
///
82+
/// ```rust,no_run
83+
/// # #![feature(async_await)]
84+
/// # use tikv_client::{Key, Value, Config, raw::Client};
85+
/// # use futures::prelude::*;
86+
/// # futures::executor::block_on(async {
87+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
88+
/// # let connected_client = connecting_client.await.unwrap();
89+
/// let key = "TiKV";
90+
/// let val = "TiKV";
91+
/// let req = connected_client.put(key, val);
92+
/// let result: () = req.await.unwrap();
93+
/// # });
94+
/// ```
95+
pub fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Put {
96+
Put::new(self.rpc(), key.into(), value.into())
97+
}
98+
99+
/// Create a new [`BatchPut`](BatchPut) request.
100+
///
101+
/// Once resolved this request will result in the setting of the value associated with the given key.
102+
///
103+
/// ```rust,no_run
104+
/// # #![feature(async_await)]
105+
/// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client};
106+
/// # use futures::prelude::*;
107+
/// # futures::executor::block_on(async {
108+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
109+
/// # let connected_client = connecting_client.await.unwrap();
110+
/// let kvpair1 = ("PD", "Go");
111+
/// let kvpair2 = ("TiKV", "Rust");
112+
/// let iterable = vec![kvpair1, kvpair2];
113+
/// let req = connected_client.batch_put(iterable);
114+
/// let result: () = req.await.unwrap();
115+
/// # });
116+
/// ```
117+
pub fn batch_put(&self, pairs: impl IntoIterator<Item = impl Into<KvPair>>) -> BatchPut {
118+
BatchPut::new(self.rpc(), pairs.into_iter().map(Into::into).collect())
119+
}
120+
121+
/// Create a new [`Delete`](Delete) request.
122+
///
123+
/// Once resolved this request will result in the deletion of the given key.
124+
///
125+
/// ```rust,no_run
126+
/// # #![feature(async_await)]
127+
/// # use tikv_client::{Key, Config, raw::Client};
128+
/// # use futures::prelude::*;
129+
/// # futures::executor::block_on(async {
130+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
131+
/// # let connected_client = connecting_client.await.unwrap();
132+
/// let key = "TiKV";
133+
/// let req = connected_client.delete(key);
134+
/// let result: () = req.await.unwrap();
135+
/// # });
136+
/// ```
137+
pub fn delete(&self, key: impl Into<Key>) -> Delete {
138+
Delete::new(self.rpc(), key.into())
139+
}
140+
141+
/// Create a new [`BatchDelete`](BatchDelete) request.
142+
///
143+
/// Once resolved this request will result in the deletion of the given keys.
144+
///
145+
/// ```rust,no_run
146+
/// # #![feature(async_await)]
147+
/// # use tikv_client::{Config, raw::Client};
148+
/// # use futures::prelude::*;
149+
/// # futures::executor::block_on(async {
150+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
151+
/// # let connected_client = connecting_client.await.unwrap();
152+
/// let keys = vec!["TiKV", "TiDB"];
153+
/// let req = connected_client.batch_delete(keys);
154+
/// let result: () = req.await.unwrap();
155+
/// # });
156+
/// ```
157+
pub fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchDelete {
158+
BatchDelete::new(self.rpc(), keys.into_iter().map(Into::into).collect())
159+
}
160+
161+
/// Create a new [`Scan`](Scan) request.
162+
///
163+
/// Once resolved this request will result in a scanner over the given keys.
164+
///
165+
/// ```rust,no_run
166+
/// # #![feature(async_await)]
167+
/// # use tikv_client::{KvPair, Config, raw::Client};
168+
/// # use futures::prelude::*;
169+
/// # futures::executor::block_on(async {
170+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
171+
/// # let connected_client = connecting_client.await.unwrap();
172+
/// let inclusive_range = "TiKV"..="TiDB";
173+
/// let req = connected_client.scan(inclusive_range, 2);
174+
/// let result: Vec<KvPair> = req.await.unwrap();
175+
/// # });
176+
/// ```
177+
pub fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Scan {
178+
Scan::new(self.rpc(), range.into(), limit)
179+
}
180+
181+
/// Create a new [`BatchScan`](BatchScan) request.
182+
///
183+
/// Once resolved this request will result in a set of scanners over the given keys.
184+
///
185+
/// ```rust,no_run
186+
/// # #![feature(async_await)]
187+
/// # use tikv_client::{Key, Config, raw::Client};
188+
/// # use futures::prelude::*;
189+
/// # futures::executor::block_on(async {
190+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
191+
/// # let connected_client = connecting_client.await.unwrap();
192+
/// let inclusive_range1 = "TiDB"..="TiKV";
193+
/// let inclusive_range2 = "TiKV"..="TiSpark";
194+
/// let iterable = vec![inclusive_range1, inclusive_range2];
195+
/// let req = connected_client.batch_scan(iterable, 2);
196+
/// let result = req.await;
197+
/// # });
198+
/// ```
199+
pub fn batch_scan(
200+
&self,
201+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
202+
each_limit: u32,
203+
) -> BatchScan {
204+
BatchScan::new(
205+
self.rpc(),
206+
ranges.into_iter().map(Into::into).collect(),
207+
each_limit,
208+
)
209+
}
210+
211+
/// Create a new [`DeleteRange`](DeleteRange) request.
212+
///
213+
/// Once resolved this request will result in the deletion of all keys over the given range.
214+
///
215+
/// ```rust,no_run
216+
/// # #![feature(async_await)]
217+
/// # use tikv_client::{Key, Config, raw::Client};
218+
/// # use futures::prelude::*;
219+
/// # futures::executor::block_on(async {
220+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
221+
/// # let connected_client = connecting_client.await.unwrap();
222+
/// let inclusive_range = "TiKV"..="TiDB";
223+
/// let req = connected_client.delete_range(inclusive_range);
224+
/// let result: () = req.await.unwrap();
225+
/// # });
226+
/// ```
227+
pub fn delete_range(&self, range: impl Into<BoundRange>) -> DeleteRange {
228+
DeleteRange::new(self.rpc(), range.into())
229+
}
230+
}
231+
232+
/// An unresolved [`Client`](Client) connection to a TiKV cluster.
233+
///
234+
/// Once resolved it will result in a connected [`Client`](Client).
235+
///
236+
/// ```rust,no_run
237+
/// # #![feature(async_await)]
238+
/// use tikv_client::{Config, raw::{Client, Connect}};
239+
/// use futures::prelude::*;
240+
///
241+
/// # futures::executor::block_on(async {
242+
/// let connect: Connect = Client::connect(Config::default());
243+
/// let client: Client = connect.await.unwrap();
244+
/// # });
245+
/// ```
246+
pub struct Connect {
247+
config: Config,
248+
}
249+
250+
impl Connect {
251+
fn new(config: Config) -> Self {
252+
Connect { config }
253+
}
254+
}
255+
256+
impl Future for Connect {
257+
type Output = Result<Client>;
258+
259+
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
260+
let config = &self.config;
261+
let rpc = Arc::new(RpcClient::connect(config)?);
262+
Poll::Ready(Ok(Client { rpc }))
263+
}
264+
}

src/raw/mod.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
//! Raw related functionality.
4+
//!
5+
//! Using the [`raw::Client`](raw::Client) you can utilize TiKV's raw interface.
6+
//!
7+
//! This interface offers optimal performance as it does not require coordination with a timestamp
8+
//! oracle, while the transactional interface does.
9+
//!
10+
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
11+
//!
12+
13+
pub use self::client::{Client, Connect};
14+
pub use self::requests::{
15+
BatchDelete, BatchGet, BatchPut, BatchScan, Delete, DeleteRange, Get, Put, Scan,
16+
};
17+
18+
use std::fmt;
19+
20+
mod client;
21+
mod requests;
22+
23+
/// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests.
24+
///
25+
/// TiKV uses RocksDB's `ColumnFamily` support. You can learn more about RocksDB's `ColumnFamily`s [on their wiki](https://github.com/facebook/rocksdb/wiki/Column-Families).
26+
///
27+
/// By default in TiKV data is stored in three different `ColumnFamily` values, configurable in the TiKV server's configuration:
28+
///
29+
/// * Default: Where real user data is stored. Set by `[rocksdb.defaultcf]`.
30+
/// * Write: Where MVCC and index related data are stored. Set by `[rocksdb.writecf]`.
31+
/// * Lock: Where lock information is stored. Set by `[rocksdb.lockcf]`.
32+
///
33+
/// Not providing a call a `ColumnFamily` means it will use the default value of `default`.
34+
///
35+
/// The best (and only) way to create a [`ColumnFamily`](ColumnFamily) is via the `From` implementation:
36+
///
37+
/// ```rust
38+
/// # use tikv_client::raw::ColumnFamily;
39+
///
40+
/// let cf = ColumnFamily::from("write");
41+
/// let cf = ColumnFamily::from(String::from("write"));
42+
/// let cf = ColumnFamily::from(&String::from("write"));
43+
/// ```
44+
///
45+
/// **But, you should not need to worry about all this:** Many functions which accept a
46+
/// `ColumnFamily` accept an `Into<ColumnFamily>`, which means all of the above types can be passed
47+
/// directly to those functions.
48+
#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
49+
pub struct ColumnFamily(String);
50+
51+
impl<T: Into<String>> From<T> for ColumnFamily {
52+
fn from(i: T) -> ColumnFamily {
53+
ColumnFamily(i.into())
54+
}
55+
}
56+
57+
impl fmt::Display for ColumnFamily {
58+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
59+
self.0.fmt(f)
60+
}
61+
}

0 commit comments

Comments
 (0)