Skip to content

Commit 53e7a29

Browse files
Fix batch_put ttl issue (#457)
* fixing the shard issue with batch_put Signed-off-by: limbooverlambda <schakra1@gmail.com> * PR feedback Signed-off-by: limbooverlambda <schakra1@gmail.com> * more make check fixes Signed-off-by: limbooverlambda <schakra1@gmail.com> * removing redundant map Signed-off-by: limbooverlambda <schakra1@gmail.com> * more PR feedback Signed-off-by: limbooverlambda <schakra1@gmail.com> * slight formatting change and remove another redundant clone Signed-off-by: limbooverlambda <schakra1@gmail.com> --------- Signed-off-by: limbooverlambda <schakra1@gmail.com> Co-authored-by: Ping Yu <yuping@pingcap.com>
1 parent ec8dbcc commit 53e7a29

File tree

5 files changed

+136
-19
lines changed

5 files changed

+136
-19
lines changed

src/kv/key.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use super::HexRepr;
1616
use crate::kv::codec::BytesEncoder;
1717
use crate::kv::codec::{self};
1818
use crate::proto::kvrpcpb;
19+
use crate::proto::kvrpcpb::KvPair;
1920

2021
const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
2122

@@ -79,6 +80,20 @@ impl AsRef<Key> for kvrpcpb::Mutation {
7980
}
8081
}
8182

83+
pub struct KvPairTTL(pub KvPair, pub u64);
84+
85+
impl AsRef<Key> for KvPairTTL {
86+
fn as_ref(&self) -> &Key {
87+
self.0.key.as_ref()
88+
}
89+
}
90+
91+
impl From<KvPairTTL> for (KvPair, u64) {
92+
fn from(value: KvPairTTL) -> Self {
93+
(value.0, value.1)
94+
}
95+
}
96+
8297
impl Key {
8398
/// The empty key.
8499
pub const EMPTY: Self = Key(Vec::new());

src/kv/kvpair.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::proto::kvrpcpb;
2525
///
2626
/// Many functions which accept a `KvPair` accept an `Into<KvPair>`, which means all of the above
2727
/// types (Like a `(Key, Value)`) can be passed directly to those functions.
28-
#[derive(Default, Clone, Eq, PartialEq)]
28+
#[derive(Default, Clone, Eq, PartialEq, Hash)]
2929
#[cfg_attr(test, derive(Arbitrary))]
3030
pub struct KvPair(pub Key, pub Value);
3131

src/kv/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod value;
1010
pub use bound_range::BoundRange;
1111
pub use bound_range::IntoOwnedRange;
1212
pub use key::Key;
13+
pub use key::KvPairTTL;
1314
pub use kvpair::KvPair;
1415
pub use value::Value;
1516

src/raw/client.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,36 @@ mod tests {
875875
use crate::proto::kvrpcpb;
876876
use crate::Result;
877877

878+
#[tokio::test]
879+
async fn test_batch_put_with_ttl() -> Result<()> {
880+
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
881+
move |req: &dyn Any| {
882+
if req.downcast_ref::<kvrpcpb::RawBatchPutRequest>().is_some() {
883+
let resp = kvrpcpb::RawBatchPutResponse {
884+
..Default::default()
885+
};
886+
Ok(Box::new(resp) as Box<dyn Any>)
887+
} else {
888+
unreachable!()
889+
}
890+
},
891+
)));
892+
let client = Client {
893+
rpc: pd_client,
894+
cf: Some(ColumnFamily::Default),
895+
backoff: DEFAULT_REGION_BACKOFF,
896+
atomic: false,
897+
keyspace: Keyspace::Enable { keyspace_id: 0 },
898+
};
899+
let pairs = vec![
900+
KvPair(vec![11].into(), vec![12]),
901+
KvPair(vec![11].into(), vec![12]),
902+
];
903+
let ttls = vec![0, 0];
904+
assert!(client.batch_put_with_ttl(pairs, ttls).await.is_ok());
905+
Ok(())
906+
}
907+
878908
#[tokio::test]
879909
async fn test_raw_coprocessor() -> Result<()> {
880910
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(

src/raw/requests.rs

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,8 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::any::Any;
4-
use std::ops::Range;
5-
use std::sync::Arc;
6-
use std::time::Duration;
7-
8-
use async_trait::async_trait;
9-
use futures::stream::BoxStream;
10-
use tonic::transport::Channel;
11-
123
use super::RawRpcRequest;
134
use crate::collect_single;
5+
use crate::kv::KvPairTTL;
146
use crate::pd::PdClient;
157
use crate::proto::kvrpcpb;
168
use crate::proto::metapb;
@@ -41,6 +33,13 @@ use crate::Key;
4133
use crate::KvPair;
4234
use crate::Result;
4335
use crate::Value;
36+
use async_trait::async_trait;
37+
use futures::stream::BoxStream;
38+
use std::any::Any;
39+
use std::ops::Range;
40+
use std::sync::Arc;
41+
use std::time::Duration;
42+
use tonic::transport::Channel;
4443

4544
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
4645
let mut req = kvrpcpb::RawGetRequest::default();
@@ -190,23 +189,28 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
190189
}
191190

192191
impl Shardable for kvrpcpb::RawBatchPutRequest {
193-
type Shard = Vec<kvrpcpb::KvPair>;
192+
type Shard = Vec<(kvrpcpb::KvPair, u64)>;
194193

195194
fn shards(
196195
&self,
197196
pd_client: &Arc<impl PdClient>,
198197
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
199-
let mut pairs = self.pairs.clone();
200-
pairs.sort_by(|a, b| a.key.cmp(&b.key));
201-
store_stream_for_keys(
202-
pairs.into_iter().map(Into::<KvPair>::into),
203-
pd_client.clone(),
204-
)
198+
let kvs = self.pairs.clone();
199+
let ttls = self.ttls.clone();
200+
let mut kv_ttl: Vec<KvPairTTL> = kvs
201+
.into_iter()
202+
.zip(ttls)
203+
.map(|(kv, ttl)| KvPairTTL(kv, ttl))
204+
.collect();
205+
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
206+
store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
205207
}
206208

207209
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
210+
let (pairs, ttls) = shard.into_iter().unzip();
208211
self.set_leader(&store.region_with_leader)?;
209-
self.pairs = shard;
212+
self.pairs = pairs;
213+
self.ttls = ttls;
210214
Ok(())
211215
}
212216
}
@@ -531,21 +535,35 @@ impl_raw_rpc_request!(RawDeleteRangeRequest);
531535
impl_raw_rpc_request!(RawCasRequest);
532536

533537
impl HasLocks for kvrpcpb::RawGetResponse {}
538+
534539
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
540+
535541
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
542+
536543
impl HasLocks for kvrpcpb::RawPutResponse {}
544+
537545
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
546+
538547
impl HasLocks for kvrpcpb::RawDeleteResponse {}
548+
539549
impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
550+
540551
impl HasLocks for kvrpcpb::RawScanResponse {}
552+
541553
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
554+
542555
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
556+
543557
impl HasLocks for kvrpcpb::RawCasResponse {}
558+
544559
impl HasLocks for kvrpcpb::RawCoprocessorResponse {}
545560

546561
#[cfg(test)]
547562
mod test {
548563
use std::any::Any;
564+
use std::collections::HashMap;
565+
use std::ops::Deref;
566+
use std::sync::Mutex;
549567

550568
use super::*;
551569
use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -555,7 +573,6 @@ mod test {
555573
use crate::proto::kvrpcpb;
556574
use crate::request::Keyspace;
557575
use crate::request::Plan;
558-
use crate::Key;
559576

560577
#[rstest::rstest]
561578
#[case(Keyspace::Disable)]
@@ -600,4 +617,58 @@ mod test {
600617
assert_eq!(scan.len(), 49);
601618
// FIXME test the keys returned.
602619
}
620+
621+
#[tokio::test]
622+
async fn test_raw_batch_put() -> Result<()> {
623+
let region1_kvs = vec![KvPair(vec![9].into(), vec![12])];
624+
let region1_ttls = vec![0];
625+
let region2_kvs = vec![
626+
KvPair(vec![11].into(), vec![12]),
627+
KvPair("FFF".to_string().as_bytes().to_vec().into(), vec![12]),
628+
];
629+
let region2_ttls = vec![0, 1];
630+
631+
let expected_map = HashMap::from([
632+
(region1_kvs.clone(), region1_ttls.clone()),
633+
(region2_kvs.clone(), region2_ttls.clone()),
634+
]);
635+
636+
let pairs: Vec<kvrpcpb::KvPair> = [region1_kvs, region2_kvs]
637+
.concat()
638+
.into_iter()
639+
.map(|kv| kv.into())
640+
.collect();
641+
let ttls = [region1_ttls, region2_ttls].concat();
642+
let cf = ColumnFamily::Default;
643+
644+
let actual_map: Arc<Mutex<HashMap<Vec<KvPair>, Vec<u64>>>> =
645+
Arc::new(Mutex::new(HashMap::new()));
646+
let fut_actual_map = actual_map.clone();
647+
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
648+
move |req: &dyn Any| {
649+
let req: &kvrpcpb::RawBatchPutRequest = req.downcast_ref().unwrap();
650+
let kv_pair = req
651+
.pairs
652+
.clone()
653+
.into_iter()
654+
.map(|p| p.into())
655+
.collect::<Vec<KvPair>>();
656+
let ttls = req.ttls.clone();
657+
fut_actual_map.lock().unwrap().insert(kv_pair, ttls);
658+
let resp = kvrpcpb::RawBatchPutResponse::default();
659+
Ok(Box::new(resp) as Box<dyn Any>)
660+
},
661+
)));
662+
663+
let batch_put_request =
664+
new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false);
665+
let keyspace = Keyspace::Enable { keyspace_id: 0 };
666+
let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request)
667+
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
668+
.retry_multi_region(DEFAULT_REGION_BACKOFF)
669+
.plan();
670+
let _ = plan.execute().await;
671+
assert_eq!(actual_map.lock().unwrap().deref(), &expected_map);
672+
Ok(())
673+
}
603674
}

0 commit comments

Comments
 (0)