1
1
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
2
use core:: ops:: Range ;
3
- use std:: future:: Future ;
4
- use std:: pin:: Pin ;
3
+
5
4
use std:: str:: FromStr ;
6
5
use std:: sync:: Arc ;
7
6
@@ -14,7 +13,7 @@ use crate::config::Config;
14
13
use crate :: pd:: PdClient ;
15
14
use crate :: pd:: PdRpcClient ;
16
15
use crate :: proto:: kvrpcpb:: { RawScanRequest , RawScanResponse } ;
17
- use crate :: proto:: { errorpb , metapb} ;
16
+ use crate :: proto:: metapb;
18
17
use crate :: raw:: lowering:: * ;
19
18
use crate :: request:: CollectSingle ;
20
19
use crate :: request:: EncodeKeyspace ;
@@ -766,10 +765,6 @@ impl<PdC: PdClient> Client<PdC> {
766
765
let ( start_key, end_key) = range. clone ( ) . into_keys ( ) ;
767
766
let mut current_key: Key = start_key;
768
767
769
- let region_error_handler =
770
- |pd_rpc_client : Arc < PdC > , err : errorpb:: Error , store : RegionStore | {
771
- Box :: pin ( plan:: handle_region_error ( pd_rpc_client, err, store) )
772
- } as _ ;
773
768
while current_limit > 0 {
774
769
let scan_args = ScanInnerArgs {
775
770
start_key : current_key. clone ( ) ,
@@ -779,7 +774,7 @@ impl<PdC: PdClient> Client<PdC> {
779
774
reverse,
780
775
backoff : backoff. clone ( ) ,
781
776
} ;
782
- let ( res, next_key) = self . retryable_scan ( scan_args, region_error_handler ) . await ?;
777
+ let ( res, next_key) = self . retryable_scan ( scan_args) . await ?;
783
778
784
779
let mut kvs = res
785
780
. map ( |r| r. kvs . into_iter ( ) . map ( Into :: into) . collect :: < Vec < KvPair > > ( ) )
@@ -805,18 +800,10 @@ impl<PdC: PdClient> Client<PdC> {
805
800
Ok ( result)
806
801
}
807
802
808
- async fn retryable_scan < ' a , F > (
803
+ async fn retryable_scan (
809
804
& self ,
810
805
mut scan_args : ScanInnerArgs ,
811
- mut error_handler : F ,
812
- ) -> Result < ( Option < RawScanResponse > , Key ) >
813
- where
814
- F : FnMut (
815
- Arc < PdC > ,
816
- errorpb:: Error ,
817
- RegionStore ,
818
- ) -> Pin < Box < dyn Future < Output = Result < bool > > > > ,
819
- {
806
+ ) -> Result < ( Option < RawScanResponse > , Key ) > {
820
807
let start_key = scan_args. start_key ;
821
808
822
809
let region = self . rpc . clone ( ) . region_for_key ( & start_key) . await ?;
@@ -834,7 +821,8 @@ impl<PdC: PdClient> Client<PdC> {
834
821
Ok ( mut r) => {
835
822
if let Some ( err) = r. region_error ( ) {
836
823
let status =
837
- error_handler ( self . rpc . clone ( ) , err. clone ( ) , store. clone ( ) ) . await ?;
824
+ plan:: handle_region_error ( self . rpc . clone ( ) , err. clone ( ) , store. clone ( ) )
825
+ . await ?;
838
826
if status {
839
827
continue ;
840
828
} else if let Some ( duration) = scan_args. backoff . next_delay_duration ( ) {
0 commit comments