Skip to content

Commit 85c59fb

Browse files
authored
txn: Resolve locks for async commit (region level) (#378)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent d2f78b2 commit 85c59fb

File tree

12 files changed

+1051
-97
lines changed

12 files changed

+1051
-97
lines changed

src/request/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub use self::{
1515
ResponseWithShard, RetryableMultiRegion,
1616
},
1717
plan_builder::{PlanBuilder, SingleKey},
18-
shard::Shardable,
18+
shard::{HasNextBatch, NextBatch, Shardable},
1919
};
2020

2121
pub mod plan;

src/request/plan.rs

Lines changed: 165 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use tokio::sync::Semaphore;
1212
use crate::{
1313
backoff::Backoff,
1414
pd::PdClient,
15-
request::{KvRequest, Shardable},
15+
request::{shard::HasNextBatch, KvRequest, NextBatch, Shardable},
1616
stats::tikv_stats,
1717
store::RegionStore,
18-
transaction::{resolve_locks, HasLocks},
18+
transaction::{resolve_locks, HasLocks, ResolveLocksContext, ResolveLocksOptions},
1919
util::iter::FlatMapOkIterExt,
2020
Error, Result,
2121
};
@@ -442,6 +442,169 @@ where
442442
}
443443
}
444444

445+
#[derive(Default)]
446+
pub struct CleanupLocksResult {
447+
pub region_error: Option<errorpb::Error>,
448+
pub key_error: Option<Vec<Error>>,
449+
pub meet_locks: usize,
450+
// TODO: pub resolved_locks: usize,
451+
}
452+
453+
impl Clone for CleanupLocksResult {
454+
fn clone(&self) -> Self {
455+
Self {
456+
meet_locks: self.meet_locks,
457+
..Default::default() // Ignore errors, which should be extracted by `extract_error()`.
458+
}
459+
}
460+
}
461+
462+
impl HasRegionError for CleanupLocksResult {
463+
fn region_error(&mut self) -> Option<errorpb::Error> {
464+
self.region_error.take()
465+
}
466+
}
467+
468+
impl HasKeyErrors for CleanupLocksResult {
469+
fn key_errors(&mut self) -> Option<Vec<Error>> {
470+
self.key_error.take()
471+
}
472+
}
473+
474+
impl Merge<CleanupLocksResult> for Collect {
475+
type Out = CleanupLocksResult;
476+
477+
fn merge(&self, input: Vec<Result<CleanupLocksResult>>) -> Result<Self::Out> {
478+
input
479+
.into_iter()
480+
.fold(Ok(CleanupLocksResult::default()), |acc, x| {
481+
Ok(CleanupLocksResult {
482+
meet_locks: acc.unwrap().meet_locks + x?.meet_locks,
483+
..Default::default()
484+
})
485+
})
486+
}
487+
}
488+
489+
pub struct CleanupLocks<P: Plan, PdC: PdClient> {
490+
pub logger: slog::Logger,
491+
pub inner: P,
492+
pub ctx: ResolveLocksContext,
493+
pub options: ResolveLocksOptions,
494+
pub store: Option<RegionStore>,
495+
pub pd_client: Arc<PdC>,
496+
pub backoff: Backoff,
497+
}
498+
499+
impl<P: Plan, PdC: PdClient> Clone for CleanupLocks<P, PdC> {
500+
fn clone(&self) -> Self {
501+
CleanupLocks {
502+
logger: self.logger.clone(),
503+
inner: self.inner.clone(),
504+
ctx: self.ctx.clone(),
505+
options: self.options,
506+
store: None,
507+
pd_client: self.pd_client.clone(),
508+
backoff: self.backoff.clone(),
509+
}
510+
}
511+
}
512+
513+
#[async_trait]
514+
impl<P: Plan + Shardable + NextBatch, PdC: PdClient> Plan for CleanupLocks<P, PdC>
515+
where
516+
P::Result: HasLocks + HasNextBatch + HasKeyErrors + HasRegionError,
517+
{
518+
type Result = CleanupLocksResult;
519+
520+
async fn execute(&self) -> Result<Self::Result> {
521+
let mut result = CleanupLocksResult::default();
522+
let mut inner = self.inner.clone();
523+
let mut lock_resolver =
524+
crate::transaction::LockResolver::new(self.logger.clone(), self.ctx.clone());
525+
let region = &self.store.as_ref().unwrap().region_with_leader;
526+
let mut has_more_batch = true;
527+
528+
while has_more_batch {
529+
let mut scan_lock_resp = inner.execute().await?;
530+
531+
// Propagate errors to `retry_multi_region` for retry.
532+
if let Some(e) = scan_lock_resp.key_errors() {
533+
info!(
534+
self.logger,
535+
"CleanupLocks::execute, inner key errors:{:?}", e
536+
);
537+
result.key_error = Some(e);
538+
return Ok(result);
539+
} else if let Some(e) = scan_lock_resp.region_error() {
540+
info!(
541+
self.logger,
542+
"CleanupLocks::execute, inner region error:{}",
543+
e.get_message()
544+
);
545+
result.region_error = Some(e);
546+
return Ok(result);
547+
}
548+
549+
// Iterate to next batch of inner.
550+
match scan_lock_resp.has_next_batch() {
551+
Some(range) if region.contains(range.0.as_ref()) => {
552+
debug!(self.logger, "CleanupLocks::execute, next range:{:?}", range);
553+
inner.next_batch(range);
554+
}
555+
_ => has_more_batch = false,
556+
}
557+
558+
let mut locks = scan_lock_resp.take_locks();
559+
if locks.is_empty() {
560+
break;
561+
}
562+
if locks.len() < self.options.batch_size as usize {
563+
has_more_batch = false;
564+
}
565+
566+
if self.options.async_commit_only {
567+
locks = locks
568+
.into_iter()
569+
.filter(|l| l.get_use_async_commit())
570+
.collect::<Vec<_>>();
571+
}
572+
debug!(
573+
self.logger,
574+
"CleanupLocks::execute, meet locks:{}",
575+
locks.len()
576+
);
577+
result.meet_locks += locks.len();
578+
579+
match lock_resolver
580+
.cleanup_locks(self.store.clone().unwrap(), locks, self.pd_client.clone())
581+
.await
582+
{
583+
Ok(()) => {}
584+
Err(Error::ExtractedErrors(mut errors)) => {
585+
// Propagate errors to `retry_multi_region` for retry.
586+
if let Error::RegionError(e) = errors.pop().unwrap() {
587+
result.region_error = Some(*e);
588+
} else {
589+
result.key_error = Some(errors);
590+
}
591+
return Ok(result);
592+
}
593+
Err(e) => {
594+
return Err(e);
595+
}
596+
}
597+
598+
// TODO: improve backoff
599+
// if self.backoff.is_none() {
600+
// return Err(Error::ResolveLockError);
601+
// }
602+
}
603+
604+
Ok(result)
605+
}
606+
}
607+
445608
/// When executed, the plan extracts errors from its inner plan, and returns an
446609
/// `Err` wrapping the error.
447610
///

src/request/plan_builder.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ use crate::{
55
backoff::Backoff,
66
pd::PdClient,
77
request::{
8-
DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, Plan, Process,
9-
ProcessResponse, ResolveLock, RetryableMultiRegion, Shardable,
8+
plan::CleanupLocks, shard::HasNextBatch, DefaultProcessor, Dispatch, ExtractError,
9+
KvRequest, Merge, MergeResponse, NextBatch, Plan, Process, ProcessResponse, ResolveLock,
10+
RetryableMultiRegion, Shardable,
1011
},
1112
store::RegionStore,
12-
transaction::HasLocks,
13+
transaction::{HasLocks, ResolveLocksContext, ResolveLocksOptions},
1314
Result,
1415
};
1516
use std::{marker::PhantomData, sync::Arc};
@@ -68,6 +69,32 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
6869
}
6970
}
7071

72+
pub fn cleanup_locks(
73+
self,
74+
logger: slog::Logger, // TODO: add logger to PlanBuilder.
75+
ctx: ResolveLocksContext,
76+
options: ResolveLocksOptions,
77+
backoff: Backoff,
78+
) -> PlanBuilder<PdC, CleanupLocks<P, PdC>, Ph>
79+
where
80+
P: Shardable + NextBatch,
81+
P::Result: HasLocks + HasNextBatch + HasRegionError + HasKeyErrors,
82+
{
83+
PlanBuilder {
84+
pd_client: self.pd_client.clone(),
85+
plan: CleanupLocks {
86+
logger,
87+
inner: self.plan,
88+
ctx,
89+
options,
90+
store: None,
91+
backoff,
92+
pd_client: self.pd_client,
93+
},
94+
phantom: PhantomData,
95+
}
96+
}
97+
7198
/// Merge the results of a request. Usually used where a request is sent to multiple regions
7299
/// to combine the responses from each region.
73100
pub fn merge<In, M: Merge<In>>(self, merge: M) -> PlanBuilder<PdC, MergeResponse<P, In, M>, Ph>

src/request/shard.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use super::plan::PreserveShard;
44
use crate::{
55
pd::PdClient,
6-
request::{Dispatch, KvRequest, Plan, ResolveLock},
6+
request::{plan::CleanupLocks, Dispatch, KvRequest, Plan, ResolveLock},
77
store::RegionStore,
88
Result,
99
};
@@ -38,6 +38,17 @@ pub trait Shardable {
3838
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
3939
}
4040

41+
// Use to iterate in a region for scan requests that have batch size limit.
42+
// HasNextBatch use to get the next batch according to previous response.
43+
pub trait HasNextBatch {
44+
fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)>;
45+
}
46+
47+
// NextBatch use to change start key of request by result of `has_next_batch`.
48+
pub trait NextBatch {
49+
fn next_batch(&mut self, _range: (Vec<u8>, Vec<u8>));
50+
}
51+
4152
impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
4253
type Shard = Req::Shard;
4354

@@ -54,6 +65,12 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
5465
}
5566
}
5667

68+
impl<Req: KvRequest + NextBatch> NextBatch for Dispatch<Req> {
69+
fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
70+
self.request.next_batch(range);
71+
}
72+
}
73+
5774
impl<P: Plan + Shardable> Shardable for PreserveShard<P> {
5875
type Shard = P::Shard;
5976

@@ -74,6 +91,22 @@ impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
7491
impl_inner_shardable!();
7592
}
7693

94+
impl<P: Plan + Shardable, PdC: PdClient> Shardable for CleanupLocks<P, PdC> {
95+
type Shard = P::Shard;
96+
97+
fn shards(
98+
&self,
99+
pd_client: &Arc<impl PdClient>,
100+
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
101+
self.inner.shards(pd_client)
102+
}
103+
104+
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
105+
self.store = Some(store.clone());
106+
self.inner.apply_shard(shard, store)
107+
}
108+
}
109+
77110
#[macro_export]
78111
macro_rules! shardable_key {
79112
($type_: ty) => {

0 commit comments

Comments
 (0)