3
3
* SPDX-License-Identifier: Apache-2.0
4
4
*/
5
5
6
+ use std:: sync:: Mutex ;
7
+ use std:: time:: { Duration , SystemTime } ;
8
+
9
+ use tokio:: sync:: OwnedSemaphorePermit ;
10
+ use tracing:: debug;
11
+
12
+ use aws_smithy_runtime_api:: box_error:: BoxError ;
13
+ use aws_smithy_runtime_api:: client:: interceptors:: context:: InterceptorContext ;
14
+ use aws_smithy_runtime_api:: client:: retries:: classifiers:: { RetryAction , RetryReason } ;
15
+ use aws_smithy_runtime_api:: client:: retries:: { RequestAttempts , RetryStrategy , ShouldAttempt } ;
16
+ use aws_smithy_runtime_api:: client:: runtime_components:: RuntimeComponents ;
17
+ use aws_smithy_types:: config_bag:: { ConfigBag , Storable , StoreReplace } ;
18
+ use aws_smithy_types:: retry:: { ErrorKind , RetryConfig , RetryMode } ;
19
+
6
20
use crate :: client:: retries:: classifiers:: run_classifiers_on_ctx;
7
21
use crate :: client:: retries:: client_rate_limiter:: { ClientRateLimiter , RequestReason } ;
8
22
use crate :: client:: retries:: strategy:: standard:: ReleaseResult :: {
@@ -11,17 +25,6 @@ use crate::client::retries::strategy::standard::ReleaseResult::{
11
25
use crate :: client:: retries:: token_bucket:: TokenBucket ;
12
26
use crate :: client:: retries:: { ClientRateLimiterPartition , RetryPartition } ;
13
27
use crate :: static_partition_map:: StaticPartitionMap ;
14
- use aws_smithy_runtime_api:: box_error:: BoxError ;
15
- use aws_smithy_runtime_api:: client:: interceptors:: context:: InterceptorContext ;
16
- use aws_smithy_runtime_api:: client:: retries:: classifiers:: { RetryAction , RetryReason } ;
17
- use aws_smithy_runtime_api:: client:: retries:: { RequestAttempts , RetryStrategy , ShouldAttempt } ;
18
- use aws_smithy_runtime_api:: client:: runtime_components:: RuntimeComponents ;
19
- use aws_smithy_types:: config_bag:: { ConfigBag , Storable , StoreReplace } ;
20
- use aws_smithy_types:: retry:: { ErrorKind , RetryConfig , RetryMode } ;
21
- use std:: sync:: Mutex ;
22
- use std:: time:: { Duration , SystemTime } ;
23
- use tokio:: sync:: OwnedSemaphorePermit ;
24
- use tracing:: debug;
25
28
26
29
static CLIENT_RATE_LIMITER : StaticPartitionMap < ClientRateLimiterPartition , ClientRateLimiter > =
27
30
StaticPartitionMap :: new ( ) ;
@@ -56,7 +59,7 @@ impl StandardRetryStrategy {
56
59
fn set_retry_permit ( & self , new_retry_permit : OwnedSemaphorePermit ) {
57
60
let mut old_retry_permit = self . retry_permit . lock ( ) . unwrap ( ) ;
58
61
if let Some ( p) = old_retry_permit. replace ( new_retry_permit) {
59
- // Whenever we set a new retry permit and it replaces the old one, we need to "forget"
62
+ // Whenever we set a new retry permit, and it replaces the old one, we need to "forget"
60
63
// the old permit, removing it from the bucket forever.
61
64
p. forget ( )
62
65
}
@@ -141,7 +144,7 @@ impl StandardRetryStrategy {
141
144
// Get the backoff time multiplier in seconds (with fractional seconds)
142
145
retry_cfg. initial_backoff ( ) . as_secs_f64 ( ) ,
143
146
// `self.local.attempts` tracks number of requests made including the initial request
144
- // The initial attempt shouldn't count towards backoff calculations so we subtract it
147
+ // The initial attempt shouldn't count towards backoff calculations, so we subtract it
145
148
request_attempts - 1 ,
146
149
) ;
147
150
Ok ( Duration :: from_secs_f64 ( backoff) . min ( retry_cfg. max_backoff ( ) ) )
@@ -194,27 +197,6 @@ impl RetryStrategy for StandardRetryStrategy {
194
197
cfg : & ConfigBag ,
195
198
) -> Result < ShouldAttempt , BoxError > {
196
199
let retry_cfg = cfg. load :: < RetryConfig > ( ) . expect ( "retry config is required" ) ;
197
- // Look a the result. If it's OK then we're done; No retry required. Otherwise, we need to inspect it
198
- let output_or_error = ctx. output_or_error ( ) . expect (
199
- "This must never be called without reaching the point where the result exists." ,
200
- ) ;
201
- let token_bucket = cfg. load :: < TokenBucket > ( ) ;
202
- if output_or_error. is_ok ( ) {
203
- debug ! ( "request succeeded, no retry necessary" ) ;
204
- if let Some ( tb) = token_bucket {
205
- // If this retry strategy is holding any permits, release them back to the bucket.
206
- if let NoPermitWasReleased = self . release_retry_permit ( ) {
207
- // In the event that there was no retry permit to release, we generate new
208
- // permits from nothing. We do this to make up for permits we had to "forget".
209
- // Otherwise, repeated retries would empty the bucket and nothing could fill it
210
- // back up again.
211
- tb. regenerate_a_token ( ) ;
212
- }
213
- }
214
- update_rate_limiter_if_exists ( runtime_components, cfg, false ) ;
215
-
216
- return Ok ( ShouldAttempt :: No ) ;
217
- }
218
200
219
201
// Check if we're out of attempts
220
202
let request_attempts = cfg
@@ -236,19 +218,40 @@ impl RetryStrategy for StandardRetryStrategy {
236
218
let retry_classifiers = runtime_components. retry_classifiers ( ) ;
237
219
let classifier_result = run_classifiers_on_ctx ( retry_classifiers, ctx) ;
238
220
239
- // Calculate the appropriate backoff time.
240
- let backoff =
241
- match self . calculate_backoff ( runtime_components, cfg, retry_cfg, & classifier_result) {
221
+ if classifier_result. should_retry ( ) {
222
+ // Calculate the appropriate backoff time.
223
+ let backoff = match self . calculate_backoff (
224
+ runtime_components,
225
+ cfg,
226
+ retry_cfg,
227
+ & classifier_result,
228
+ ) {
242
229
Ok ( value) => value,
243
230
// In some cases, backoff calculation will decide that we shouldn't retry at all.
244
231
Err ( value) => return Ok ( value) ,
245
232
} ;
246
- debug ! (
247
- "attempt #{request_attempts} failed with {:?}; retrying after {:?}" ,
248
- classifier_result, backoff,
249
- ) ;
233
+ debug ! (
234
+ "attempt #{request_attempts} failed with {:?}; retrying after {:?}" ,
235
+ classifier_result, backoff,
236
+ ) ;
237
+
238
+ Ok ( ShouldAttempt :: YesAfterDelay ( backoff) )
239
+ } else {
240
+ debug ! ( "attempt #{request_attempts} succeeded, no retry necessary" ) ;
241
+ if let Some ( tb) = cfg. load :: < TokenBucket > ( ) {
242
+ // If this retry strategy is holding any permits, release them back to the bucket.
243
+ if let NoPermitWasReleased = self . release_retry_permit ( ) {
244
+ // In the event that there was no retry permit to release, we generate new
245
+ // permits from nothing. We do this to make up for permits we had to "forget".
246
+ // Otherwise, repeated retries would empty the bucket and nothing could fill it
247
+ // back up again.
248
+ tb. regenerate_a_token ( ) ;
249
+ }
250
+ }
251
+ update_rate_limiter_if_exists ( runtime_components, cfg, false ) ;
250
252
251
- Ok ( ShouldAttempt :: YesAfterDelay ( backoff) )
253
+ Ok ( ShouldAttempt :: No )
254
+ }
252
255
}
253
256
}
254
257
@@ -305,34 +308,43 @@ fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
305
308
306
309
#[ cfg( test) ]
307
310
mod tests {
308
- use super :: * ;
311
+ use std:: fmt;
312
+ use std:: sync:: Mutex ;
313
+ use std:: time:: Duration ;
314
+
315
+ use aws_smithy_runtime_api:: client:: interceptors:: context:: {
316
+ Input , InterceptorContext , Output ,
317
+ } ;
309
318
use aws_smithy_runtime_api:: client:: orchestrator:: OrchestratorError ;
310
319
use aws_smithy_runtime_api:: client:: retries:: classifiers:: {
311
320
ClassifyRetry , RetryAction , SharedRetryClassifier ,
312
321
} ;
313
- use aws_smithy_runtime_api:: client:: retries:: { AlwaysRetry , RetryStrategy } ;
314
- use aws_smithy_runtime_api:: client:: runtime_components:: RuntimeComponentsBuilder ;
315
- use aws_smithy_types:: config_bag:: Layer ;
316
- use aws_smithy_types:: retry:: { ErrorKind , ProvideErrorKind } ;
317
- use std:: fmt;
318
- use std:: sync:: Mutex ;
319
- use std:: time:: Duration ;
322
+ use aws_smithy_runtime_api:: client:: retries:: {
323
+ AlwaysRetry , RequestAttempts , RetryStrategy , ShouldAttempt ,
324
+ } ;
325
+ use aws_smithy_runtime_api:: client:: runtime_components:: {
326
+ RuntimeComponents , RuntimeComponentsBuilder ,
327
+ } ;
328
+ use aws_smithy_types:: config_bag:: { ConfigBag , Layer } ;
329
+ use aws_smithy_types:: retry:: { ErrorKind , ProvideErrorKind , RetryConfig } ;
320
330
331
+ use super :: { calculate_exponential_backoff, StandardRetryStrategy } ;
321
332
#[ cfg( feature = "test-util" ) ]
322
- use crate :: client:: retries:: token_bucket:: TokenBucket ;
323
- use aws_smithy_runtime_api:: client:: interceptors:: context:: { Input , Output } ;
333
+ use crate :: client:: retries:: TokenBucket ;
324
334
325
335
#[ test]
326
336
fn no_retry_necessary_for_ok_result ( ) {
327
337
let cfg = ConfigBag :: of_layers ( vec ! [ {
328
338
let mut layer = Layer :: new( "test" ) ;
329
339
layer. store_put( RetryConfig :: standard( ) ) ;
340
+ layer. store_put( RequestAttempts :: new( 1 ) ) ;
330
341
layer
331
342
} ] ) ;
332
343
let rc = RuntimeComponentsBuilder :: for_tests ( ) . build ( ) . unwrap ( ) ;
333
344
let mut ctx = InterceptorContext :: new ( Input :: doesnt_matter ( ) ) ;
334
345
let strategy = StandardRetryStrategy :: default ( ) ;
335
346
ctx. set_output_or_error ( Ok ( Output :: doesnt_matter ( ) ) ) ;
347
+
336
348
let actual = strategy
337
349
. should_attempt_retry ( & ctx, & rc, & cfg)
338
350
. expect ( "method is infallible for this use" ) ;
@@ -441,7 +453,7 @@ mod tests {
441
453
#[ cfg( feature = "test-util" ) ]
442
454
impl PresetReasonRetryClassifier {
443
455
fn new ( mut retry_reasons : Vec < RetryAction > ) -> Self {
444
- // We'll pop the retry_reasons in reverse order so we reverse the list to fix that.
456
+ // We'll pop the retry_reasons in reverse order, so we reverse the list to fix that.
445
457
retry_reasons. reverse ( ) ;
446
458
Self {
447
459
retry_actions : Mutex :: new ( retry_reasons) ,
@@ -557,6 +569,98 @@ mod tests {
557
569
assert_eq ! ( token_bucket. available_permits( ) , 490 ) ;
558
570
}
559
571
572
+ #[ cfg( feature = "test-util" ) ]
573
+ #[ test]
574
+ fn successful_request_and_deser_should_be_retryable ( ) {
575
+ #[ derive( Clone , Copy , Debug ) ]
576
+ enum LongRunningOperationStatus {
577
+ Running ,
578
+ Complete ,
579
+ }
580
+
581
+ #[ derive( Debug ) ]
582
+ struct LongRunningOperationOutput {
583
+ status : Option < LongRunningOperationStatus > ,
584
+ }
585
+
586
+ impl LongRunningOperationOutput {
587
+ fn status ( & self ) -> Option < LongRunningOperationStatus > {
588
+ self . status
589
+ }
590
+ }
591
+
592
+ struct WaiterRetryClassifier { }
593
+
594
+ impl WaiterRetryClassifier {
595
+ fn new ( ) -> Self {
596
+ WaiterRetryClassifier { }
597
+ }
598
+ }
599
+
600
+ impl fmt:: Debug for WaiterRetryClassifier {
601
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
602
+ write ! ( f, "WaiterRetryClassifier" )
603
+ }
604
+ }
605
+ impl ClassifyRetry for WaiterRetryClassifier {
606
+ fn classify_retry ( & self , ctx : & InterceptorContext ) -> RetryAction {
607
+ let status: Option < LongRunningOperationStatus > =
608
+ ctx. output_or_error ( ) . and_then ( |res| {
609
+ res. ok ( ) . and_then ( |output| {
610
+ output
611
+ . downcast_ref :: < LongRunningOperationOutput > ( )
612
+ . and_then ( |output| output. status ( ) )
613
+ } )
614
+ } ) ;
615
+
616
+ if let Some ( LongRunningOperationStatus :: Running ) = status {
617
+ return RetryAction :: server_error ( ) ;
618
+ } ;
619
+
620
+ RetryAction :: NoActionIndicated
621
+ }
622
+
623
+ fn name ( & self ) -> & ' static str {
624
+ "waiter retry classifier"
625
+ }
626
+ }
627
+
628
+ let retry_config = RetryConfig :: standard ( )
629
+ . with_use_static_exponential_base ( true )
630
+ . with_max_attempts ( 5 ) ;
631
+
632
+ let rc = RuntimeComponentsBuilder :: for_tests ( )
633
+ . with_retry_classifier ( SharedRetryClassifier :: new ( WaiterRetryClassifier :: new ( ) ) )
634
+ . build ( )
635
+ . unwrap ( ) ;
636
+ let mut layer = Layer :: new ( "test" ) ;
637
+ layer. store_put ( retry_config) ;
638
+ let mut cfg = ConfigBag :: of_layers ( vec ! [ layer] ) ;
639
+ let mut ctx = InterceptorContext :: new ( Input :: doesnt_matter ( ) ) ;
640
+ let strategy = StandardRetryStrategy :: new ( ) ;
641
+
642
+ ctx. set_output_or_error ( Ok ( Output :: erase ( LongRunningOperationOutput {
643
+ status : Some ( LongRunningOperationStatus :: Running ) ,
644
+ } ) ) ) ;
645
+
646
+ cfg. interceptor_state ( ) . store_put ( TokenBucket :: new ( 5 ) ) ;
647
+ let token_bucket = cfg. load :: < TokenBucket > ( ) . unwrap ( ) . clone ( ) ;
648
+
649
+ cfg. interceptor_state ( ) . store_put ( RequestAttempts :: new ( 1 ) ) ;
650
+ let should_retry = strategy. should_attempt_retry ( & ctx, & rc, & cfg) . unwrap ( ) ;
651
+ let dur = should_retry. expect_delay ( ) ;
652
+ assert_eq ! ( dur, Duration :: from_secs( 1 ) ) ;
653
+ assert_eq ! ( token_bucket. available_permits( ) , 0 ) ;
654
+
655
+ ctx. set_output_or_error ( Ok ( Output :: erase ( LongRunningOperationOutput {
656
+ status : Some ( LongRunningOperationStatus :: Complete ) ,
657
+ } ) ) ) ;
658
+ cfg. interceptor_state ( ) . store_put ( RequestAttempts :: new ( 2 ) ) ;
659
+ let should_retry = strategy. should_attempt_retry ( & ctx, & rc, & cfg) . unwrap ( ) ;
660
+ should_retry. expect_no ( ) ;
661
+ assert_eq ! ( token_bucket. available_permits( ) , 5 ) ;
662
+ }
663
+
560
664
#[ cfg( feature = "test-util" ) ]
561
665
#[ test]
562
666
fn no_quota ( ) {
0 commit comments