Skip to content

Commit 69aed77

Browse files
committed
Split up the raw and transaction modules
Signed-off-by: Nick Cameron <nrc@ncameron.org>
1 parent 8ae2e19 commit 69aed77

File tree

7 files changed

+1480
-1425
lines changed

7 files changed

+1480
-1425
lines changed

src/raw/client.rs

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
use super::requests::{
4+
BatchDeleteInner, BatchGetInner, BatchPutInner, BatchScanInner, DeleteInner, DeleteRangeInner,
5+
GetInner, PutInner, ScanInner,
6+
};
7+
use super::{BatchDelete, BatchGet, BatchPut, BatchScan, Delete, DeleteRange, Get, Put, Scan};
8+
use crate::{rpc::RpcClient, Config, Key, BoundRange, KvPair, Result, Value};
9+
10+
use futures::prelude::*;
11+
use futures::task::{Context, Poll};
12+
use std::{pin::Pin, sync::Arc, u32};
13+
14+
/// The TiKV raw [`Client`](Client) is used to issue requests to the TiKV server and PD cluster.
15+
pub struct Client {
16+
rpc: Arc<RpcClient>,
17+
}
18+
19+
impl Client {
20+
/// Create a new [`Client`](Client) once the [`Connect`](Connect) resolves.
21+
///
22+
/// ```rust,no_run
23+
/// # #![feature(async_await)]
24+
/// # use tikv_client::{Config, raw::Client};
25+
/// # use futures::prelude::*;
26+
/// # futures::executor::block_on(async {
27+
/// let connect = Client::connect(Config::default());
28+
/// let client = connect.await.unwrap();
29+
/// # });
30+
/// ```
31+
pub fn connect(config: Config) -> Connect {
32+
Connect::new(config)
33+
}
34+
35+
#[inline]
36+
fn rpc(&self) -> Arc<RpcClient> {
37+
Arc::clone(&self.rpc)
38+
}
39+
40+
/// Create a new [`Get`](Get) request.
41+
///
42+
/// Once resolved this request will result in the fetching of the value associated with the
43+
/// given key.
44+
///
45+
/// ```rust,no_run
46+
/// # #![feature(async_await)]
47+
/// # use tikv_client::{Value, Config, raw::Client};
48+
/// # use futures::prelude::*;
49+
/// # futures::executor::block_on(async {
50+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
51+
/// # let connected_client = connecting_client.await.unwrap();
52+
/// let key = "TiKV";
53+
/// let req = connected_client.get(key);
54+
/// let result: Option<Value> = req.await.unwrap();
55+
/// # });
56+
/// ```
57+
pub fn get(&self, key: impl Into<Key>) -> Get {
58+
Get::new(self.rpc(), GetInner::new(key.into()))
59+
}
60+
61+
/// Create a new [`BatchGet`](BatchGet) request.
62+
///
63+
/// Once resolved this request will result in the fetching of the values associated with the
64+
/// given keys.
65+
///
66+
/// ```rust,no_run
67+
/// # #![feature(async_await)]
68+
/// # use tikv_client::{KvPair, Config, raw::Client};
69+
/// # use futures::prelude::*;
70+
/// # futures::executor::block_on(async {
71+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
72+
/// # let connected_client = connecting_client.await.unwrap();
73+
/// let keys = vec!["TiKV", "TiDB"];
74+
/// let req = connected_client.batch_get(keys);
75+
/// let result: Vec<KvPair> = req.await.unwrap();
76+
/// # });
77+
/// ```
78+
pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
79+
BatchGet::new(
80+
self.rpc(),
81+
BatchGetInner::new(keys.into_iter().map(Into::into).collect()),
82+
)
83+
}
84+
85+
/// Create a new [`Put`](Put) request.
86+
///
87+
/// Once resolved this request will result in the setting of the value associated with the given key.
88+
///
89+
/// ```rust,no_run
90+
/// # #![feature(async_await)]
91+
/// # use tikv_client::{Key, Value, Config, raw::Client};
92+
/// # use futures::prelude::*;
93+
/// # futures::executor::block_on(async {
94+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
95+
/// # let connected_client = connecting_client.await.unwrap();
96+
/// let key = "TiKV";
97+
/// let val = "TiKV";
98+
/// let req = connected_client.put(key, val);
99+
/// let result: () = req.await.unwrap();
100+
/// # });
101+
/// ```
102+
pub fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Put {
103+
Put::new(self.rpc(), PutInner::new(key.into(), value.into()))
104+
}
105+
106+
/// Create a new [`BatchPut`](BatchPut) request.
107+
///
108+
/// Once resolved this request will result in the setting of the value associated with the given key.
109+
///
110+
/// ```rust,no_run
111+
/// # #![feature(async_await)]
112+
/// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client};
113+
/// # use futures::prelude::*;
114+
/// # futures::executor::block_on(async {
115+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
116+
/// # let connected_client = connecting_client.await.unwrap();
117+
/// let kvpair1 = ("PD", "Go");
118+
/// let kvpair2 = ("TiKV", "Rust");
119+
/// let iterable = vec![kvpair1, kvpair2];
120+
/// let req = connected_client.batch_put(iterable);
121+
/// let result: () = req.await.unwrap();
122+
/// # });
123+
/// ```
124+
pub fn batch_put(&self, pairs: impl IntoIterator<Item = impl Into<KvPair>>) -> BatchPut {
125+
BatchPut::new(
126+
self.rpc(),
127+
BatchPutInner::new(pairs.into_iter().map(Into::into).collect()),
128+
)
129+
}
130+
131+
/// Create a new [`Delete`](Delete) request.
132+
///
133+
/// Once resolved this request will result in the deletion of the given key.
134+
///
135+
/// ```rust,no_run
136+
/// # #![feature(async_await)]
137+
/// # use tikv_client::{Key, Config, raw::Client};
138+
/// # use futures::prelude::*;
139+
/// # futures::executor::block_on(async {
140+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
141+
/// # let connected_client = connecting_client.await.unwrap();
142+
/// let key = "TiKV";
143+
/// let req = connected_client.delete(key);
144+
/// let result: () = req.await.unwrap();
145+
/// # });
146+
/// ```
147+
pub fn delete(&self, key: impl Into<Key>) -> Delete {
148+
Delete::new(self.rpc(), DeleteInner::new(key.into()))
149+
}
150+
151+
/// Create a new [`BatchDelete`](BatchDelete) request.
152+
///
153+
/// Once resolved this request will result in the deletion of the given keys.
154+
///
155+
/// ```rust,no_run
156+
/// # #![feature(async_await)]
157+
/// # use tikv_client::{Config, raw::Client};
158+
/// # use futures::prelude::*;
159+
/// # futures::executor::block_on(async {
160+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
161+
/// # let connected_client = connecting_client.await.unwrap();
162+
/// let keys = vec!["TiKV", "TiDB"];
163+
/// let req = connected_client.batch_delete(keys);
164+
/// let result: () = req.await.unwrap();
165+
/// # });
166+
/// ```
167+
pub fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchDelete {
168+
BatchDelete::new(
169+
self.rpc(),
170+
BatchDeleteInner::new(keys.into_iter().map(Into::into).collect()),
171+
)
172+
}
173+
174+
/// Create a new [`Scan`](Scan) request.
175+
///
176+
/// Once resolved this request will result in a scanner over the given keys.
177+
///
178+
/// ```rust,no_run
179+
/// # #![feature(async_await)]
180+
/// # use tikv_client::{KvPair, Config, raw::Client};
181+
/// # use futures::prelude::*;
182+
/// # futures::executor::block_on(async {
183+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
184+
/// # let connected_client = connecting_client.await.unwrap();
185+
/// let inclusive_range = "TiKV"..="TiDB";
186+
/// let req = connected_client.scan(inclusive_range, 2);
187+
/// let result: Vec<KvPair> = req.await.unwrap();
188+
/// # });
189+
/// ```
190+
pub fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Scan {
191+
Scan::new(self.rpc(), ScanInner::new(range.into(), limit))
192+
}
193+
194+
/// Create a new [`BatchScan`](BatchScan) request.
195+
///
196+
/// Once resolved this request will result in a set of scanners over the given keys.
197+
///
198+
/// ```rust,no_run
199+
/// # #![feature(async_await)]
200+
/// # use tikv_client::{Key, Config, raw::Client};
201+
/// # use futures::prelude::*;
202+
/// # futures::executor::block_on(async {
203+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
204+
/// # let connected_client = connecting_client.await.unwrap();
205+
/// let inclusive_range1 = "TiDB"..="TiKV";
206+
/// let inclusive_range2 = "TiKV"..="TiSpark";
207+
/// let iterable = vec![inclusive_range1, inclusive_range2];
208+
/// let req = connected_client.batch_scan(iterable, 2);
209+
/// let result = req.await;
210+
/// # });
211+
/// ```
212+
pub fn batch_scan(
213+
&self,
214+
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
215+
each_limit: u32,
216+
) -> BatchScan {
217+
BatchScan::new(
218+
self.rpc(),
219+
BatchScanInner::new(
220+
ranges.into_iter().map(KeyRange::into).collect(),
221+
each_limit,
222+
),
223+
)
224+
}
225+
226+
/// Create a new [`DeleteRange`](DeleteRange) request.
227+
///
228+
/// Once resolved this request will result in the deletion of all keys over the given range.
229+
///
230+
/// ```rust,no_run
231+
/// # #![feature(async_await)]
232+
/// # use tikv_client::{Key, Config, raw::Client};
233+
/// # use futures::prelude::*;
234+
/// # futures::executor::block_on(async {
235+
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
236+
/// # let connected_client = connecting_client.await.unwrap();
237+
/// let inclusive_range = "TiKV"..="TiDB";
238+
/// let req = connected_client.delete_range(inclusive_range);
239+
/// let result: () = req.await.unwrap();
240+
/// # });
241+
/// ```
242+
pub fn delete_range(&self, range: impl Into<BoundRange>) -> DeleteRange {
243+
DeleteRange::new(self.rpc(), DeleteRangeInner::new(range.into()))
244+
}
245+
}
246+
247+
/// An unresolved [`Client`](Client) connection to a TiKV cluster.
248+
///
249+
/// Once resolved it will result in a connected [`Client`](Client).
250+
///
251+
/// ```rust,no_run
252+
/// # #![feature(async_await)]
253+
/// use tikv_client::{Config, raw::{Client, Connect}};
254+
/// use futures::prelude::*;
255+
///
256+
/// # futures::executor::block_on(async {
257+
/// let connect: Connect = Client::connect(Config::default());
258+
/// let client: Client = connect.await.unwrap();
259+
/// # });
260+
/// ```
261+
pub struct Connect {
262+
config: Config,
263+
}
264+
265+
impl Connect {
266+
fn new(config: Config) -> Self {
267+
Connect { config }
268+
}
269+
}
270+
271+
impl Future for Connect {
272+
type Output = Result<Client>;
273+
274+
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
275+
let config = &self.config;
276+
let rpc = Arc::new(RpcClient::connect(config)?);
277+
Poll::Ready(Ok(Client { rpc }))
278+
}
279+
}

0 commit comments

Comments
 (0)