Skip to content

Commit 3a2060f

Browse files
committed
add ttl for raw client
Signed-off-by: andylokandy <andylokandy@hotmail.com>
1 parent c045d1e commit 3a2060f

File tree

7 files changed

+112
-17
lines changed

7 files changed

+112
-17
lines changed

src/raw/client.rs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,17 @@ impl<PdC: PdClient> Client<PdC> {
219219
.map(|r| r.into_iter().map(Into::into).collect())
220220
}
221221

222+
pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
223+
debug!(self.logger, "invoking raw get_key_ttl_secs request");
224+
let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone());
225+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
226+
.retry_multi_region(DEFAULT_REGION_BACKOFF)
227+
.merge(CollectSingle)
228+
.post_process_default()
229+
.plan();
230+
plan.execute().await
231+
}
232+
222233
/// Create a new 'put' request.
223234
///
224235
/// Once resolved this request will result in the setting of the value associated with the given key.
@@ -236,8 +247,23 @@ impl<PdC: PdClient> Client<PdC> {
236247
/// # });
237248
/// ```
238249
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
239-
debug!(self.logger, "invoking raw put request");
240-
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
250+
self.put_with_ttl(key, value, 0).await
251+
}
252+
253+
pub async fn put_with_ttl(
254+
&self,
255+
key: impl Into<Key>,
256+
value: impl Into<Value>,
257+
ttl_secs: u64,
258+
) -> Result<()> {
259+
debug!(self.logger, "invoking raw put_with_ttl request");
260+
let request = new_raw_put_request(
261+
key.into(),
262+
value.into(),
263+
self.cf.clone(),
264+
ttl_secs,
265+
self.atomic,
266+
);
241267
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
242268
.retry_multi_region(DEFAULT_REGION_BACKOFF)
243269
.merge(CollectSingle)
@@ -268,9 +294,19 @@ impl<PdC: PdClient> Client<PdC> {
268294
&self,
269295
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
270296
) -> Result<()> {
271-
debug!(self.logger, "invoking raw batch_put request");
297+
self.batch_put_with_ttl(pairs.into_iter().map(|pair| (pair, 0)))
298+
.await
299+
}
300+
301+
pub async fn batch_put_with_ttl(
302+
&self,
303+
pairs_with_ttls_secs: impl IntoIterator<Item = (impl Into<KvPair>, u64)>,
304+
) -> Result<()> {
305+
debug!(self.logger, "invoking raw batch_put_with_ttl request");
272306
let request = new_raw_batch_put_request(
273-
pairs.into_iter().map(Into::into),
307+
pairs_with_ttls_secs
308+
.into_iter()
309+
.map(|(pair, ttl)| (pair.into(), ttl)),
274310
self.cf.clone(),
275311
self.atomic,
276312
);

src/raw/lowering.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,30 @@ pub fn new_raw_batch_get_request(
2121
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
2222
}
2323

24+
pub fn new_raw_get_key_ttl_request(
25+
key: Key,
26+
cf: Option<ColumnFamily>,
27+
) -> kvrpcpb::RawGetKeyTtlRequest {
28+
requests::new_raw_get_key_ttl_request(key.into(), cf)
29+
}
30+
2431
pub fn new_raw_put_request(
2532
key: Key,
2633
value: Value,
2734
cf: Option<ColumnFamily>,
35+
ttl: u64,
2836
atomic: bool,
2937
) -> kvrpcpb::RawPutRequest {
30-
requests::new_raw_put_request(key.into(), value, cf, atomic)
38+
requests::new_raw_put_request(key.into(), value, cf, ttl, atomic)
3139
}
3240

3341
pub fn new_raw_batch_put_request(
34-
pairs: impl Iterator<Item = KvPair>,
42+
pairs_with_ttl: impl Iterator<Item = (KvPair, u64)>,
3543
cf: Option<ColumnFamily>,
3644
atomic: bool,
3745
) -> kvrpcpb::RawBatchPutRequest {
38-
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
46+
let (pairs, ttls) = pairs_with_ttl.map(|(pair, ttl)| (pair.into(), ttl)).unzip();
47+
requests::new_raw_batch_put_request(pairs, cf, ttls, atomic)
3948
}
4049

4150
pub fn new_raw_delete_request(

src/raw/requests.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tikv_client_store::Request;
1010

1111
use super::RawRpcRequest;
1212
use crate::{
13-
collect_first,
13+
collect_single,
1414
pd::PdClient,
1515
request::{
1616
plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge,
@@ -35,7 +35,7 @@ impl KvRequest for kvrpcpb::RawGetRequest {
3535
}
3636

3737
shardable_key!(kvrpcpb::RawGetRequest);
38-
collect_first!(kvrpcpb::RawGetResponse);
38+
collect_single!(kvrpcpb::RawGetResponse);
3939

4040
impl SingleKey for kvrpcpb::RawGetRequest {
4141
fn key(&self) -> &Vec<u8> {
@@ -84,16 +84,55 @@ impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
8484
}
8585
}
8686

87+
pub fn new_raw_get_key_ttl_request(
88+
key: Vec<u8>,
89+
cf: Option<ColumnFamily>,
90+
) -> kvrpcpb::RawGetKeyTtlRequest {
91+
let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
92+
req.set_key(key);
93+
req.maybe_set_cf(cf);
94+
95+
req
96+
}
97+
98+
impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
99+
type Response = kvrpcpb::RawGetKeyTtlResponse;
100+
}
101+
102+
shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
103+
collect_single!(kvrpcpb::RawGetKeyTtlResponse);
104+
105+
impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
106+
fn key(&self) -> &Vec<u8> {
107+
&self.key
108+
}
109+
}
110+
111+
impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
112+
type Out = Option<u64>;
113+
114+
fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
115+
let input = input?;
116+
Ok(if input.not_found {
117+
None
118+
} else {
119+
Some(input.ttl)
120+
})
121+
}
122+
}
123+
87124
pub fn new_raw_put_request(
88125
key: Vec<u8>,
89126
value: Vec<u8>,
90127
cf: Option<ColumnFamily>,
128+
ttl: u64,
91129
atomic: bool,
92130
) -> kvrpcpb::RawPutRequest {
93131
let mut req = kvrpcpb::RawPutRequest::default();
94132
req.set_key(key);
95133
req.set_value(value);
96134
req.maybe_set_cf(cf);
135+
req.set_ttl(ttl);
97136
req.set_for_cas(atomic);
98137

99138
req
@@ -104,7 +143,7 @@ impl KvRequest for kvrpcpb::RawPutRequest {
104143
}
105144

106145
shardable_key!(kvrpcpb::RawPutRequest);
107-
collect_first!(kvrpcpb::RawPutResponse);
146+
collect_single!(kvrpcpb::RawPutResponse);
108147
impl SingleKey for kvrpcpb::RawPutRequest {
109148
fn key(&self) -> &Vec<u8> {
110149
&self.key
@@ -114,11 +153,13 @@ impl SingleKey for kvrpcpb::RawPutRequest {
114153
pub fn new_raw_batch_put_request(
115154
pairs: Vec<kvrpcpb::KvPair>,
116155
cf: Option<ColumnFamily>,
156+
ttls: Vec<u64>,
117157
atomic: bool,
118158
) -> kvrpcpb::RawBatchPutRequest {
119159
let mut req = kvrpcpb::RawBatchPutRequest::default();
120160
req.set_pairs(pairs);
121161
req.maybe_set_cf(cf);
162+
req.set_ttls(ttls);
122163
req.set_for_cas(atomic);
123164

124165
req
@@ -168,7 +209,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
168209
}
169210

170211
shardable_key!(kvrpcpb::RawDeleteRequest);
171-
collect_first!(kvrpcpb::RawDeleteResponse);
212+
collect_single!(kvrpcpb::RawDeleteResponse);
172213
impl SingleKey for kvrpcpb::RawDeleteRequest {
173214
fn key(&self) -> &Vec<u8> {
174215
&self.key
@@ -314,7 +355,7 @@ impl KvRequest for kvrpcpb::RawCasRequest {
314355
}
315356

316357
shardable_key!(kvrpcpb::RawCasRequest);
317-
collect_first!(kvrpcpb::RawCasResponse);
358+
collect_single!(kvrpcpb::RawCasResponse);
318359
impl SingleKey for kvrpcpb::RawCasRequest {
319360
fn key(&self) -> &Vec<u8> {
320361
&self.key
@@ -445,6 +486,7 @@ macro_rules! impl_raw_rpc_request {
445486

446487
impl_raw_rpc_request!(RawGetRequest);
447488
impl_raw_rpc_request!(RawBatchGetRequest);
489+
impl_raw_rpc_request!(RawGetKeyTtlRequest);
448490
impl_raw_rpc_request!(RawPutRequest);
449491
impl_raw_rpc_request!(RawBatchPutRequest);
450492
impl_raw_rpc_request!(RawDeleteRequest);
@@ -456,6 +498,7 @@ impl_raw_rpc_request!(RawCasRequest);
456498

457499
impl HasLocks for kvrpcpb::RawGetResponse {}
458500
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
501+
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
459502
impl HasLocks for kvrpcpb::RawPutResponse {}
460503
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
461504
impl HasLocks for kvrpcpb::RawDeleteResponse {}

src/request/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ pub struct Collect;
303303
pub struct CollectSingle;
304304

305305
#[macro_export]
306-
macro_rules! collect_first {
306+
macro_rules! collect_single {
307307
($type_: ty) => {
308308
impl Merge<$type_> for CollectSingle {
309309
type Out = $type_;

src/transaction/requests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use crate::{
4-
collect_first,
4+
collect_single,
55
pd::PdClient,
66
request::{
77
Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process,
@@ -74,7 +74,7 @@ impl KvRequest for kvrpcpb::GetRequest {
7474
}
7575

7676
shardable_key!(kvrpcpb::GetRequest);
77-
collect_first!(kvrpcpb::GetResponse);
77+
collect_single!(kvrpcpb::GetResponse);
7878
impl SingleKey for kvrpcpb::GetRequest {
7979
fn key(&self) -> &Vec<u8> {
8080
&self.key
@@ -183,7 +183,7 @@ impl KvRequest for kvrpcpb::CleanupRequest {
183183
}
184184

185185
shardable_key!(kvrpcpb::CleanupRequest);
186-
collect_first!(kvrpcpb::CleanupResponse);
186+
collect_single!(kvrpcpb::CleanupResponse);
187187
impl SingleKey for kvrpcpb::CleanupRequest {
188188
fn key(&self) -> &Vec<u8> {
189189
&self.key
@@ -489,7 +489,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
489489
}
490490
}
491491

492-
collect_first!(TxnHeartBeatResponse);
492+
collect_single!(TxnHeartBeatResponse);
493493

494494
impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
495495
fn key(&self) -> &Vec<u8> {

tikv-client-store/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse);
5757
has_region_error!(kvrpcpb::GcResponse);
5858
has_region_error!(kvrpcpb::RawGetResponse);
5959
has_region_error!(kvrpcpb::RawBatchGetResponse);
60+
has_region_error!(kvrpcpb::RawGetKeyTtlResponse);
6061
has_region_error!(kvrpcpb::RawPutResponse);
6162
has_region_error!(kvrpcpb::RawBatchPutResponse);
6263
has_region_error!(kvrpcpb::RawDeleteResponse);
@@ -109,6 +110,7 @@ macro_rules! has_str_error {
109110
}
110111

111112
has_str_error!(kvrpcpb::RawGetResponse);
113+
has_str_error!(kvrpcpb::RawGetKeyTtlResponse);
112114
has_str_error!(kvrpcpb::RawPutResponse);
113115
has_str_error!(kvrpcpb::RawBatchPutResponse);
114116
has_str_error!(kvrpcpb::RawDeleteResponse);

tikv-client-store/src/request.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ macro_rules! impl_request {
4747

4848
impl_request!(RawGetRequest, raw_get_async_opt, "raw_get");
4949
impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get");
50+
impl_request!(
51+
RawGetKeyTtlRequest,
52+
raw_get_key_ttl_async_opt,
53+
"raw_get_key_ttl"
54+
);
5055
impl_request!(RawPutRequest, raw_put_async_opt, "raw_put");
5156
impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put");
5257
impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete");

0 commit comments

Comments
 (0)