Skip to content

Commit 3f8ea11

Browse files
yongmaniosmanthus
andauthored
Add scan_reverse and scan_keys_reverse support for txnkv (#354)
Co-authored-by: iosmanthus <MyOsmanthusTree@gmail.com>
1 parent e9d0dcd commit 3f8ea11

File tree

6 files changed

+108
-17
lines changed

6 files changed

+108
-17
lines changed

src/transaction/buffer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ impl Buffer {
111111
range: BoundRange,
112112
limit: u32,
113113
update_cache: bool,
114+
reverse: bool,
114115
f: F,
115116
) -> Result<impl Iterator<Item = KvPair>>
116117
where
@@ -158,7 +159,13 @@ impl Buffer {
158159
.into_iter()
159160
.map(|(k, v)| KvPair::new(k, v))
160161
.collect::<Vec<_>>();
161-
res.sort_by_cached_key(|x| x.key().clone());
162+
163+
// TODO: use `BTreeMap` instead of `HashMap` to avoid sorting.
164+
if reverse {
165+
res.sort_unstable_by(|a, b| b.key().cmp(a.key()));
166+
} else {
167+
res.sort_unstable_by(|a, b| a.key().cmp(b.key()));
168+
}
162169

163170
Ok(res.into_iter().take(limit as usize))
164171
}

src/transaction/lowering.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub fn new_scan_request(
2323
timestamp: Timestamp,
2424
limit: u32,
2525
key_only: bool,
26+
reverse: bool,
2627
) -> kvrpcpb::ScanRequest {
2728
let (start_key, end_key) = range.into_keys();
2829
requests::new_scan_request(
@@ -31,6 +32,7 @@ pub fn new_scan_request(
3132
timestamp.version(),
3233
limit,
3334
key_only,
35+
reverse,
3436
)
3537
}
3638

src/transaction/requests.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,20 @@ pub fn new_scan_request(
125125
timestamp: u64,
126126
limit: u32,
127127
key_only: bool,
128+
reverse: bool,
128129
) -> kvrpcpb::ScanRequest {
129130
let mut req = kvrpcpb::ScanRequest::default();
130-
req.set_start_key(start_key);
131-
req.set_end_key(end_key);
131+
if !reverse {
132+
req.set_start_key(start_key);
133+
req.set_end_key(end_key);
134+
} else {
135+
req.set_start_key(end_key);
136+
req.set_end_key(start_key);
137+
}
132138
req.set_limit(limit);
133139
req.set_key_only(key_only);
134140
req.set_version(timestamp);
141+
req.set_reverse(reverse);
135142
req
136143
}
137144

src/transaction/snapshot.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
use crate::{BoundRange, Key, KvPair, Result, Transaction, Value};
44
use derive_new::new;
5-
use futures::stream::BoxStream;
65
use slog::Logger;
7-
use std::ops::RangeBounds;
86

97
/// A read-only transaction which reads at the given timestamp.
108
///
@@ -61,10 +59,26 @@ impl Snapshot {
6159
self.transaction.scan_keys(range, limit).await
6260
}
6361

64-
/// Unimplemented. Similar to scan, but in the reverse direction.
65-
#[allow(dead_code)]
66-
fn scan_reverse(&mut self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
62+
/// Similar to scan, but in the reverse direction.
63+
pub async fn scan_reverse(
64+
&mut self,
65+
range: impl Into<BoundRange>,
66+
limit: u32,
67+
) -> Result<impl Iterator<Item = KvPair>> {
6768
debug!(self.logger, "invoking scan_reverse request on snapshot");
68-
self.transaction.scan_reverse(range)
69+
self.transaction.scan_reverse(range, limit).await
70+
}
71+
72+
/// Similar to scan_keys, but in the reverse direction.
73+
pub async fn scan_keys_reverse(
74+
&mut self,
75+
range: impl Into<BoundRange>,
76+
limit: u32,
77+
) -> Result<impl Iterator<Item = Key>> {
78+
debug!(
79+
self.logger,
80+
"invoking scan_keys_reverse request on snapshot"
81+
);
82+
self.transaction.scan_keys_reverse(range, limit).await
6983
}
7084
}

src/transaction/transaction.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use crate::{
1212
};
1313
use derive_new::new;
1414
use fail::fail_point;
15-
use futures::{prelude::*, stream::BoxStream};
15+
use futures::prelude::*;
1616
use slog::Logger;
17-
use std::{iter, ops::RangeBounds, sync::Arc, time::Instant};
17+
use std::{iter, sync::Arc, time::Instant};
1818
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
1919
use tokio::{sync::RwLock, time::Duration};
2020

@@ -344,7 +344,7 @@ impl<PdC: PdClient> Transaction<PdC> {
344344
limit: u32,
345345
) -> Result<impl Iterator<Item = KvPair>> {
346346
debug!(self.logger, "invoking transactional scan request");
347-
self.scan_inner(range, limit, false).await
347+
self.scan_inner(range, limit, false, false).await
348348
}
349349

350350
/// Create a new 'scan' request that only returns the keys.
@@ -381,17 +381,39 @@ impl<PdC: PdClient> Transaction<PdC> {
381381
) -> Result<impl Iterator<Item = Key>> {
382382
debug!(self.logger, "invoking transactional scan_keys request");
383383
Ok(self
384-
.scan_inner(range, limit, true)
384+
.scan_inner(range, limit, true, false)
385385
.await?
386386
.map(KvPair::into_key))
387387
}
388388

389389
/// Create a 'scan_reverse' request.
390390
///
391391
/// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
392-
pub(crate) fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
392+
pub async fn scan_reverse(
393+
&mut self,
394+
range: impl Into<BoundRange>,
395+
limit: u32,
396+
) -> Result<impl Iterator<Item = KvPair>> {
393397
debug!(self.logger, "invoking transactional scan_reverse request");
394-
unimplemented!()
398+
self.scan_inner(range, limit, false, true).await
399+
}
400+
401+
/// Create a 'scan_keys_reverse' request.
402+
///
403+
/// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
404+
pub async fn scan_keys_reverse(
405+
&mut self,
406+
range: impl Into<BoundRange>,
407+
limit: u32,
408+
) -> Result<impl Iterator<Item = Key>> {
409+
debug!(
410+
self.logger,
411+
"invoking transactional scan_keys_reverse request"
412+
);
413+
Ok(self
414+
.scan_inner(range, limit, true, true)
415+
.await?
416+
.map(KvPair::into_key))
395417
}
396418

397419
/// Sets the value associated with the given key.
@@ -680,6 +702,7 @@ impl<PdC: PdClient> Transaction<PdC> {
680702
range: impl Into<BoundRange>,
681703
limit: u32,
682704
key_only: bool,
705+
reverse: bool,
683706
) -> Result<impl Iterator<Item = KvPair>> {
684707
self.check_allow_operation().await?;
685708
let timestamp = self.timestamp.clone();
@@ -691,8 +714,10 @@ impl<PdC: PdClient> Transaction<PdC> {
691714
range.into(),
692715
limit,
693716
!key_only,
717+
reverse,
694718
move |new_range, new_limit| async move {
695-
let request = new_scan_request(new_range, timestamp, new_limit, key_only);
719+
let request =
720+
new_scan_request(new_range, timestamp, new_limit, key_only, reverse);
696721
let plan = PlanBuilder::new(rpc, request)
697722
.resolve_lock(retry_options.lock_backoff)
698723
.retry_multi_region(retry_options.region_backoff)

tests/integration_tests.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::{
2222
iter,
2323
};
2424
use tikv_client::{
25-
transaction::HeartbeatOption, Error, Key, KvPair, RawClient, Result, Transaction,
25+
transaction::HeartbeatOption, BoundRange, Error, Key, KvPair, RawClient, Result, Transaction,
2626
TransactionClient, TransactionOptions, Value,
2727
};
2828

@@ -883,6 +883,42 @@ async fn txn_scan() -> Result<()> {
883883
Ok(())
884884
}
885885

886+
#[tokio::test]
887+
#[serial]
888+
async fn txn_scan_reverse() -> Result<()> {
889+
init().await?;
890+
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
891+
892+
let k1 = b"a1".to_vec();
893+
let k2 = b"a2".to_vec();
894+
let v1 = b"b1".to_vec();
895+
let v2 = b"b2".to_vec();
896+
897+
let reverse_resp = vec![
898+
(Key::from(k2.clone()), v2.clone()),
899+
(Key::from(k1.clone()), v1.clone()),
900+
];
901+
902+
// pessimistic
903+
let option = TransactionOptions::new_pessimistic().drop_check(tikv_client::CheckLevel::Warn);
904+
let mut t = client.begin_with_options(option.clone()).await?;
905+
t.put(k1.clone(), v1).await?;
906+
t.put(k2.clone(), v2).await?;
907+
t.commit().await?;
908+
909+
let mut t2 = client.begin_with_options(option).await?;
910+
let bound_range: BoundRange = (k1..=k2).into();
911+
let resp = t2
912+
.scan_reverse(bound_range, 2)
913+
.await?
914+
.map(|kv| (kv.0, kv.1))
915+
.collect::<Vec<(Key, Vec<u8>)>>();
916+
assert_eq!(resp, reverse_resp);
917+
t2.commit().await?;
918+
919+
Ok(())
920+
}
921+
886922
// helper function
887923
async fn get_u32(client: &RawClient, key: Vec<u8>) -> Result<u32> {
888924
let x = client.get(key).await?.unwrap();

0 commit comments

Comments
 (0)