Skip to content

Commit 2e81b06

Browse files
committed
add ttl to RawClient
Signed-off-by: Andy Lok <andylokandy@hotmail.com>
1 parent 2c831ba commit 2e81b06

File tree

8 files changed

+152
-15
lines changed

8 files changed

+152
-15
lines changed

src/raw/client.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,33 @@ impl<PdC: PdClient> Client<PdC> {
251251
.map(|r| r.into_iter().map(Into::into).collect())
252252
}
253253

254+
/// Create a new 'get key ttl' request.
255+
///
256+
/// Once resolved this request will result in the fetching of the alive time left for the
257+
/// given key.
258+
///
259+
/// Retuning `Ok(None)` indicates the key does not exist in TiKV.
260+
///
261+
/// # Examples
262+
/// # use tikv_client::{Value, Config, RawClient};
263+
/// # use futures::prelude::*;
264+
/// # futures::executor::block_on(async {
265+
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
266+
/// let key = "TiKV".to_owned();
267+
/// let req = client.get_key_ttl_secs(key);
268+
/// let result: Option<Value> = req.await.unwrap();
269+
/// # });
270+
pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
271+
debug!("invoking raw get_key_ttl_secs request");
272+
let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone());
273+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
274+
.retry_multi_region(self.backoff.clone())
275+
.merge(CollectSingle)
276+
.post_process_default()
277+
.plan();
278+
plan.execute().await
279+
}
280+
254281
/// Create a new 'put' request.
255282
///
256283
/// Once resolved this request will result in the setting of the value associated with the given key.
@@ -268,8 +295,23 @@ impl<PdC: PdClient> Client<PdC> {
268295
/// # });
269296
/// ```
270297
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
298+
self.put_with_ttl(key, value, 0).await
299+
}
300+
301+
pub async fn put_with_ttl(
302+
&self,
303+
key: impl Into<Key>,
304+
value: impl Into<Value>,
305+
ttl_secs: u64,
306+
) -> Result<()> {
271307
debug!("invoking raw put request");
272-
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
308+
let request = new_raw_put_request(
309+
key.into(),
310+
value.into(),
311+
self.cf.clone(),
312+
ttl_secs,
313+
self.atomic,
314+
);
273315
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
274316
.retry_multi_region(self.backoff.clone())
275317
.merge(CollectSingle)
@@ -299,10 +341,19 @@ impl<PdC: PdClient> Client<PdC> {
299341
pub async fn batch_put(
300342
&self,
301343
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
344+
) -> Result<()> {
345+
self.batch_put_with_ttl(pairs, std::iter::repeat(0)).await
346+
}
347+
348+
pub async fn batch_put_with_ttl(
349+
&self,
350+
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
351+
ttls: impl IntoIterator<Item = u64>,
302352
) -> Result<()> {
303353
debug!("invoking raw batch_put request");
304354
let request = new_raw_batch_put_request(
305-
pairs.into_iter().map(Into::into),
355+
pairs.into_iter().map(|pair| pair.into()),
356+
ttls.into_iter(),
306357
self.cf.clone(),
307358
self.atomic,
308359
);

src/raw/lowering.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,33 @@ pub fn new_raw_batch_get_request(
2828
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
2929
}
3030

31+
pub fn new_raw_get_key_ttl_request(
32+
key: Key,
33+
cf: Option<ColumnFamily>,
34+
) -> kvrpcpb::RawGetKeyTtlRequest {
35+
requests::new_raw_get_key_ttl_request(key.into(), cf)
36+
}
37+
3138
pub fn new_raw_put_request(
3239
key: Key,
3340
value: Value,
3441
cf: Option<ColumnFamily>,
42+
ttl: u64,
3543
atomic: bool,
3644
) -> kvrpcpb::RawPutRequest {
37-
requests::new_raw_put_request(key.into(), value, cf, atomic)
45+
requests::new_raw_put_request(key.into(), value, ttl, cf, atomic)
3846
}
3947

4048
pub fn new_raw_batch_put_request(
4149
pairs: impl Iterator<Item = KvPair>,
50+
ttls: impl Iterator<Item = u64>,
4251
cf: Option<ColumnFamily>,
4352
atomic: bool,
4453
) -> kvrpcpb::RawBatchPutRequest {
45-
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
54+
let pairs = pairs.map(Into::into).collect::<Vec<_>>();
55+
let ttls = ttls.collect::<Vec<_>>();
56+
assert_eq!(pairs.len(), ttls.len());
57+
requests::new_raw_batch_put_request(pairs, ttls, cf, atomic)
4658
}
4759

4860
pub fn new_raw_delete_request(

src/raw/requests.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures::stream::BoxStream;
1010
use tonic::transport::Channel;
1111

1212
use super::RawRpcRequest;
13-
use crate::collect_first;
13+
use crate::collect_single;
1414
use crate::pd::PdClient;
1515
use crate::proto::kvrpcpb;
1616
use crate::proto::metapb;
@@ -52,7 +52,7 @@ impl KvRequest for kvrpcpb::RawGetRequest {
5252
}
5353

5454
shardable_key!(kvrpcpb::RawGetRequest);
55-
collect_first!(kvrpcpb::RawGetResponse);
55+
collect_single!(kvrpcpb::RawGetResponse);
5656

5757
impl SingleKey for kvrpcpb::RawGetRequest {
5858
fn key(&self) -> &Vec<u8> {
@@ -101,15 +101,54 @@ impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
101101
}
102102
}
103103

104+
pub fn new_raw_get_key_ttl_request(
105+
key: Vec<u8>,
106+
cf: Option<ColumnFamily>,
107+
) -> kvrpcpb::RawGetKeyTtlRequest {
108+
let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
109+
req.key = key;
110+
req.maybe_set_cf(cf);
111+
112+
req
113+
}
114+
115+
impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
116+
type Response = kvrpcpb::RawGetKeyTtlResponse;
117+
}
118+
119+
shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
120+
collect_single!(kvrpcpb::RawGetKeyTtlResponse);
121+
122+
impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
123+
fn key(&self) -> &Vec<u8> {
124+
&self.key
125+
}
126+
}
127+
128+
impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
129+
type Out = Option<u64>;
130+
131+
fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
132+
let input = input?;
133+
Ok(if input.not_found {
134+
None
135+
} else {
136+
Some(input.ttl)
137+
})
138+
}
139+
}
140+
104141
pub fn new_raw_put_request(
105142
key: Vec<u8>,
106143
value: Vec<u8>,
144+
ttl: u64,
107145
cf: Option<ColumnFamily>,
108146
atomic: bool,
109147
) -> kvrpcpb::RawPutRequest {
110148
let mut req = kvrpcpb::RawPutRequest::default();
111149
req.key = key;
112150
req.value = value;
151+
req.ttl = ttl;
113152
req.maybe_set_cf(cf);
114153
req.for_cas = atomic;
115154

@@ -121,7 +160,7 @@ impl KvRequest for kvrpcpb::RawPutRequest {
121160
}
122161

123162
shardable_key!(kvrpcpb::RawPutRequest);
124-
collect_first!(kvrpcpb::RawPutResponse);
163+
collect_single!(kvrpcpb::RawPutResponse);
125164
impl SingleKey for kvrpcpb::RawPutRequest {
126165
fn key(&self) -> &Vec<u8> {
127166
&self.key
@@ -130,11 +169,13 @@ impl SingleKey for kvrpcpb::RawPutRequest {
130169

131170
pub fn new_raw_batch_put_request(
132171
pairs: Vec<kvrpcpb::KvPair>,
172+
ttls: Vec<u64>,
133173
cf: Option<ColumnFamily>,
134174
atomic: bool,
135175
) -> kvrpcpb::RawBatchPutRequest {
136176
let mut req = kvrpcpb::RawBatchPutRequest::default();
137177
req.pairs = pairs;
178+
req.ttls = ttls;
138179
req.maybe_set_cf(cf);
139180
req.for_cas = atomic;
140181

@@ -185,7 +226,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
185226
}
186227

187228
shardable_key!(kvrpcpb::RawDeleteRequest);
188-
collect_first!(kvrpcpb::RawDeleteResponse);
229+
collect_single!(kvrpcpb::RawDeleteResponse);
189230
impl SingleKey for kvrpcpb::RawDeleteRequest {
190231
fn key(&self) -> &Vec<u8> {
191232
&self.key
@@ -331,7 +372,7 @@ impl KvRequest for kvrpcpb::RawCasRequest {
331372
}
332373

333374
shardable_key!(kvrpcpb::RawCasRequest);
334-
collect_first!(kvrpcpb::RawCasResponse);
375+
collect_single!(kvrpcpb::RawCasResponse);
335376
impl SingleKey for kvrpcpb::RawCasRequest {
336377
fn key(&self) -> &Vec<u8> {
337378
&self.key
@@ -463,6 +504,7 @@ macro_rules! impl_raw_rpc_request {
463504

464505
impl_raw_rpc_request!(RawGetRequest);
465506
impl_raw_rpc_request!(RawBatchGetRequest);
507+
impl_raw_rpc_request!(RawGetKeyTtlRequest);
466508
impl_raw_rpc_request!(RawPutRequest);
467509
impl_raw_rpc_request!(RawBatchPutRequest);
468510
impl_raw_rpc_request!(RawDeleteRequest);
@@ -474,6 +516,7 @@ impl_raw_rpc_request!(RawCasRequest);
474516

475517
impl HasLocks for kvrpcpb::RawGetResponse {}
476518
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
519+
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
477520
impl HasLocks for kvrpcpb::RawPutResponse {}
478521
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
479522
impl HasLocks for kvrpcpb::RawDeleteResponse {}

src/request/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ pub struct CollectSingle;
342342

343343
#[doc(hidden)]
344344
#[macro_export]
345-
macro_rules! collect_first {
345+
macro_rules! collect_single {
346346
($type_: ty) => {
347347
impl Merge<$type_> for CollectSingle {
348348
type Out = $type_;

src/store/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse);
5454
has_region_error!(kvrpcpb::GcResponse);
5555
has_region_error!(kvrpcpb::RawGetResponse);
5656
has_region_error!(kvrpcpb::RawBatchGetResponse);
57+
has_region_error!(kvrpcpb::RawGetKeyTtlResponse);
5758
has_region_error!(kvrpcpb::RawPutResponse);
5859
has_region_error!(kvrpcpb::RawBatchPutResponse);
5960
has_region_error!(kvrpcpb::RawDeleteResponse);
@@ -102,6 +103,7 @@ macro_rules! has_str_error {
102103
}
103104

104105
has_str_error!(kvrpcpb::RawGetResponse);
106+
has_str_error!(kvrpcpb::RawGetKeyTtlResponse);
105107
has_str_error!(kvrpcpb::RawPutResponse);
106108
has_str_error!(kvrpcpb::RawBatchPutResponse);
107109
has_str_error!(kvrpcpb::RawDeleteResponse);

src/store/request.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ macro_rules! impl_request {
6060

6161
impl_request!(RawGetRequest, raw_get, "raw_get");
6262
impl_request!(RawBatchGetRequest, raw_batch_get, "raw_batch_get");
63+
impl_request!(RawGetKeyTtlRequest, raw_get_key_ttl, "raw_get_key_ttl");
6364
impl_request!(RawPutRequest, raw_put, "raw_put");
6465
impl_request!(RawBatchPutRequest, raw_batch_put, "raw_batch_put");
6566
impl_request!(RawDeleteRequest, raw_delete, "raw_delete");

src/transaction/requests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures::stream::{self};
1010
use futures::StreamExt;
1111

1212
use super::transaction::TXN_COMMIT_BATCH_SIZE;
13-
use crate::collect_first;
13+
use crate::collect_single;
1414
use crate::common::Error::PessimisticLockError;
1515
use crate::pd::PdClient;
1616
use crate::proto::kvrpcpb::Action;
@@ -98,7 +98,7 @@ impl KvRequest for kvrpcpb::GetRequest {
9898
}
9999

100100
shardable_key!(kvrpcpb::GetRequest);
101-
collect_first!(kvrpcpb::GetResponse);
101+
collect_single!(kvrpcpb::GetResponse);
102102
impl SingleKey for kvrpcpb::GetRequest {
103103
fn key(&self) -> &Vec<u8> {
104104
&self.key
@@ -220,7 +220,7 @@ impl KvRequest for kvrpcpb::CleanupRequest {
220220
}
221221

222222
shardable_key!(kvrpcpb::CleanupRequest);
223-
collect_first!(kvrpcpb::CleanupResponse);
223+
collect_single!(kvrpcpb::CleanupResponse);
224224
impl SingleKey for kvrpcpb::CleanupRequest {
225225
fn key(&self) -> &Vec<u8> {
226226
&self.key
@@ -617,7 +617,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
617617
}
618618
}
619619

620-
collect_first!(TxnHeartBeatResponse);
620+
collect_single!(TxnHeartBeatResponse);
621621

622622
impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
623623
fn key(&self) -> &Vec<u8> {
@@ -681,7 +681,7 @@ impl SingleKey for kvrpcpb::CheckTxnStatusRequest {
681681
}
682682
}
683683

684-
collect_first!(kvrpcpb::CheckTxnStatusResponse);
684+
collect_single!(kvrpcpb::CheckTxnStatusResponse);
685685

686686
impl Process<kvrpcpb::CheckTxnStatusResponse> for DefaultProcessor {
687687
type Out = TransactionStatus;

tests/integration_tests.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,34 @@ async fn raw_write_million() -> Result<()> {
608608
Ok(())
609609
}
610610

611+
/// Tests raw ttl API.
612+
#[tokio::test]
613+
#[serial]
614+
async fn raw_ttl() -> Result<()> {
615+
init().await?;
616+
let client = RawClient::new(pd_addrs()).await?;
617+
let key1 = vec![1];
618+
let key2 = vec![2];
619+
let val = vec![42];
620+
621+
assert_eq!(client.get_key_ttl_secs(key1.clone()).await?, None);
622+
client.put_with_ttl(key1.clone(), val.clone(), 10).await?;
623+
assert_eq!(client.get(key1.clone()).await?, Some(val.clone()));
624+
assert_eq!(client.get_key_ttl_secs(key1.clone()).await?, Some(10));
625+
client
626+
.batch_put_with_ttl(
627+
vec![(key1.clone(), val.clone()), (key2.clone(), val.clone())],
628+
vec![20, 20],
629+
)
630+
.await?;
631+
assert_eq!(client.get(key1.clone()).await?, Some(val.clone()));
632+
assert_eq!(client.get(key2.clone()).await?, Some(val.clone()));
633+
assert_eq!(client.get_key_ttl_secs(key1.clone()).await?, Some(20));
634+
assert_eq!(client.get_key_ttl_secs(key2.clone()).await?, Some(20));
635+
636+
Ok(())
637+
}
638+
611639
#[tokio::test]
612640
#[serial]
613641
async fn txn_pessimistic_rollback() -> Result<()> {

0 commit comments

Comments
 (0)