Skip to content

Commit d656079

Browse files
authored
transaction: Support unsafe_destroy_range interface (#420)
* add unsafe_destroy_range Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> * fix compile error on lower version of rust Signed-off-by: Ping Yu <yuping@pingcap.com> --------- Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent 5ac72f2 commit d656079

File tree

15 files changed

+338
-12
lines changed

15 files changed

+338
-12
lines changed

src/backoff.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use rand::thread_rng;
88
use rand::Rng;
99

1010
pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
11+
pub const DEFAULT_STORE_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 1000, 10);
1112
pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
1213
pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
1314

src/mock.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ use crate::proto::metapb::{self};
1919
use crate::region::RegionId;
2020
use crate::region::RegionWithLeader;
2121
use crate::request::codec::ApiV1TxnCodec;
22-
use crate::store::KvClient;
2322
use crate::store::KvConnect;
2423
use crate::store::RegionStore;
2524
use crate::store::Request;
25+
use crate::store::{KvClient, Store};
2626
use crate::Config;
2727
use crate::Error;
2828
use crate::Key;
@@ -206,6 +206,10 @@ impl PdClient for MockPdClient {
206206
}
207207
}
208208

209+
async fn all_stores(&self) -> Result<Vec<Store>> {
210+
Ok(vec![Store::new(Arc::new(self.client.clone()))])
211+
}
212+
209213
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
210214
Ok(Timestamp::default())
211215
}

src/pd/client.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use crate::region::RegionVerId;
2121
use crate::region::RegionWithLeader;
2222
use crate::region_cache::RegionCache;
2323
use crate::request::codec::{ApiV1TxnCodec, Codec};
24-
use crate::store::KvClient;
2524
use crate::store::KvConnect;
2625
use crate::store::RegionStore;
2726
use crate::store::TikvConnect;
27+
use crate::store::{KvClient, Store};
2828
use crate::BoundRange;
2929
use crate::Config;
3030
use crate::Key;
@@ -78,6 +78,8 @@ pub trait PdClient: Send + Sync + 'static {
7878
self.map_region_to_store(region).await
7979
}
8080

81+
async fn all_stores(&self) -> Result<Vec<Store>>;
82+
8183
fn group_keys_by_region<K, K2>(
8284
self: Arc<Self>,
8385
keys: impl Iterator<Item = K> + Send + Sync + 'static,
@@ -255,6 +257,16 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
255257
Self::decode_region(region, self.enable_mvcc_codec)
256258
}
257259

260+
async fn all_stores(&self) -> Result<Vec<Store>> {
261+
let pb_stores = self.region_cache.read_through_all_stores().await?;
262+
let mut stores = Vec::with_capacity(pb_stores.len());
263+
for store in pb_stores {
264+
let client = self.kv_client(&store.address).await?;
265+
stores.push(Store::new(Arc::new(client)));
266+
}
267+
Ok(stores)
268+
}
269+
258270
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
259271
self.pd.clone().get_timestamp().await
260272
}

src/pd/retry.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ impl RetryClientTrait for RetryClient<Cluster> {
160160
})
161161
}
162162

163-
#[allow(dead_code)]
164163
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
165164
retry!(self, "get_all_stores", |cluster| async {
166165
cluster

src/region_cache.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,39 @@ impl<C: RetryClientTrait> RegionCache<C> {
232232
cache.key_to_ver_id.remove(&start_key);
233233
}
234234
}
235+
236+
pub async fn read_through_all_stores(&self) -> Result<Vec<Store>> {
237+
let stores = self
238+
.inner_client
239+
.clone()
240+
.get_all_stores()
241+
.await?
242+
.into_iter()
243+
.filter(is_valid_tikv_store)
244+
.collect::<Vec<_>>();
245+
for store in &stores {
246+
self.store_cache
247+
.write()
248+
.await
249+
.insert(store.id, store.clone());
250+
}
251+
Ok(stores)
252+
}
253+
}
254+
255+
const ENGINE_LABEL_KEY: &str = "engine";
256+
const ENGINE_LABEL_TIFLASH: &str = "tiflash";
257+
const ENGINE_LABEL_TIFLASH_COMPUTE: &str = "tiflash_compute";
258+
259+
fn is_valid_tikv_store(store: &metapb::Store) -> bool {
260+
if metapb::StoreState::from_i32(store.state).unwrap() == metapb::StoreState::Tombstone {
261+
return false;
262+
}
263+
let is_tiflash = store.labels.iter().any(|label| {
264+
label.key == ENGINE_LABEL_KEY
265+
&& (label.value == ENGINE_LABEL_TIFLASH || label.value == ENGINE_LABEL_TIFLASH_COMPUTE)
266+
});
267+
!is_tiflash
235268
}
236269

237270
#[cfg(test)]
@@ -253,6 +286,7 @@ mod test {
253286
use crate::proto::metapb::{self};
254287
use crate::region::RegionId;
255288
use crate::region::RegionWithLeader;
289+
use crate::region_cache::is_valid_tikv_store;
256290
use crate::Key;
257291
use crate::Result;
258292

@@ -512,4 +546,34 @@ mod test {
512546

513547
region
514548
}
549+
550+
#[test]
551+
fn test_is_valid_tikv_store() {
552+
let mut store = metapb::Store::default();
553+
assert!(is_valid_tikv_store(&store));
554+
555+
store.state = metapb::StoreState::Tombstone.into();
556+
assert!(!is_valid_tikv_store(&store));
557+
558+
store.state = metapb::StoreState::Up.into();
559+
assert!(is_valid_tikv_store(&store));
560+
561+
store.labels.push(metapb::StoreLabel {
562+
key: "some_key".to_owned(),
563+
value: "some_value".to_owned(),
564+
});
565+
assert!(is_valid_tikv_store(&store));
566+
567+
store.labels.push(metapb::StoreLabel {
568+
key: "engine".to_owned(),
569+
value: "tiflash".to_owned(),
570+
});
571+
assert!(!is_valid_tikv_store(&store));
572+
573+
store.labels[1].value = "tiflash_compute".to_owned();
574+
assert!(!is_valid_tikv_store(&store));
575+
576+
store.labels[1].value = "other".to_owned();
577+
assert!(is_valid_tikv_store(&store));
578+
}
515579
}

src/request/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use crate::backoff::Backoff;
2828
use crate::backoff::DEFAULT_REGION_BACKOFF;
2929
use crate::backoff::OPTIMISTIC_BACKOFF;
3030
use crate::backoff::PESSIMISTIC_BACKOFF;
31-
use crate::store::HasKeyErrors;
3231
use crate::store::Request;
32+
use crate::store::{HasKeyErrors, Store};
3333
use crate::transaction::HasLocks;
3434

3535
pub mod codec;
@@ -47,6 +47,12 @@ pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
4747
// TODO: fn decode_response()
4848
}
4949

50+
/// For requests or plans which are handled at TiKV store (other than region) level.
51+
pub trait StoreRequest {
52+
/// Apply the request to specified TiKV store.
53+
fn apply_store(&mut self, store: &Store);
54+
}
55+
5056
#[derive(Clone, Debug, new, Eq, PartialEq)]
5157
pub struct RetryOptions {
5258
/// How to retry when there is a region error and we need to resolve regions with PD.

src/request/plan.rs

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ use crate::proto::errorpb;
1818
use crate::proto::errorpb::EpochNotMatch;
1919
use crate::proto::kvrpcpb;
2020
use crate::request::shard::HasNextBatch;
21-
use crate::request::KvRequest;
2221
use crate::request::NextBatch;
2322
use crate::request::Shardable;
23+
use crate::request::{KvRequest, StoreRequest};
2424
use crate::stats::tikv_stats;
25-
use crate::store::HasKeyErrors;
2625
use crate::store::HasRegionError;
2726
use crate::store::HasRegionErrors;
2827
use crate::store::KvClient;
2928
use crate::store::RegionStore;
29+
use crate::store::{HasKeyErrors, Store};
3030
use crate::transaction::resolve_locks;
3131
use crate::transaction::HasLocks;
3232
use crate::transaction::ResolveLocksContext;
@@ -73,7 +73,19 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
7373
}
7474
}
7575

76+
impl<Req: KvRequest + StoreRequest> StoreRequest for Dispatch<Req> {
77+
fn apply_store(&mut self, store: &Store) {
78+
self.kv_client = Some(store.client.clone());
79+
self.request.apply_store(store);
80+
}
81+
}
82+
7683
const MULTI_REGION_CONCURRENCY: usize = 16;
84+
const MULTI_STORES_CONCURRENCY: usize = 16;
85+
86+
fn is_grpc_error(e: &Error) -> bool {
87+
matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
88+
}
7789

7890
pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
7991
pub(super) inner: P,
@@ -150,7 +162,6 @@ where
150162
let res = plan.execute().await;
151163
drop(permit);
152164

153-
let is_grpc_error = |e: &Error| matches!(e, Error::GrpcAPI(_) | Error::Grpc(_));
154165
let mut resp = match res {
155166
Ok(resp) => resp,
156167
Err(e) if is_grpc_error(&e) => {
@@ -354,6 +365,90 @@ where
354365
}
355366
}
356367

368+
pub struct RetryableAllStores<P: Plan, PdC: PdClient> {
369+
pub(super) inner: P,
370+
pub pd_client: Arc<PdC>,
371+
pub backoff: Backoff,
372+
}
373+
374+
impl<P: Plan, PdC: PdClient> Clone for RetryableAllStores<P, PdC> {
375+
fn clone(&self) -> Self {
376+
RetryableAllStores {
377+
inner: self.inner.clone(),
378+
pd_client: self.pd_client.clone(),
379+
backoff: self.backoff.clone(),
380+
}
381+
}
382+
}
383+
384+
// About `HasRegionError`:
385+
// Store requests should be return region errors.
386+
// But as the response of only store request by now (UnsafeDestroyRangeResponse) has the `region_error` field,
387+
// we require `HasRegionError` to check whether there is region error returned from TiKV.
388+
#[async_trait]
389+
impl<P: Plan + StoreRequest, PdC: PdClient> Plan for RetryableAllStores<P, PdC>
390+
where
391+
P::Result: HasKeyErrors + HasRegionError,
392+
{
393+
type Result = Vec<Result<P::Result>>;
394+
395+
async fn execute(&self) -> Result<Self::Result> {
396+
let concurrency_permits = Arc::new(Semaphore::new(MULTI_STORES_CONCURRENCY));
397+
let stores = self.pd_client.clone().all_stores().await?;
398+
let mut handles = Vec::with_capacity(stores.len());
399+
for store in stores {
400+
let mut clone = self.inner.clone();
401+
clone.apply_store(&store);
402+
let handle = tokio::spawn(Self::single_store_handler(
403+
clone,
404+
self.backoff.clone(),
405+
concurrency_permits.clone(),
406+
));
407+
handles.push(handle);
408+
}
409+
let results = try_join_all(handles).await?;
410+
Ok(results.into_iter().collect::<Vec<_>>())
411+
}
412+
}
413+
414+
impl<P: Plan, PdC: PdClient> RetryableAllStores<P, PdC>
415+
where
416+
P::Result: HasKeyErrors + HasRegionError,
417+
{
418+
async fn single_store_handler(
419+
plan: P,
420+
mut backoff: Backoff,
421+
permits: Arc<Semaphore>,
422+
) -> Result<P::Result> {
423+
loop {
424+
let permit = permits.acquire().await.unwrap();
425+
let res = plan.execute().await;
426+
drop(permit);
427+
428+
match res {
429+
Ok(mut resp) => {
430+
if let Some(e) = resp.key_errors() {
431+
return Err(Error::MultipleKeyErrors(e));
432+
} else if let Some(e) = resp.region_error() {
433+
// Store request should not return region error.
434+
return Err(Error::RegionError(Box::new(e)));
435+
} else {
436+
return Ok(resp);
437+
}
438+
}
439+
Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() {
440+
Some(duration) => {
441+
sleep(duration).await;
442+
continue;
443+
}
444+
None => return Err(e),
445+
},
446+
Err(e) => return Err(e),
447+
}
448+
}
449+
}
450+
}
451+
357452
/// A technique for merging responses into a single result (with type `Out`).
358453
pub trait Merge<In>: Sized + Clone + Send + Sync + 'static {
359454
type Out: Send;

src/request/plan_builder.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ use super::plan::PreserveShard;
77
use crate::backoff::Backoff;
88
use crate::pd::PdClient;
99
use crate::request::codec::EncodedRequest;
10-
use crate::request::plan::CleanupLocks;
10+
use crate::request::plan::{CleanupLocks, RetryableAllStores};
1111
use crate::request::shard::HasNextBatch;
12-
use crate::request::DefaultProcessor;
1312
use crate::request::Dispatch;
1413
use crate::request::ExtractError;
1514
use crate::request::KvRequest;
@@ -22,6 +21,7 @@ use crate::request::ProcessResponse;
2221
use crate::request::ResolveLock;
2322
use crate::request::RetryableMultiRegion;
2423
use crate::request::Shardable;
24+
use crate::request::{DefaultProcessor, StoreRequest};
2525
use crate::store::HasKeyErrors;
2626
use crate::store::HasRegionError;
2727
use crate::store::HasRegionErrors;
@@ -194,6 +194,26 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
194194
}
195195
}
196196

197+
impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
198+
where
199+
P::Result: HasKeyErrors + HasRegionError,
200+
{
201+
pub fn all_stores(
202+
self,
203+
backoff: Backoff,
204+
) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
205+
PlanBuilder {
206+
pd_client: self.pd_client.clone(),
207+
plan: RetryableAllStores {
208+
inner: self.plan,
209+
pd_client: self.pd_client,
210+
backoff,
211+
},
212+
phantom: PhantomData,
213+
}
214+
}
215+
}
216+
197217
impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
198218
where
199219
P::Result: HasKeyErrors,

src/store/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ has_region_error!(kvrpcpb::CheckTxnStatusResponse);
5252
has_region_error!(kvrpcpb::CheckSecondaryLocksResponse);
5353
has_region_error!(kvrpcpb::DeleteRangeResponse);
5454
has_region_error!(kvrpcpb::GcResponse);
55+
has_region_error!(kvrpcpb::UnsafeDestroyRangeResponse);
5556
has_region_error!(kvrpcpb::RawGetResponse);
5657
has_region_error!(kvrpcpb::RawBatchGetResponse);
5758
has_region_error!(kvrpcpb::RawPutResponse);
@@ -111,6 +112,7 @@ has_str_error!(kvrpcpb::RawCasResponse);
111112
has_str_error!(kvrpcpb::RawCoprocessorResponse);
112113
has_str_error!(kvrpcpb::ImportResponse);
113114
has_str_error!(kvrpcpb::DeleteRangeResponse);
115+
has_str_error!(kvrpcpb::UnsafeDestroyRangeResponse);
114116

115117
impl HasKeyErrors for kvrpcpb::ScanResponse {
116118
fn key_errors(&mut self) -> Option<Vec<Error>> {

src/store/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ pub struct RegionStore {
3333
pub client: Arc<dyn KvClient + Send + Sync>,
3434
}
3535

36+
#[derive(new, Clone)]
37+
pub struct Store {
38+
pub client: Arc<dyn KvClient + Send + Sync>,
39+
}
40+
3641
#[async_trait]
3742
pub trait KvConnectStore: KvConnect {
3843
async fn connect_to_store(

0 commit comments

Comments
 (0)