Skip to content

Commit 142844e

Browse files
committed
retry S3 request for storage.get and storage.exists
Typically AWS SDKs have retry logic in place on certain errors, it seems like `rusoto` does not: rusoto/rusoto#234 Before we migrate to the official S3 SDK, this retries these requests.
1 parent 6e2a276 commit 142844e

File tree

1 file changed

+53
-23
lines changed

1 file changed

+53
-23
lines changed

src/storage/s3.rs

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,43 @@ use chrono::{DateTime, NaiveDateTime, Utc};
55
use futures_util::{
66
future::TryFutureExt,
77
stream::{FuturesUnordered, StreamExt},
8+
Future,
89
};
9-
use rusoto_core::{region::Region, RusotoError};
10+
use rusoto_core::{region::Region, RusotoError, RusotoResult};
1011
use rusoto_credential::DefaultCredentialsProvider;
1112
use rusoto_s3::{
1213
DeleteObjectsRequest, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectRequest,
1314
ListObjectsV2Request, ObjectIdentifier, PutObjectRequest, S3Client, S3,
1415
};
15-
use std::{convert::TryInto, io::Write, sync::Arc};
16-
use tokio::runtime::Runtime;
16+
use std::{convert::TryInto, io::Write, sync::Arc, time::Duration};
17+
use tokio::{runtime::Runtime, time::sleep};
18+
19+
const MAX_ATTEMPT_COUNT: u64 = 3;
20+
21+
async fn retry_request<F, Fut, T, E>(mut f: F) -> RusotoResult<T, E>
22+
where
23+
F: FnMut() -> Fut,
24+
Fut: Future<Output = RusotoResult<T, E>>,
25+
{
26+
let mut attempts: u64 = 0;
27+
loop {
28+
match f().await {
29+
Err(RusotoError::HttpDispatch(err)) => {
30+
attempts += 1;
31+
if attempts >= MAX_ATTEMPT_COUNT {
32+
break Err(RusotoError::HttpDispatch(err));
33+
}
34+
log::info!(
35+
"got HttpDispatchError trying to call S3 (will retry {} more times): {:?}",
36+
MAX_ATTEMPT_COUNT - attempts,
37+
err
38+
);
39+
sleep(Duration::from_secs(attempts)).await;
40+
}
41+
result => break result,
42+
}
43+
}
44+
}
1745

1846
pub(super) struct S3Backend {
1947
client: S3Client,
@@ -69,12 +97,14 @@ impl S3Backend {
6997

7098
pub(super) fn exists(&self, path: &str) -> Result<bool, Error> {
7199
self.runtime.block_on(async {
72-
let req = HeadObjectRequest {
73-
bucket: self.bucket.clone(),
74-
key: path.into(),
75-
..Default::default()
76-
};
77-
let resp = self.client.head_object(req).await;
100+
let resp = retry_request(|| {
101+
self.client.head_object(HeadObjectRequest {
102+
bucket: self.bucket.clone(),
103+
key: path.into(),
104+
..Default::default()
105+
})
106+
})
107+
.await;
78108
match resp {
79109
Ok(_) => Ok(true),
80110
Err(RusotoError::Service(HeadObjectError::NoSuchKey(_))) => Ok(false),
@@ -91,24 +121,24 @@ impl S3Backend {
91121
range: Option<FileRange>,
92122
) -> Result<Blob, Error> {
93123
self.runtime.block_on(async {
94-
let res = self
95-
.client
96-
.get_object(GetObjectRequest {
124+
let res = retry_request(|| {
125+
self.client.get_object(GetObjectRequest {
97126
bucket: self.bucket.to_string(),
98127
key: path.into(),
99-
range: range.map(|r| format!("bytes={}-{}", r.start(), r.end())),
128+
range: range
129+
.as_ref()
130+
.map(|r| format!("bytes={}-{}", r.start(), r.end())),
100131
..Default::default()
101132
})
102-
.await
103-
.map_err(|err| match err {
104-
RusotoError::Service(GetObjectError::NoSuchKey(_)) => {
105-
super::PathNotFoundError.into()
106-
}
107-
RusotoError::Unknown(http) if http.status == 404 => {
108-
super::PathNotFoundError.into()
109-
}
110-
err => Error::from(err),
111-
})?;
133+
})
134+
.await
135+
.map_err(|err| match err {
136+
RusotoError::Service(GetObjectError::NoSuchKey(_)) => {
137+
super::PathNotFoundError.into()
138+
}
139+
RusotoError::Unknown(http) if http.status == 404 => super::PathNotFoundError.into(),
140+
err => Error::from(err),
141+
})?;
112142

113143
let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size);
114144
content.reserve(

0 commit comments

Comments
 (0)