Skip to content

Commit c6110dd

Browse files
authored
Adding a reverse scan API for raw client (#441)
* Adding a reverse scan API for raw client Signed-off-by: Rahul Rane <rahulrane50@gmail.com> * Addressed comments Signed-off-by: Rahul Rane <rahulrane50@gmail.com> * Addressed comments Signed-off-by: Rahul Rane <rahulrane50@gmail.com> --------- Signed-off-by: Rahul Rane <rahulrane50@gmail.com>
1 parent 6f9c133 commit c6110dd

File tree

4 files changed

+243
-7
lines changed

4 files changed

+243
-7
lines changed

src/raw/client.rs

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,40 @@ impl<PdC: PdClient> Client<PdC> {
500500
/// ```
501501
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
502502
debug!("invoking raw scan request");
503-
self.scan_inner(range.into(), limit, false).await
503+
self.scan_inner(range.into(), limit, false, false).await
504+
}
505+
506+
/// Create a new 'scan' request but scans in "reverse" direction.
507+
///
508+
/// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
509+
///
510+
/// If the number of eligible key-value pairs are greater than `limit`,
511+
/// only the first `limit` pairs are returned, ordered by the key.
512+
///
513+
///
514+
/// Reverse Scan queries continuous kv pairs in range [startKey, endKey),
515+
/// from startKey(lowerBound) to endKey(upperBound) in reverse order, up to limit pairs.
516+
/// The returned keys are in reversed lexicographical order.
517+
/// If you want to include the endKey or exclude the startKey, push a '\0' to the key.
518+
/// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
519+
/// # Examples
520+
/// ```rust,no_run
521+
/// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
522+
/// # use futures::prelude::*;
523+
/// # futures::executor::block_on(async {
524+
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
525+
/// let inclusive_range = "TiKV"..="TiDB";
526+
/// let req = client.scan_reverse(inclusive_range.into_owned(), 2);
527+
/// let result: Vec<KvPair> = req.await.unwrap();
528+
/// # });
529+
/// ```
530+
pub async fn scan_reverse(
531+
&self,
532+
range: impl Into<BoundRange>,
533+
limit: u32,
534+
) -> Result<Vec<KvPair>> {
535+
debug!("invoking raw reverse scan request");
536+
self.scan_inner(range.into(), limit, false, true).await
504537
}
505538

506539
/// Create a new 'scan' request that only returns the keys.
@@ -525,7 +558,40 @@ impl<PdC: PdClient> Client<PdC> {
525558
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
526559
debug!("invoking raw scan_keys request");
527560
Ok(self
528-
.scan_inner(range, limit, true)
561+
.scan_inner(range, limit, true, false)
562+
.await?
563+
.into_iter()
564+
.map(KvPair::into_key)
565+
.collect())
566+
}
567+
568+
/// Create a new 'scan' request that only returns the keys in reverse order.
569+
///
570+
/// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
571+
///
572+
/// If the number of eligible keys are greater than `limit`,
573+
/// only the first `limit` pairs are returned, ordered by the key.
574+
///
575+
///
576+
/// # Examples
577+
/// ```rust,no_run
578+
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
579+
/// # use futures::prelude::*;
580+
/// # futures::executor::block_on(async {
581+
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
582+
/// let inclusive_range = "TiKV"..="TiDB";
583+
/// let req = client.scan_keys(inclusive_range.into_owned(), 2);
584+
/// let result: Vec<Key> = req.await.unwrap();
585+
/// # });
586+
/// ```
587+
pub async fn scan_keys_reverse(
588+
&self,
589+
range: impl Into<BoundRange>,
590+
limit: u32,
591+
) -> Result<Vec<Key>> {
592+
debug!("invoking raw scan_keys request");
593+
Ok(self
594+
.scan_inner(range, limit, true, true)
529595
.await?
530596
.into_iter()
531597
.map(KvPair::into_key)
@@ -682,6 +748,7 @@ impl<PdC: PdClient> Client<PdC> {
682748
range: impl Into<BoundRange>,
683749
limit: u32,
684750
key_only: bool,
751+
reverse: bool,
685752
) -> Result<Vec<KvPair>> {
686753
if limit > MAX_RAW_KV_SCAN_LIMIT {
687754
return Err(Error::MaxScanLimitExceeded {
@@ -703,8 +770,13 @@ impl<PdC: PdClient> Client<PdC> {
703770
let mut cur_limit = limit;
704771

705772
while cur_limit > 0 {
706-
let request =
707-
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
773+
let request = new_raw_scan_request(
774+
cur_range.clone(),
775+
cur_limit,
776+
key_only,
777+
reverse,
778+
self.cf.clone(),
779+
);
708780
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
709781
.single_region_with_store(region_store.clone())
710782
.await?

src/raw/lowering.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pub fn new_raw_scan_request(
8484
range: BoundRange,
8585
limit: u32,
8686
key_only: bool,
87+
reverse: bool,
8788
cf: Option<ColumnFamily>,
8889
) -> kvrpcpb::RawScanRequest {
8990
let (start_key, end_key) = range.into_keys();
@@ -92,6 +93,7 @@ pub fn new_raw_scan_request(
9293
end_key.unwrap_or_default().into(),
9394
limit,
9495
key_only,
96+
reverse,
9597
cf,
9698
)
9799
}

src/raw/requests.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,20 @@ pub fn new_raw_scan_request(
278278
end_key: Vec<u8>,
279279
limit: u32,
280280
key_only: bool,
281+
reverse: bool,
281282
cf: Option<ColumnFamily>,
282283
) -> kvrpcpb::RawScanRequest {
283284
let mut req = kvrpcpb::RawScanRequest::default();
284-
req.start_key = start_key;
285-
req.end_key = end_key;
285+
if !reverse {
286+
req.start_key = start_key;
287+
req.end_key = end_key;
288+
} else {
289+
req.start_key = end_key;
290+
req.end_key = start_key;
291+
}
286292
req.limit = limit;
287293
req.key_only = key_only;
294+
req.reverse = reverse;
288295
req.maybe_set_cf(cf);
289296

290297
req
@@ -294,7 +301,7 @@ impl KvRequest for kvrpcpb::RawScanRequest {
294301
type Response = kvrpcpb::RawScanResponse;
295302
}
296303

297-
range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan.
304+
range_request!(kvrpcpb::RawScanRequest);
298305
shardable_range!(kvrpcpb::RawScanRequest);
299306

300307
impl Merge<kvrpcpb::RawScanResponse> for Collect {

tests/integration_tests.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,62 @@ async fn raw_req() -> Result<()> {
561561
assert_eq!(res[5].1, "v4".as_bytes());
562562
assert_eq!(res[6].1, "v5".as_bytes());
563563

564+
// reverse scan
565+
566+
// By default end key is exclusive, so k5 is not included and start key in included
567+
let res = client
568+
.scan_reverse("k2".to_owned().."k5".to_owned(), 5)
569+
.await?;
570+
assert_eq!(res.len(), 3);
571+
assert_eq!(res[0].1, "v4".as_bytes());
572+
assert_eq!(res[1].1, "v3".as_bytes());
573+
assert_eq!(res[2].1, "v2".as_bytes());
574+
575+
// by default end key in exclusive and start key is inclusive but now exclude start key
576+
let res = client
577+
.scan_reverse("k2\0".to_owned().."k5".to_owned(), 5)
578+
.await?;
579+
assert_eq!(res.len(), 2);
580+
assert_eq!(res[0].1, "v4".as_bytes());
581+
assert_eq!(res[1].1, "v3".as_bytes());
582+
583+
// reverse scan
584+
// by default end key is exclusive and start key is inclusive but now include end key
585+
let res = client
586+
.scan_reverse("k2".to_owned()..="k5".to_owned(), 5)
587+
.await?;
588+
assert_eq!(res.len(), 4);
589+
assert_eq!(res[0].1, "v5".as_bytes());
590+
assert_eq!(res[1].1, "v4".as_bytes());
591+
assert_eq!(res[2].1, "v3".as_bytes());
592+
assert_eq!(res[3].1, "v2".as_bytes());
593+
594+
// by default end key is exclusive and start key is inclusive but now include end key and exclude start key
595+
let res = client
596+
.scan_reverse("k2\0".to_owned()..="k5".to_owned(), 5)
597+
.await?;
598+
assert_eq!(res.len(), 3);
599+
assert_eq!(res[0].1, "v5".as_bytes());
600+
assert_eq!(res[1].1, "v4".as_bytes());
601+
assert_eq!(res[2].1, "v3".as_bytes());
602+
603+
// limit results to first 2
604+
let res = client
605+
.scan_reverse("k2".to_owned().."k5".to_owned(), 2)
606+
.await?;
607+
assert_eq!(res.len(), 2);
608+
assert_eq!(res[0].1, "v4".as_bytes());
609+
assert_eq!(res[1].1, "v3".as_bytes());
610+
611+
// if endKey is not provided then it scan everything including end key
612+
let range = BoundRange::range_from(Key::from("k2".to_owned()));
613+
let res = client.scan_reverse(range, 20).await?;
614+
assert_eq!(res.len(), 4);
615+
assert_eq!(res[0].1, "v5".as_bytes());
616+
assert_eq!(res[1].1, "v4".as_bytes());
617+
assert_eq!(res[2].1, "v3".as_bytes());
618+
assert_eq!(res[3].1, "v2".as_bytes());
619+
564620
Ok(())
565621
}
566622

@@ -704,6 +760,105 @@ async fn raw_write_million() -> Result<()> {
704760
r = client.scan(.., limit).await?;
705761
assert_eq!(r.len(), limit as usize);
706762

763+
// test scan_reverse
764+
// test scan, key range from [0,0,0,0] to [255.0.0.0]
765+
let mut limit = 2000;
766+
let mut r = client.scan_reverse(.., limit).await?;
767+
assert_eq!(r.len(), 256);
768+
for (i, val) in r.iter().rev().enumerate() {
769+
let k: Vec<u8> = val.0.clone().into();
770+
assert_eq!(k[0], i as u8);
771+
}
772+
r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?;
773+
assert_eq!(r.len(), 156);
774+
for (i, val) in r.iter().rev().enumerate() {
775+
let k: Vec<u8> = val.0.clone().into();
776+
assert_eq!(k[0], i as u8 + 100);
777+
}
778+
r = client
779+
.scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
780+
.await?;
781+
assert_eq!(r.len(), 195);
782+
for (i, val) in r.iter().rev().enumerate() {
783+
let k: Vec<u8> = val.0.clone().into();
784+
assert_eq!(k[0], i as u8 + 5);
785+
}
786+
r = client
787+
.scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
788+
.await?;
789+
assert_eq!(r.len(), 196);
790+
for (i, val) in r.iter().rev().enumerate() {
791+
let k: Vec<u8> = val.0.clone().into();
792+
assert_eq!(k[0], i as u8 + 5);
793+
}
794+
r = client
795+
.scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
796+
.await?;
797+
assert_eq!(r.len(), 251);
798+
for (i, val) in r.iter().rev().enumerate() {
799+
let k: Vec<u8> = val.0.clone().into();
800+
assert_eq!(k[0], i as u8 + 5);
801+
}
802+
r = client
803+
.scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
804+
.await?;
805+
assert_eq!(r.len(), 0);
806+
r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?;
807+
assert_eq!(r.len(), 0);
808+
809+
limit = 3;
810+
let mut r = client.scan_reverse(.., limit).await?;
811+
let mut expected_start: u8 = 255 - limit as u8 + 1; // including endKey
812+
assert_eq!(r.len(), limit as usize);
813+
for (i, val) in r.iter().rev().enumerate() {
814+
let k: Vec<u8> = val.0.clone().into();
815+
assert_eq!(k[0], i as u8 + expected_start);
816+
}
817+
r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?;
818+
expected_start = 255 - limit as u8 + 1; // including endKey
819+
assert_eq!(r.len(), limit as usize);
820+
for (i, val) in r.iter().rev().enumerate() {
821+
let k: Vec<u8> = val.0.clone().into();
822+
assert_eq!(k[0], i as u8 + expected_start);
823+
}
824+
r = client
825+
.scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
826+
.await?;
827+
expected_start = 200 - limit as u8;
828+
assert_eq!(r.len(), limit as usize);
829+
for (i, val) in r.iter().rev().enumerate() {
830+
let k: Vec<u8> = val.0.clone().into();
831+
assert_eq!(k[0], i as u8 + expected_start);
832+
}
833+
r = client
834+
.scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
835+
.await?;
836+
expected_start = 200 - limit as u8 + 1; // including endKey
837+
assert_eq!(r.len(), limit as usize);
838+
for (i, val) in r.iter().rev().enumerate() {
839+
let k: Vec<u8> = val.0.clone().into();
840+
assert_eq!(k[0], i as u8 + expected_start);
841+
}
842+
r = client
843+
.scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
844+
.await?;
845+
expected_start = 255 - limit as u8 + 1; // including endKey
846+
assert_eq!(r.len(), limit as usize);
847+
for (i, val) in r.iter().rev().enumerate() {
848+
let k: Vec<u8> = val.0.clone().into();
849+
assert_eq!(k[0], i as u8 + expected_start);
850+
}
851+
r = client
852+
.scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
853+
.await?;
854+
assert_eq!(r.len(), 0);
855+
r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?;
856+
assert_eq!(r.len(), 0);
857+
858+
limit = 0;
859+
r = client.scan_reverse(.., limit).await?;
860+
assert_eq!(r.len(), limit as usize);
861+
707862
// test batch_scan
708863
for batch_num in 1..4 {
709864
let _ = client

0 commit comments

Comments
 (0)