Skip to content

Commit bbaf317

Browse files
authored
Fix reverse scan for scene of multiple regions (tikv#438)
* reproduce issue Signed-off-by: Ping Yu <yuping@pingcap.com> * fix reverse range Signed-off-by: Ping Yu <yuping@pingcap.com> --------- Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent bd14485 commit bbaf317

File tree

6 files changed

+143
-17
lines changed

6 files changed

+143
-17
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ export RUSTFLAGS=-Dwarnings
22

33
.PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all
44

5-
PD_ADDRS ?= "127.0.0.1:2379"
6-
MULTI_REGION ?= 1
5+
export PD_ADDRS ?= 127.0.0.1:2379
6+
export MULTI_REGION ?= 1
77

88
ALL_FEATURES := integration-tests
99

src/raw/requests.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ use crate::proto::kvrpcpb;
1616
use crate::proto::kvrpcpb::ApiVersion;
1717
use crate::proto::metapb;
1818
use crate::proto::tikvpb::tikv_client::TikvClient;
19+
use crate::range_request;
1920
use crate::request::plan::ResponseWithShard;
2021
use crate::request::Collect;
2122
use crate::request::CollectSingle;
2223
use crate::request::DefaultProcessor;
2324
use crate::request::KvRequest;
2425
use crate::request::Merge;
2526
use crate::request::Process;
27+
use crate::request::RangeRequest;
2628
use crate::request::Shardable;
2729
use crate::request::SingleKey;
2830
use crate::shardable_key;
@@ -227,6 +229,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
227229
type Response = kvrpcpb::RawDeleteRangeResponse;
228230
}
229231

232+
range_request!(kvrpcpb::RawDeleteRangeRequest);
230233
shardable_range!(kvrpcpb::RawDeleteRangeRequest);
231234

232235
pub fn new_raw_scan_request(
@@ -250,6 +253,7 @@ impl KvRequest for kvrpcpb::RawScanRequest {
250253
type Response = kvrpcpb::RawScanResponse;
251254
}
252255

256+
range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan.
253257
shardable_range!(kvrpcpb::RawScanRequest);
254258

255259
impl Merge<kvrpcpb::RawScanResponse> for Collect {

src/request/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub use self::plan_builder::SingleKey;
2323
pub use self::shard::Batchable;
2424
pub use self::shard::HasNextBatch;
2525
pub use self::shard::NextBatch;
26+
pub use self::shard::RangeRequest;
2627
pub use self::shard::Shardable;
2728
use crate::backoff::Backoff;
2829
use crate::backoff::DEFAULT_REGION_BACKOFF;

src/request/shard.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::request::KvRequest;
1212
use crate::request::Plan;
1313
use crate::request::ResolveLock;
1414
use crate::store::RegionStore;
15+
use crate::store::Request;
1516
use crate::Result;
1617

1718
macro_rules! impl_inner_shardable {
@@ -204,6 +205,32 @@ macro_rules! shardable_keys {
204205
};
205206
}
206207

208+
pub trait RangeRequest: Request {
209+
fn is_reverse(&self) -> bool {
210+
false
211+
}
212+
}
213+
214+
#[doc(hidden)]
215+
#[macro_export]
216+
macro_rules! range_request {
217+
($type_: ty) => {
218+
impl RangeRequest for $type_ {}
219+
};
220+
}
221+
222+
#[doc(hidden)]
223+
#[macro_export]
224+
macro_rules! reversible_range_request {
225+
($type_: ty) => {
226+
impl RangeRequest for $type_ {
227+
fn is_reverse(&self) -> bool {
228+
self.reverse
229+
}
230+
}
231+
};
232+
}
233+
207234
#[doc(hidden)]
208235
#[macro_export]
209236
macro_rules! shardable_range {
@@ -215,8 +242,13 @@ macro_rules! shardable_range {
215242
&self,
216243
pd_client: &Arc<impl $crate::pd::PdClient>,
217244
) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> {
218-
let start_key = self.start_key.clone().into();
219-
let end_key = self.end_key.clone().into();
245+
let mut start_key = self.start_key.clone().into();
246+
let mut end_key = self.end_key.clone().into();
247+
// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
248+
// Therefore, before fetching the regions from PD, it is necessary to swap the values of start_key and end_key.
249+
if self.is_reverse() {
250+
std::mem::swap(&mut start_key, &mut end_key);
251+
}
220252
$crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
221253
}
222254

@@ -227,8 +259,13 @@ macro_rules! shardable_range {
227259
) -> $crate::Result<()> {
228260
self.set_context(store.region_with_leader.context()?);
229261

262+
// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
263+
// As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
230264
self.start_key = shard.0.into();
231265
self.end_key = shard.1.into();
266+
if self.is_reverse() {
267+
std::mem::swap(&mut self.start_key, &mut self.end_key);
268+
}
232269
Ok(())
233270
}
234271
}

src/transaction/requests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ use crate::request::KvRequest;
2828
use crate::request::Merge;
2929
use crate::request::NextBatch;
3030
use crate::request::Process;
31+
use crate::request::RangeRequest;
3132
use crate::request::ResponseWithShard;
3233
use crate::request::Shardable;
3334
use crate::request::SingleKey;
3435
use crate::request::{Batchable, StoreRequest};
36+
use crate::reversible_range_request;
3537
use crate::shardable_key;
3638
use crate::shardable_keys;
3739
use crate::shardable_range;
@@ -170,6 +172,7 @@ impl KvRequest for kvrpcpb::ScanRequest {
170172
type Response = kvrpcpb::ScanResponse;
171173
}
172174

175+
reversible_range_request!(kvrpcpb::ScanRequest);
173176
shardable_range!(kvrpcpb::ScanRequest);
174177

175178
impl Merge<kvrpcpb::ScanResponse> for Collect {

tests/integration_tests.rs

Lines changed: 94 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,27 +1028,108 @@ async fn txn_scan_reverse() -> Result<()> {
10281028
init().await?;
10291029
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
10301030

1031-
let k1 = b"a1".to_vec();
1032-
let k2 = b"a2".to_vec();
1033-
let v1 = b"b1".to_vec();
1034-
let v2 = b"b2".to_vec();
1035-
1036-
let reverse_resp = vec![
1037-
(Key::from(k2.clone()), v2.clone()),
1038-
(Key::from(k1.clone()), v1.clone()),
1039-
];
1031+
let k1 = b"k1".to_vec();
1032+
let k2 = b"k2".to_vec();
1033+
let k3 = b"k3".to_vec();
1034+
1035+
let v1 = b"v1".to_vec();
1036+
let v2 = b"v2".to_vec();
1037+
let v3 = b"v3".to_vec();
10401038

10411039
// Pessimistic option is not stable in this case. Use optimistic options instead.
10421040
let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn);
10431041
let mut t = client.begin_with_options(option.clone()).await?;
1044-
t.put(k1.clone(), v1).await?;
1045-
t.put(k2.clone(), v2).await?;
1042+
t.put(k1.clone(), v1.clone()).await?;
1043+
t.put(k2.clone(), v2.clone()).await?;
1044+
t.put(k3.clone(), v3.clone()).await?;
1045+
t.commit().await?;
1046+
1047+
let mut t2 = client.begin_with_options(option).await?;
1048+
{
1049+
// For [k1, k3]:
1050+
let bound_range: BoundRange = (k1.clone()..=k3.clone()).into();
1051+
let resp = t2
1052+
.scan_reverse(bound_range, 3)
1053+
.await?
1054+
.map(|kv| (kv.0, kv.1))
1055+
.collect::<Vec<(Key, Vec<u8>)>>();
1056+
assert_eq!(
1057+
resp,
1058+
vec![
1059+
(Key::from(k3.clone()), v3.clone()),
1060+
(Key::from(k2.clone()), v2.clone()),
1061+
(Key::from(k1.clone()), v1.clone()),
1062+
]
1063+
);
1064+
}
1065+
{
1066+
// For [k1, k3):
1067+
let bound_range: BoundRange = (k1.clone()..k3.clone()).into();
1068+
let resp = t2
1069+
.scan_reverse(bound_range, 3)
1070+
.await?
1071+
.map(|kv| (kv.0, kv.1))
1072+
.collect::<Vec<(Key, Vec<u8>)>>();
1073+
assert_eq!(
1074+
resp,
1075+
vec![
1076+
(Key::from(k2.clone()), v2.clone()),
1077+
(Key::from(k1.clone()), v1),
1078+
]
1079+
);
1080+
}
1081+
{
1082+
// For (k1, k3):
1083+
let mut start_key = k1.clone();
1084+
start_key.push(0);
1085+
let bound_range: BoundRange = (start_key..k3).into();
1086+
let resp = t2
1087+
.scan_reverse(bound_range, 3)
1088+
.await?
1089+
.map(|kv| (kv.0, kv.1))
1090+
.collect::<Vec<(Key, Vec<u8>)>>();
1091+
assert_eq!(resp, vec![(Key::from(k2), v2),]);
1092+
}
1093+
t2.commit().await?;
1094+
1095+
Ok(())
1096+
}
1097+
1098+
#[tokio::test]
1099+
#[serial]
1100+
async fn txn_scan_reverse_multi_regions() -> Result<()> {
1101+
init().await?;
1102+
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
1103+
1104+
// Keys in `keys` should locate in different regions. See `init()` for boundary of regions.
1105+
let keys: Vec<Key> = vec![
1106+
0x00000000_u32.to_be_bytes().to_vec(),
1107+
0x40000000_u32.to_be_bytes().to_vec(),
1108+
0x80000000_u32.to_be_bytes().to_vec(),
1109+
0xC0000000_u32.to_be_bytes().to_vec(),
1110+
]
1111+
.into_iter()
1112+
.map(Into::into)
1113+
.collect();
1114+
let values: Vec<Vec<u8>> = (0..keys.len())
1115+
.map(|i| format!("v{}", i).into_bytes())
1116+
.collect();
1117+
let bound_range: BoundRange =
1118+
(keys.first().unwrap().clone()..=keys.last().unwrap().clone()).into();
1119+
1120+
// Pessimistic option is not stable in this case. Use optimistic options instead.
1121+
let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn);
1122+
let mut t = client.begin_with_options(option.clone()).await?;
1123+
let mut reverse_resp = Vec::with_capacity(keys.len());
1124+
for (k, v) in keys.into_iter().zip(values.into_iter()).rev() {
1125+
t.put(k.clone(), v.clone()).await?;
1126+
reverse_resp.push((k, v));
1127+
}
10461128
t.commit().await?;
10471129

10481130
let mut t2 = client.begin_with_options(option).await?;
1049-
let bound_range: BoundRange = (k1..=k2).into();
10501131
let resp = t2
1051-
.scan_reverse(bound_range, 2)
1132+
.scan_reverse(bound_range, 100)
10521133
.await?
10531134
.map(|kv| (kv.0, kv.1))
10541135
.collect::<Vec<(Key, Vec<u8>)>>();

0 commit comments

Comments
 (0)