Skip to content

Commit 7ea3ed9

Browse files
author
Zelda Hessler
authored
No really, add the standard retry strategy to the orchestrator (#2741)
## Description <!--- Describe your changes in detail --> This PR adds support for the Standard retry strategy. The standard strategy will be inserted into a service's config bag if a retry config was set. Otherwise, unless another retry strategy was already set, a `NeverRetryStrategy` will be set. This seemed like a reasonable default, but I'm open to suggestions. ## Testing <!--- Please describe in detail how you tested your changes --> <!--- Include details of your testing environment, and the tests you ran to --> <!--- see how your change affects other areas of the code, etc. --> tests are included --- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._
1 parent 303d99b commit 7ea3ed9

File tree

6 files changed

+250
-10
lines changed

6 files changed

+250
-10
lines changed

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ServiceRuntimePluginGenerator.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class ServiceRuntimePluginGenerator(
9191
"HttpConnector" to client.resolve("http_connector::HttpConnector"),
9292
"IdentityResolvers" to runtimeApi.resolve("client::identity::IdentityResolvers"),
9393
"InterceptorRegistrar" to runtimeApi.resolve("client::interceptors::InterceptorRegistrar"),
94+
"StandardRetryStrategy" to runtime.resolve("client::retries::strategy::StandardRetryStrategy"),
9495
"NeverRetryStrategy" to runtime.resolve("client::retries::strategy::NeverRetryStrategy"),
9596
"RetryClassifiers" to runtimeApi.resolve("client::retries::RetryClassifiers"),
9697
"Params" to endpointTypesGenerator.paramsStruct(),
@@ -141,11 +142,16 @@ class ServiceRuntimePluginGenerator(
141142
#{retry_classifier_customizations};
142143
cfg.set_retry_classifiers(retry_classifiers);
143144
144-
// TODO(enableNewSmithyRuntime): Wire up standard retry
145-
cfg.set_retry_strategy(#{NeverRetryStrategy}::new());
146-
147145
let sleep_impl = self.handle.conf.sleep_impl();
148146
let timeout_config = self.handle.conf.timeout_config();
147+
let retry_config = self.handle.conf.retry_config();
148+
149+
if let Some(retry_config) = retry_config {
150+
cfg.set_retry_strategy(#{StandardRetryStrategy}::new(retry_config));
151+
} else if cfg.retry_strategy().is_none() {
152+
cfg.set_retry_strategy(#{NeverRetryStrategy}::new());
153+
}
154+
149155
let connector_settings = timeout_config.map(#{ConnectorSettings}::from_timeout_config).unwrap_or_default();
150156
let connection: #{Box}<dyn #{Connection}> = #{Box}::new(#{DynConnectorAdapter}::new(
151157
// TODO(enableNewSmithyRuntime): Replace the tower-based DynConnector and remove DynConnectorAdapter when deleting the middleware implementation

rust-runtime/aws-smithy-runtime-api/src/client/orchestrator.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ pub trait ConfigBagAccessors {
135135
fn retry_classifiers(&self) -> &RetryClassifiers;
136136
fn set_retry_classifiers(&mut self, retry_classifier: RetryClassifiers);
137137

138-
fn retry_strategy(&self) -> &dyn RetryStrategy;
138+
fn retry_strategy(&self) -> Option<&dyn RetryStrategy>;
139139
fn set_retry_strategy(&mut self, retry_strategy: impl RetryStrategy + 'static);
140140

141141
fn request_time(&self) -> Option<SharedTimeSource>;
@@ -255,10 +255,8 @@ impl ConfigBagAccessors for ConfigBag {
255255
self.put::<RetryClassifiers>(retry_classifiers);
256256
}
257257

258-
fn retry_strategy(&self) -> &dyn RetryStrategy {
259-
&**self
260-
.get::<Box<dyn RetryStrategy>>()
261-
.expect("a retry strategy must be set")
258+
fn retry_strategy(&self) -> Option<&dyn RetryStrategy> {
259+
self.get::<Box<dyn RetryStrategy>>().map(|rs| &**rs)
262260
}
263261

264262
fn set_retry_strategy(&mut self, retry_strategy: impl RetryStrategy + 'static) {

rust-runtime/aws-smithy-runtime/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ pin-project-lite = "0.2.7"
2828
pin-utils = "0.1.0"
2929
tokio = { version = "1.25", features = [] }
3030
tracing = "0.1.37"
31+
fastrand = "1.4"
3132

3233
[dev-dependencies]
3334
aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio", "test-util"] }
3435
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["test-util"] }
36+
aws-smithy-types = { path = "../aws-smithy-types", features = ["test-util"] }
3537
tokio = { version = "1.25", features = ["macros", "rt", "test-util"] }
3638
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
3739
tracing-test = "0.2.1"

rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,12 @@ async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors:
132132
halt_on_err!([ctx] => interceptors.modify_before_retry_loop(ctx, cfg));
133133

134134
let retry_strategy = cfg.retry_strategy();
135-
match retry_strategy.should_attempt_initial_request(cfg) {
135+
// If we got a retry strategy from the bag, ask it what to do.
136+
// Otherwise, assume we should attempt the initial request.
137+
let should_attempt = retry_strategy
138+
.map(|rs| rs.should_attempt_initial_request(cfg))
139+
.unwrap_or(Ok(ShouldAttempt::Yes));
140+
match should_attempt {
136141
// Yes, let's make a request
137142
Ok(ShouldAttempt::Yes) => trace!("retry strategy has OKed initial request"),
138143
// No, this request shouldn't be sent
@@ -178,7 +183,13 @@ async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors:
178183
continue_on_err!([ctx] => maybe_timeout);
179184

180185
let retry_strategy = cfg.retry_strategy();
181-
let should_attempt = halt_on_err!([ctx] => retry_strategy.should_attempt_retry(ctx, cfg));
186+
// If we got a retry strategy from the bag, ask it what to do.
187+
// If no strategy was set, we won't retry.
188+
let should_attempt = halt_on_err!(
189+
[ctx] => retry_strategy
190+
.map(|rs| rs.should_attempt_retry(ctx, cfg))
191+
.unwrap_or(Ok(ShouldAttempt::No)
192+
));
182193
match should_attempt {
183194
// Yes, let's retry the request
184195
ShouldAttempt::Yes => continue,

rust-runtime/aws-smithy-runtime/src/client/retries/strategy.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
#[cfg(feature = "test-util")]
77
mod fixed_delay;
88
mod never;
9+
mod standard;
910

1011
#[cfg(feature = "test-util")]
1112
pub use fixed_delay::FixedDelayRetryStrategy;
1213
pub use never::NeverRetryStrategy;
14+
pub use standard::StandardRetryStrategy;
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use aws_smithy_runtime_api::client::interceptors::InterceptorContext;
7+
use aws_smithy_runtime_api::client::orchestrator::{BoxError, ConfigBagAccessors};
8+
use aws_smithy_runtime_api::client::request_attempts::RequestAttempts;
9+
use aws_smithy_runtime_api::client::retries::{
10+
ClassifyRetry, RetryReason, RetryStrategy, ShouldAttempt,
11+
};
12+
use aws_smithy_types::config_bag::ConfigBag;
13+
use aws_smithy_types::retry::RetryConfig;
14+
use std::time::Duration;
15+
16+
const DEFAULT_MAX_ATTEMPTS: usize = 4;
17+
18+
#[derive(Debug)]
19+
pub struct StandardRetryStrategy {
20+
max_attempts: usize,
21+
initial_backoff: Duration,
22+
max_backoff: Duration,
23+
base: fn() -> f64,
24+
}
25+
26+
impl StandardRetryStrategy {
27+
pub fn new(retry_config: &RetryConfig) -> Self {
28+
// TODO(enableNewSmithyRuntime) add support for `retry_config.reconnect_mode()` here or in the orchestrator flow.
29+
Self::default()
30+
.with_max_attempts(retry_config.max_attempts() as usize)
31+
.with_initial_backoff(retry_config.initial_backoff())
32+
}
33+
34+
pub fn with_base(mut self, base: fn() -> f64) -> Self {
35+
self.base = base;
36+
self
37+
}
38+
39+
pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
40+
self.max_attempts = max_attempts;
41+
self
42+
}
43+
44+
pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
45+
self.initial_backoff = initial_backoff;
46+
self
47+
}
48+
}
49+
50+
impl Default for StandardRetryStrategy {
51+
fn default() -> Self {
52+
Self {
53+
max_attempts: DEFAULT_MAX_ATTEMPTS,
54+
max_backoff: Duration::from_secs(20),
55+
// by default, use a random base for exponential backoff
56+
base: fastrand::f64,
57+
initial_backoff: Duration::from_secs(1),
58+
}
59+
}
60+
}
61+
62+
impl RetryStrategy for StandardRetryStrategy {
63+
// TODO(token-bucket) add support for optional cross-request token bucket
64+
fn should_attempt_initial_request(&self, _cfg: &ConfigBag) -> Result<ShouldAttempt, BoxError> {
65+
Ok(ShouldAttempt::Yes)
66+
}
67+
68+
fn should_attempt_retry(
69+
&self,
70+
ctx: &InterceptorContext,
71+
cfg: &ConfigBag,
72+
) -> Result<ShouldAttempt, BoxError> {
73+
// Look a the result. If it's OK then we're done; No retry required. Otherwise, we need to inspect it
74+
let output_or_error = ctx.output_or_error().expect(
75+
"This must never be called without reaching the point where the result exists.",
76+
);
77+
if output_or_error.is_ok() {
78+
tracing::debug!("request succeeded, no retry necessary");
79+
return Ok(ShouldAttempt::No);
80+
}
81+
82+
// Check if we're out of attempts
83+
let request_attempts: &RequestAttempts = cfg
84+
.get()
85+
.expect("at least one request attempt is made before any retry is attempted");
86+
if request_attempts.attempts() >= self.max_attempts {
87+
tracing::trace!(
88+
attempts = request_attempts.attempts(),
89+
max_attempts = self.max_attempts,
90+
"not retrying because we are out of attempts"
91+
);
92+
return Ok(ShouldAttempt::No);
93+
}
94+
95+
// Run the classifiers against the context to determine if we should retry
96+
let retry_classifiers = cfg.retry_classifiers();
97+
let retry_reason = retry_classifiers.classify_retry(ctx);
98+
let backoff = match retry_reason {
99+
Some(RetryReason::Explicit(dur)) => dur,
100+
Some(RetryReason::Error(_)) => {
101+
let backoff = calculate_exponential_backoff(
102+
// Generate a random base multiplier to create jitter
103+
(self.base)(),
104+
// Get the backoff time multiplier in seconds (with fractional seconds)
105+
self.initial_backoff.as_secs_f64(),
106+
// `self.local.attempts` tracks number of requests made including the initial request
107+
// The initial attempt shouldn't count towards backoff calculations so we subtract it
108+
(request_attempts.attempts() - 1) as u32,
109+
);
110+
Duration::from_secs_f64(backoff).min(self.max_backoff)
111+
}
112+
Some(_) => {
113+
unreachable!("RetryReason is non-exhaustive. Therefore, we need to cover this unreachable case.")
114+
}
115+
None => {
116+
tracing::trace!(
117+
attempts = request_attempts.attempts(),
118+
max_attempts = self.max_attempts,
119+
"encountered unretryable error"
120+
);
121+
return Ok(ShouldAttempt::No);
122+
}
123+
};
124+
125+
tracing::debug!(
126+
"attempt {} failed with {:?}; retrying after {:?}",
127+
request_attempts.attempts(),
128+
retry_reason.expect("the match statement above ensures this is not None"),
129+
backoff
130+
);
131+
132+
Ok(ShouldAttempt::YesAfterDelay(backoff))
133+
}
134+
}
135+
136+
fn calculate_exponential_backoff(base: f64, initial_backoff: f64, retry_attempts: u32) -> f64 {
137+
base * initial_backoff * 2_u32.pow(retry_attempts) as f64
138+
}
139+
140+
#[cfg(test)]
141+
mod tests {
142+
use super::{ShouldAttempt, StandardRetryStrategy};
143+
use aws_smithy_runtime_api::client::interceptors::InterceptorContext;
144+
use aws_smithy_runtime_api::client::orchestrator::{ConfigBagAccessors, OrchestratorError};
145+
use aws_smithy_runtime_api::client::request_attempts::RequestAttempts;
146+
use aws_smithy_runtime_api::client::retries::{AlwaysRetry, RetryClassifiers, RetryStrategy};
147+
use aws_smithy_types::config_bag::ConfigBag;
148+
use aws_smithy_types::retry::ErrorKind;
149+
use aws_smithy_types::type_erasure::TypeErasedBox;
150+
use std::time::Duration;
151+
152+
#[test]
153+
fn no_retry_necessary_for_ok_result() {
154+
let cfg = ConfigBag::base();
155+
let mut ctx = InterceptorContext::new(TypeErasedBox::doesnt_matter());
156+
let strategy = StandardRetryStrategy::default();
157+
ctx.set_output_or_error(Ok(TypeErasedBox::doesnt_matter()));
158+
let actual = strategy
159+
.should_attempt_retry(&ctx, &cfg)
160+
.expect("method is infallible for this use");
161+
assert_eq!(ShouldAttempt::No, actual);
162+
}
163+
164+
fn set_up_cfg_and_context(
165+
error_kind: ErrorKind,
166+
current_request_attempts: usize,
167+
) -> (InterceptorContext, ConfigBag) {
168+
let mut ctx = InterceptorContext::new(TypeErasedBox::doesnt_matter());
169+
ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
170+
let mut cfg = ConfigBag::base();
171+
cfg.set_retry_classifiers(RetryClassifiers::new().with_classifier(AlwaysRetry(error_kind)));
172+
cfg.put(RequestAttempts::new(current_request_attempts));
173+
174+
(ctx, cfg)
175+
}
176+
177+
// Test that error kinds produce the correct "retry after X seconds" output.
178+
// All error kinds are handled in the same way for the standard strategy.
179+
fn test_should_retry_error_kind(error_kind: ErrorKind) {
180+
let (ctx, cfg) = set_up_cfg_and_context(error_kind, 3);
181+
let strategy = StandardRetryStrategy::default().with_base(|| 1.0);
182+
let actual = strategy
183+
.should_attempt_retry(&ctx, &cfg)
184+
.expect("method is infallible for this use");
185+
assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
186+
}
187+
188+
#[test]
189+
fn should_retry_transient_error_result_after_2s() {
190+
test_should_retry_error_kind(ErrorKind::TransientError);
191+
}
192+
193+
#[test]
194+
fn should_retry_client_error_result_after_2s() {
195+
test_should_retry_error_kind(ErrorKind::ClientError);
196+
}
197+
198+
#[test]
199+
fn should_retry_server_error_result_after_2s() {
200+
test_should_retry_error_kind(ErrorKind::ServerError);
201+
}
202+
203+
#[test]
204+
fn should_retry_throttling_error_result_after_2s() {
205+
test_should_retry_error_kind(ErrorKind::ThrottlingError);
206+
}
207+
208+
#[test]
209+
fn dont_retry_when_out_of_attempts() {
210+
let current_attempts = 4;
211+
let max_attempts = current_attempts;
212+
let (ctx, cfg) = set_up_cfg_and_context(ErrorKind::TransientError, current_attempts);
213+
let strategy = StandardRetryStrategy::default()
214+
.with_base(|| 1.0)
215+
.with_max_attempts(max_attempts);
216+
let actual = strategy
217+
.should_attempt_retry(&ctx, &cfg)
218+
.expect("method is infallible for this use");
219+
assert_eq!(ShouldAttempt::No, actual);
220+
}
221+
}

0 commit comments

Comments
 (0)