Skip to content

Commit 9c95803

Browse files
Zelda Hesslerjdisanti
andauthored
add support for adaptive retries when orchestrator mode is enabled (#2800)
## Motivation and Context <!--- Why is this change required? What problem does it solve? --> <!--- If it fixes an open issue, please link to the issue here --> #2190 ## Description <!--- Describe your changes in detail --> add support for adaptive retries ## Testing <!--- Please describe in detail how you tested your changes --> <!--- Include details of your testing environment, and the tests you ran to --> <!--- see how your change affects other areas of the code, etc. --> I wrote some tests and I'm open to suggestions for more. ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: John DiSanti <jdisanti@amazon.com>
1 parent c8ba2d5 commit 9c95803

File tree

25 files changed

+1017
-251
lines changed

25 files changed

+1017
-251
lines changed

aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/IntegrationTestDependencies.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package software.amazon.smithy.rustsdk
88
import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
99
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
1010
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
11+
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Approx
1112
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.AsyncStd
1213
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.AsyncStream
1314
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.BytesUtils
@@ -26,6 +27,8 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
2627
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingAppender
2728
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingSubscriber
2829
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingTest
30+
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.smithyRuntime
31+
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.smithyRuntimeApi
2932
import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope
3033
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
3134
import software.amazon.smithy.rust.codegen.core.rustlang.writable
@@ -73,21 +76,30 @@ class IntegrationTestDependencies(
7376
private val hasTests: Boolean,
7477
private val hasBenches: Boolean,
7578
) : LibRsCustomization() {
79+
private val runtimeConfig = codegenContext.runtimeConfig
7680
override fun section(section: LibRsSection) = when (section) {
7781
is LibRsSection.Body -> testDependenciesOnly {
7882
if (hasTests) {
7983
val smithyClient = CargoDependency.smithyClient(codegenContext.runtimeConfig)
8084
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
8185
val smithyAsync = CargoDependency.smithyAsync(codegenContext.runtimeConfig)
8286
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
87+
val smithyTypes = CargoDependency.smithyTypes(codegenContext.runtimeConfig)
88+
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
8389
addDependency(smithyClient)
8490
addDependency(smithyAsync)
91+
addDependency(smithyTypes)
8592
addDependency(CargoDependency.smithyProtocolTestHelpers(codegenContext.runtimeConfig))
8693
addDependency(SerdeJson)
8794
addDependency(Tokio)
8895
addDependency(FuturesUtil)
8996
addDependency(Tracing.toDevDependency())
9097
addDependency(TracingSubscriber)
98+
99+
if (codegenContext.smithyRuntimeMode.generateOrchestrator) {
100+
addDependency(smithyRuntime(runtimeConfig).copy(features = setOf("test-util"), scope = DependencyScope.Dev))
101+
addDependency(smithyRuntimeApi(runtimeConfig).copy(features = setOf("test-util"), scope = DependencyScope.Dev))
102+
}
91103
}
92104
if (hasBenches) {
93105
addDependency(Criterion)
@@ -103,6 +115,7 @@ class IntegrationTestDependencies(
103115
private fun serviceSpecificCustomizations(): List<LibRsCustomization> = when (moduleName) {
104116
"transcribestreaming" -> listOf(TranscribeTestDependencies())
105117
"s3" -> listOf(S3TestDependencies(codegenContext))
118+
"dynamodb" -> listOf(DynamoDbTestDependencies())
106119
else -> emptyList()
107120
}
108121
}
@@ -116,6 +129,13 @@ class TranscribeTestDependencies : LibRsCustomization() {
116129
}
117130
}
118131

132+
class DynamoDbTestDependencies : LibRsCustomization() {
133+
override fun section(section: LibRsSection): Writable =
134+
writable {
135+
addDependency(Approx)
136+
}
137+
}
138+
119139
class S3TestDependencies(private val codegenContext: ClientCodegenContext) : LibRsCustomization() {
120140
override fun section(section: LibRsSection): Writable =
121141
writable {

aws/sdk/integration-tests/dynamodb/Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,27 @@ publish = false
1111
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1212

1313
[dependencies]
14+
approx = "0.5.1"
15+
aws-config = { path = "../../build/aws-sdk/sdk/aws-config" }
1416
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
1517
aws-http = { path = "../../build/aws-sdk/sdk/aws-http" }
1618
aws-sdk-dynamodb = { path = "../../build/aws-sdk/sdk/dynamodb" }
19+
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async" }
1720
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util", "rustls"] }
1821
aws-smithy-http = { path = "../../build/aws-sdk/sdk/aws-smithy-http" }
19-
aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" }
2022
aws-smithy-protocol-test = { path = "../../build/aws-sdk/sdk/aws-smithy-protocol-test" }
21-
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async" }
23+
aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["test-util"]}
24+
aws-smithy-runtime-api = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime-api", features = ["test-util"]}
25+
aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types", features = ["test-util"]}
2226
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
2327
bytes = "1.0.0"
2428
criterion = { version = "0.4.0" }
2529
futures-util = { version = "0.3.16", default-features = false }
2630
http = "0.2.0"
2731
serde_json = "1.0.0"
2832
tokio = { version = "1.23.1", features = ["full", "test-util"] }
29-
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
3033
tokio-stream = "0.1.5"
34+
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
3135

3236
[[bench]]
3337
name = "deserialization_bench"
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#[cfg(aws_sdk_orchestrator_mode)]
7+
mod test {
8+
use aws_sdk_dynamodb::config::{Credentials, Region, SharedAsyncSleep};
9+
use aws_sdk_dynamodb::{config::retry::RetryConfig, error::ProvideErrorMetadata};
10+
use aws_smithy_async::rt::sleep::TokioSleep;
11+
use aws_smithy_async::test_util::instant_time_and_sleep;
12+
use aws_smithy_async::time::SharedTimeSource;
13+
use aws_smithy_async::time::SystemTimeSource;
14+
use aws_smithy_client::test_connection::TestConnection;
15+
use aws_smithy_http::body::SdkBody;
16+
use aws_smithy_runtime::client::retries::RetryPartition;
17+
use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18+
use aws_smithy_types::timeout::TimeoutConfigBuilder;
19+
use std::time::{Duration, Instant, SystemTime};
20+
21+
fn req() -> HttpRequest {
22+
http::Request::builder()
23+
.body(SdkBody::from("request body"))
24+
.unwrap()
25+
}
26+
27+
fn ok() -> HttpResponse {
28+
http::Response::builder()
29+
.status(200)
30+
.header("server", "Server")
31+
.header("content-type", "application/x-amz-json-1.0")
32+
.header("content-length", "23")
33+
.header("connection", "keep-alive")
34+
.header("x-amz-crc32", "2335643545")
35+
.body(SdkBody::from("{ \"TableNames\": [ \"Test\" ] }"))
36+
.unwrap()
37+
}
38+
39+
fn err() -> HttpResponse {
40+
http::Response::builder()
41+
.status(500)
42+
.body(SdkBody::from("{ \"message\": \"The request has failed because of an unknown error, exception or failure.\", \"code\": \"InternalServerError\" }"))
43+
.unwrap()
44+
}
45+
46+
fn throttling_err() -> HttpResponse {
47+
http::Response::builder()
48+
.status(400)
49+
.body(SdkBody::from("{ \"message\": \"The request was denied due to request throttling.\", \"code\": \"ThrottlingException\" }"))
50+
.unwrap()
51+
}
52+
53+
#[tokio::test]
54+
async fn test_adaptive_retries_with_no_throttling_errors() {
55+
let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
56+
57+
let events = vec![
58+
// First operation
59+
(req(), err()),
60+
(req(), err()),
61+
(req(), ok()),
62+
// Second operation
63+
(req(), err()),
64+
(req(), ok()),
65+
// Third operation will fail, only errors
66+
(req(), err()),
67+
(req(), err()),
68+
(req(), err()),
69+
(req(), err()),
70+
];
71+
72+
let conn = TestConnection::new(events);
73+
let config = aws_sdk_dynamodb::Config::builder()
74+
.credentials_provider(Credentials::for_tests())
75+
.region(Region::new("us-east-1"))
76+
.retry_config(
77+
RetryConfig::adaptive()
78+
.with_max_attempts(4)
79+
.with_use_static_exponential_base(true),
80+
)
81+
.time_source(SharedTimeSource::new(time_source))
82+
.sleep_impl(SharedAsyncSleep::new(sleep_impl.clone()))
83+
.retry_partition(RetryPartition::new(
84+
"test_adaptive_retries_with_no_throttling_errors",
85+
))
86+
.http_connector(conn.clone())
87+
.build();
88+
let expected_table_names = vec!["Test".to_owned()];
89+
90+
// We create a new client each time to ensure that the cross-client retry state is working.
91+
let client = aws_sdk_dynamodb::Client::from_conf(config.clone());
92+
let res = client.list_tables().send().await.unwrap();
93+
assert_eq!(sleep_impl.total_duration(), Duration::from_secs(3));
94+
assert_eq!(res.table_names(), Some(expected_table_names.as_slice()));
95+
// Three requests should have been made, two failing & one success
96+
assert_eq!(conn.requests().len(), 3);
97+
98+
let client = aws_sdk_dynamodb::Client::from_conf(config.clone());
99+
let res = client.list_tables().send().await.unwrap();
100+
assert_eq!(sleep_impl.total_duration(), Duration::from_secs(3 + 1));
101+
assert_eq!(res.table_names(), Some(expected_table_names.as_slice()));
102+
// Two requests should have been made, one failing & one success (plus previous requests)
103+
assert_eq!(conn.requests().len(), 5);
104+
105+
let client = aws_sdk_dynamodb::Client::from_conf(config);
106+
let err = client.list_tables().send().await.unwrap_err();
107+
assert_eq!(sleep_impl.total_duration(), Duration::from_secs(3 + 1 + 7),);
108+
assert_eq!(err.code(), Some("InternalServerError"));
109+
// four requests should have been made, all failing (plus previous requests)
110+
assert_eq!(conn.requests().len(), 9);
111+
}
112+
113+
#[tokio::test]
114+
async fn test_adaptive_retries_with_throttling_errors_times_out() {
115+
tracing_subscriber::fmt::init();
116+
let events = vec![
117+
// First operation
118+
(req(), err()),
119+
(req(), ok()),
120+
// Second operation
121+
(req(), err()),
122+
(req(), throttling_err()),
123+
(req(), ok()),
124+
];
125+
126+
let conn = TestConnection::new(events);
127+
let config = aws_sdk_dynamodb::Config::builder()
128+
.credentials_provider(Credentials::for_tests())
129+
.region(Region::new("us-east-1"))
130+
.retry_config(
131+
RetryConfig::adaptive()
132+
.with_max_attempts(4)
133+
.with_initial_backoff(Duration::from_millis(50))
134+
.with_use_static_exponential_base(true),
135+
)
136+
.timeout_config(
137+
TimeoutConfigBuilder::new()
138+
.operation_attempt_timeout(Duration::from_millis(100))
139+
.build(),
140+
)
141+
.time_source(SharedTimeSource::new(SystemTimeSource::new()))
142+
.sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
143+
.http_connector(conn.clone())
144+
.retry_partition(RetryPartition::new(
145+
"test_adaptive_retries_with_throttling_errors_times_out",
146+
))
147+
.build();
148+
149+
let expected_table_names = vec!["Test".to_owned()];
150+
let start = Instant::now();
151+
152+
// We create a new client each time to ensure that the cross-client retry state is working.
153+
let client = aws_sdk_dynamodb::Client::from_conf(config.clone());
154+
let res = client.list_tables().send().await.unwrap();
155+
assert_eq!(res.table_names(), Some(expected_table_names.as_slice()));
156+
// Three requests should have been made, two failing & one success
157+
assert_eq!(conn.requests().len(), 2);
158+
159+
let client = aws_sdk_dynamodb::Client::from_conf(config);
160+
let err = client.list_tables().send().await.unwrap_err();
161+
assert_eq!(err.to_string(), "request has timed out".to_owned());
162+
// two requests should have been made, both failing (plus previous requests)
163+
assert_eq!(conn.requests().len(), 2 + 2);
164+
165+
let since = start.elapsed();
166+
// At least 300 milliseconds must pass:
167+
// - 50ms for the first retry on attempt 1
168+
// - 50ms for the second retry on attempt 3
169+
// - 100ms for the throttling delay triggered by attempt 4, which required a delay longer than the attempt timeout.
170+
// - 100ms for the 5th attempt, which would have succeeded, but required a delay longer than the attempt timeout.
171+
assert!(since.as_secs_f64() > 0.3);
172+
}
173+
}

0 commit comments

Comments
 (0)