diff --git a/src/raw/client.rs b/src/raw/client.rs index 620531ee..a9c70cc4 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -31,6 +31,7 @@ use crate::Key; use crate::KvPair; use crate::Result; use crate::Value; +use crate::request::plan::is_grpc_error; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; @@ -835,6 +836,17 @@ impl Client { } Ok((Some(r), region.end_key())) } + Err(err) if is_grpc_error(&err) => { + debug!("retryable_scan: grpc error: {:?}", err); + plan::handle_rpc_error(self.rpc.clone(), store.clone()).await; + + if let Some(duration) = scan_args.backoff.next_delay_duration() { + sleep(duration).await; + continue; + } else { + return Err(err); + } + } Err(err) => Err(err), }; } diff --git a/src/request/plan.rs b/src/request/plan.rs index be915f77..99331393 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -87,7 +87,7 @@ impl StoreRequest for Dispatch { const MULTI_REGION_CONCURRENCY: usize = 16; const MULTI_STORES_CONCURRENCY: usize = 16; -fn is_grpc_error(e: &Error) -> bool { +pub(crate) fn is_grpc_error(e: &Error) -> bool { matches!(e, Error::GrpcAPI(_) | Error::Grpc(_)) } @@ -345,6 +345,18 @@ pub(crate) async fn handle_region_error( } } +pub(crate) async fn handle_rpc_error( + pd_client: Arc, + region_store: RegionStore, +) { + let ver_id = region_store.region_with_leader.ver_id(); + let store_id = region_store.region_with_leader.get_store_id(); + pd_client.invalidate_region_cache(ver_id).await; + if let Ok(store_id) = store_id { + pd_client.invalidate_store_cache(store_id).await; + } +} + // Returns // 1. Ok(true): error has been resolved, retry immediately // 2. Ok(false): backoff, and then retry