@@ -147,9 +147,27 @@ where
147
147
) -> Result < <Self as Plan >:: Result > {
148
148
// limit concurrent requests
149
149
let permit = permits. acquire ( ) . await . unwrap ( ) ;
150
- let mut resp = plan. execute ( ) . await ? ;
150
+ let res = plan. execute ( ) . await ;
151
151
drop ( permit) ;
152
152
153
+ let is_grpc_error = |e : & Error | matches ! ( e, Error :: GrpcAPI ( _) | Error :: Grpc ( _) ) ;
154
+ let mut resp = match res {
155
+ Ok ( resp) => resp,
156
+ Err ( e) if is_grpc_error ( & e) => {
157
+ return Self :: handle_grpc_error (
158
+ pd_client,
159
+ plan,
160
+ region_store,
161
+ backoff,
162
+ permits,
163
+ preserve_region_results,
164
+ e,
165
+ )
166
+ . await ;
167
+ }
168
+ Err ( e) => return Err ( e) ,
169
+ } ;
170
+
153
171
if let Some ( e) = resp. key_errors ( ) {
154
172
Ok ( vec ! [ Err ( Error :: MultipleKeyErrors ( e) ) ] )
155
173
} else if let Some ( e) = resp. region_error ( ) {
@@ -272,6 +290,34 @@ where
272
290
pd_client. invalidate_region_cache ( ver_id) . await ;
273
291
Ok ( false )
274
292
}
293
+
294
+ async fn handle_grpc_error (
295
+ pd_client : Arc < PdC > ,
296
+ plan : P ,
297
+ region_store : RegionStore ,
298
+ mut backoff : Backoff ,
299
+ permits : Arc < Semaphore > ,
300
+ preserve_region_results : bool ,
301
+ e : Error ,
302
+ ) -> Result < <Self as Plan >:: Result > {
303
+ debug ! ( "handle grpc error: {:?}" , e) ;
304
+ let ver_id = region_store. region_with_leader . ver_id ( ) ;
305
+ pd_client. invalidate_region_cache ( ver_id) . await ;
306
+ match backoff. next_delay_duration ( ) {
307
+ Some ( duration) => {
308
+ sleep ( duration) . await ;
309
+ Self :: single_plan_handler (
310
+ pd_client,
311
+ plan,
312
+ backoff,
313
+ permits,
314
+ preserve_region_results,
315
+ )
316
+ . await
317
+ }
318
+ None => Err ( e) ,
319
+ }
320
+ }
275
321
}
276
322
277
323
impl < P : Plan , PdC : PdClient > Clone for RetryableMultiRegion < P , PdC > {
0 commit comments