Skip to content

Commit a675082

Browse files
authored
Merge pull request #386 from haojinming/resolve_lock_in_range
Resolve lock in range
2 parents a04e551 + b22710b commit a675082

File tree

9 files changed

+140
-66
lines changed

9 files changed

+140
-66
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ jobs:
8888
run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
8989
- name: start tiup playground
9090
run: |
91-
~/.tiup/bin/tiup install tikv:nightly pd:nightly
92-
~/.tiup/bin/tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config config/tikv.toml --pd.config config/pd.toml &
91+
# use latest stable version
92+
~/.tiup/bin/tiup install tikv pd
93+
~/.tiup/bin/tiup playground --mode tikv-slim --kv 3 --without-monitor --kv.config config/tikv.toml --pd.config config/pd.toml &
9394
while :; do
9495
echo "waiting cluster to be ready"
9596
[[ "$(curl -I http://127.0.0.1:2379/pd/api/v1/regions 2>/dev/null | head -n 1 | cut -d$' ' -f2)" -ne "405" ]] || break

src/request/plan.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -446,14 +446,13 @@ where
446446
pub struct CleanupLocksResult {
447447
pub region_error: Option<errorpb::Error>,
448448
pub key_error: Option<Vec<Error>>,
449-
pub meet_locks: usize,
450-
// TODO: pub resolved_locks: usize,
449+
pub resolved_locks: usize,
451450
}
452451

453452
impl Clone for CleanupLocksResult {
454453
fn clone(&self) -> Self {
455454
Self {
456-
meet_locks: self.meet_locks,
455+
resolved_locks: self.resolved_locks,
457456
..Default::default() // Ignore errors, which should be extracted by `extract_error()`.
458457
}
459458
}
@@ -479,7 +478,7 @@ impl Merge<CleanupLocksResult> for Collect {
479478
.into_iter()
480479
.fold(Ok(CleanupLocksResult::default()), |acc, x| {
481480
Ok(CleanupLocksResult {
482-
meet_locks: acc.unwrap().meet_locks + x?.meet_locks,
481+
resolved_locks: acc?.resolved_locks + x?.resolved_locks,
483482
..Default::default()
484483
})
485484
})
@@ -574,13 +573,15 @@ where
574573
"CleanupLocks::execute, meet locks:{}",
575574
locks.len()
576575
);
577-
result.meet_locks += locks.len();
578576

577+
let lock_size = locks.len();
579578
match lock_resolver
580579
.cleanup_locks(self.store.clone().unwrap(), locks, self.pd_client.clone())
581580
.await
582581
{
583-
Ok(()) => {}
582+
Ok(()) => {
583+
result.resolved_locks += lock_size;
584+
}
584585
Err(Error::ExtractedErrors(mut errors)) => {
585586
// Propagate errors to `retry_multi_region` for retry.
586587
if let Error::RegionError(e) = errors.pop().unwrap() {

src/store.rs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ pub fn store_stream_for_range<PdC: PdClient>(
5353
range: (Vec<u8>, Vec<u8>),
5454
pd_client: Arc<PdC>,
5555
) -> BoxStream<'static, Result<((Vec<u8>, Vec<u8>), RegionStore)>> {
56-
let bnd_range = BoundRange::from(range.clone());
56+
let bnd_range = if range.1.is_empty() {
57+
BoundRange::range_from(range.0.clone().into())
58+
} else {
59+
BoundRange::from(range.clone())
60+
};
5761
pd_client
5862
.stores_for_range(bnd_range)
5963
.map_ok(move |store| {
@@ -67,25 +71,6 @@ pub fn store_stream_for_range<PdC: PdClient>(
6771
.boxed()
6872
}
6973

70-
pub fn store_stream_for_range_by_start_key<PdC: PdClient>(
71-
start_key: Key,
72-
pd_client: Arc<PdC>,
73-
) -> BoxStream<'static, Result<(Vec<u8>, RegionStore)>> {
74-
let bnd_range = BoundRange::range_from(start_key.clone());
75-
pd_client
76-
.stores_for_range(bnd_range)
77-
.map_ok(move |store| {
78-
let region_range = store.region_with_leader.range();
79-
(
80-
range_intersection(region_range, (start_key.clone(), vec![].into()))
81-
.0
82-
.into(),
83-
store,
84-
)
85-
})
86-
.boxed()
87-
}
88-
8974
/// The range used for request should be the intersection of `region_range` and `range`.
9075
fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) {
9176
let (lower, upper) = region_range;

src/transaction/client.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use super::requests::new_scan_lock_request;
43
use crate::{
54
backoff::DEFAULT_REGION_BACKOFF,
65
config::Config,
@@ -10,10 +9,11 @@ use crate::{
109
transaction::{
1110
lock::ResolveLocksOptions, ResolveLocksContext, Snapshot, Transaction, TransactionOptions,
1211
},
13-
Backoff, Result,
12+
transaction_lowering::new_scan_lock_request,
13+
Backoff, BoundRange, Result,
1414
};
1515
use slog::{Drain, Logger};
16-
use std::{mem, sync::Arc};
16+
use std::sync::Arc;
1717
use tikv_client_proto::pdpb::Timestamp;
1818

1919
// FIXME: cargo-culted value
@@ -242,7 +242,7 @@ impl Client {
242242
batch_size: SCAN_LOCK_BATCH_SIZE,
243243
..Default::default()
244244
};
245-
self.cleanup_locks(&safepoint, options).await?;
245+
self.cleanup_locks(.., &safepoint, options).await?;
246246

247247
// update safepoint to PD
248248
let res: bool = self
@@ -258,24 +258,20 @@ impl Client {
258258

259259
pub async fn cleanup_locks(
260260
&self,
261+
range: impl Into<BoundRange>,
261262
safepoint: &Timestamp,
262263
options: ResolveLocksOptions,
263264
) -> Result<CleanupLocksResult> {
264265
debug!(self.logger, "invoking cleanup async commit locks");
265266
// scan all locks with ts <= safepoint
266-
let mut start_key = vec![];
267267
let ctx = ResolveLocksContext::default();
268268
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
269-
let req = new_scan_lock_request(
270-
mem::take(&mut start_key),
271-
safepoint.version(),
272-
options.batch_size,
273-
);
269+
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
274270
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
275271
.cleanup_locks(self.logger.clone(), ctx.clone(), options, backoff)
276272
.retry_multi_region(DEFAULT_REGION_BACKOFF)
277-
.merge(crate::request::Collect)
278273
.extract_error()
274+
.merge(crate::request::Collect)
279275
.plan();
280276
plan.execute().await
281277
}
@@ -286,10 +282,10 @@ impl Client {
286282
pub async fn scan_locks(
287283
&self,
288284
safepoint: &Timestamp,
289-
mut start_key: Vec<u8>,
285+
range: impl Into<BoundRange>,
290286
batch_size: u32,
291287
) -> Result<Vec<tikv_client_proto::kvrpcpb::LockInfo>> {
292-
let req = new_scan_lock_request(mem::take(&mut start_key), safepoint.version(), batch_size);
288+
let req = new_scan_lock_request(range.into(), safepoint, batch_size);
293289
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
294290
.retry_multi_region(DEFAULT_REGION_BACKOFF)
295291
.merge(crate::request::Collect)

src/transaction/lowering.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,17 @@ pub fn new_pessimistic_lock_request(
161161
}
162162

163163
pub fn new_scan_lock_request(
164-
start_key: Key,
165-
safepoint: Timestamp,
164+
range: BoundRange,
165+
safepoint: &Timestamp,
166166
limit: u32,
167167
) -> kvrpcpb::ScanLockRequest {
168-
requests::new_scan_lock_request(start_key.into(), safepoint.version(), limit)
168+
let (start_key, end_key) = range.into_keys();
169+
requests::new_scan_lock_request(
170+
start_key.into(),
171+
end_key.unwrap_or_default().into(),
172+
safepoint.version(),
173+
limit,
174+
)
169175
}
170176

171177
pub fn new_heart_beat_request(

src/transaction/requests.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch, KvRequest, Merge,
88
NextBatch, Process, ResponseWithShard, Shardable, SingleKey,
99
},
10-
store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore},
10+
store::{store_stream_for_keys, store_stream_for_range, RegionStore},
1111
timestamp::TimestampExt,
1212
transaction::HasLocks,
1313
util::iter::FlatMapOkIterExt,
@@ -445,11 +445,13 @@ impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Muta
445445

446446
pub fn new_scan_lock_request(
447447
start_key: Vec<u8>,
448+
end_key: Vec<u8>,
448449
safepoint: u64,
449450
limit: u32,
450451
) -> kvrpcpb::ScanLockRequest {
451452
let mut req = kvrpcpb::ScanLockRequest::default();
452453
req.set_start_key(start_key);
454+
req.set_end_key(end_key);
453455
req.set_max_version(safepoint);
454456
req.set_limit(limit);
455457
req
@@ -460,25 +462,29 @@ impl KvRequest for kvrpcpb::ScanLockRequest {
460462
}
461463

462464
impl Shardable for kvrpcpb::ScanLockRequest {
463-
type Shard = Vec<u8>;
465+
type Shard = (Vec<u8>, Vec<u8>);
464466

465467
fn shards(
466468
&self,
467469
pd_client: &Arc<impl PdClient>,
468470
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
469-
store_stream_for_range_by_start_key(self.start_key.clone().into(), pd_client.clone())
471+
store_stream_for_range(
472+
(self.start_key.clone(), self.end_key.clone()),
473+
pd_client.clone(),
474+
)
470475
}
471476

472477
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
473478
self.set_context(store.region_with_leader.context()?);
474-
self.set_start_key(shard);
479+
self.set_start_key(shard.0);
475480
Ok(())
476481
}
477482
}
478483

479484
impl HasNextBatch for kvrpcpb::ScanLockResponse {
480485
fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)> {
481486
self.get_locks().last().map(|lock| {
487+
// TODO: if last key is larger or equal than ScanLockRequest.end_key, return None.
482488
let mut start_key: Vec<u8> = lock.get_key().to_vec();
483489
start_key.push(0);
484490
(start_key, vec![])

tests/common/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,14 @@ pub async fn init() -> Result<()> {
4646
.take(count as usize - 1)
4747
.map(|x| x.to_be_bytes().to_vec());
4848

49-
ensure_region_split(keys_1.chain(keys_2), 80).await?;
49+
// about 43 regions with above keys.
50+
ensure_region_split(keys_1.chain(keys_2), 40).await?;
5051
}
5152

5253
clear_tikv().await;
54+
let region_cnt = ctl::get_region_count().await?;
55+
// print log for debug convenience
56+
println!("init finish with {region_cnt} regions");
5357
Ok(())
5458
}
5559

0 commit comments

Comments (0)