Skip to content

Commit 1f67ece

Browse files
committed
feat!: implement retryable calls
1 parent a660346 commit 1f67ece

File tree

4 files changed

+128
-24
lines changed

4 files changed

+128
-24
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ name = "esplora_client"
1717
path = "src/lib.rs"
1818

1919
[dependencies]
20+
async-std = "1.13.0"
2021
serde = { version = "1.0", features = ["derive"] }
2122
bitcoin = { version = "0.32", features = ["serde", "std"], default-features = false }
2223
hex = { version = "0.2", package = "hex-conservative" }

src/async.rs

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
1414
use std::collections::HashMap;
1515
use std::str::FromStr;
16+
use std::time::Duration;
1617

18+
use async_std::task;
1719
use bitcoin::consensus::{deserialize, serialize, Decodable, Encodable};
1820
use bitcoin::hashes::{sha256, Hash};
1921
use bitcoin::hex::{DisplayHex, FromHex};
@@ -24,16 +26,21 @@ use bitcoin::{
2426
#[allow(unused_imports)]
2527
use log::{debug, error, info, trace};
2628

27-
use reqwest::{header, Client};
29+
use reqwest::{header, Client, Response};
2830

29-
use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
31+
use crate::{
32+
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
33+
RETRYABLE_ERROR_CODES,
34+
};
3035

3136
#[derive(Debug, Clone)]
3237
pub struct AsyncClient {
3338
/// The URL of the Esplora Server.
3439
url: String,
3540
/// The inner [`reqwest::Client`] to make HTTP requests.
3641
client: Client,
42+
backoff: Duration,
43+
max_retries: u32,
3744
}
3845

3946
impl AsyncClient {
@@ -63,12 +70,22 @@ impl AsyncClient {
6370
client_builder = client_builder.default_headers(headers);
6471
}
6572

66-
Ok(Self::from_client(builder.base_url, client_builder.build()?))
73+
Ok(AsyncClient {
74+
url: builder.base_url,
75+
client: client_builder.build()?,
76+
backoff: builder.backoff,
77+
max_retries: builder.max_retries,
78+
})
6779
}
6880

6981
/// Build an async client from the base url and [`Client`]
7082
pub fn from_client(url: String, client: Client) -> Self {
71-
AsyncClient { url, client }
83+
AsyncClient {
84+
url,
85+
client,
86+
backoff: crate::DEFAULT_BACKOFF,
87+
max_retries: crate::DEFAULT_MAX_RETRIES,
88+
}
7289
}
7390

7491
/// Make an HTTP GET request to given URL, deserializing to any `T` that
@@ -84,7 +101,7 @@ impl AsyncClient {
84101
/// [`bitcoin::consensus::Decodable`] deserialization.
85102
async fn get_response<T: Decodable>(&self, path: &str) -> Result<T, Error> {
86103
let url = format!("{}{}", self.url, path);
87-
let response = self.client.get(url).send().await?;
104+
let response = self.get_with_retry(&url).await?;
88105

89106
if !response.status().is_success() {
90107
return Err(Error::HttpResponse {
@@ -127,7 +144,7 @@ impl AsyncClient {
127144
path: &str,
128145
) -> Result<T, Error> {
129146
let url = format!("{}{}", self.url, path);
130-
let response = self.client.get(url).send().await?;
147+
let response = self.get_with_retry(&url).await?;
131148

132149
if !response.status().is_success() {
133150
return Err(Error::HttpResponse {
@@ -172,7 +189,7 @@ impl AsyncClient {
172189
/// [`bitcoin::consensus::Decodable`] deserialization.
173190
async fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
174191
let url = format!("{}{}", self.url, path);
175-
let response = self.client.get(url).send().await?;
192+
let response = self.get_with_retry(&url).await?;
176193

177194
if !response.status().is_success() {
178195
return Err(Error::HttpResponse {
@@ -212,7 +229,7 @@ impl AsyncClient {
212229
/// This function will return an error either from the HTTP client.
213230
async fn get_response_text(&self, path: &str) -> Result<String, Error> {
214231
let url = format!("{}{}", self.url, path);
215-
let response = self.client.get(url).send().await?;
232+
let response = self.get_with_retry(&url).await?;
216233

217234
if !response.status().is_success() {
218235
return Err(Error::HttpResponse {
@@ -422,4 +439,26 @@ impl AsyncClient {
422439
pub fn client(&self) -> &Client {
423440
&self.client
424441
}
442+
443+
/// Sends a GET request to the given `url`, retrying failed attempts
444+
/// for retryable error codes until max retries hit.
445+
async fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
446+
let mut attempts = 0;
447+
let mut delay = self.backoff;
448+
449+
loop {
450+
match self.client.get(url).send().await {
451+
Ok(resp)
452+
if attempts < self.max_retries
453+
&& RETRYABLE_ERROR_CODES.contains(&resp.status().as_u16()) =>
454+
{
455+
task::sleep(delay).await;
456+
attempts += 1;
457+
delay *= 2;
458+
}
459+
Ok(resp) => return Ok(resp),
460+
Err(e) => return Err(Error::Reqwest(e)),
461+
}
462+
}
463+
}
425464
}

src/blocking.rs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
use std::collections::HashMap;
1515
use std::convert::TryFrom;
1616
use std::str::FromStr;
17+
use std::thread;
18+
use std::time::Duration;
1719

1820
#[allow(unused_imports)]
1921
use log::{debug, error, info, trace};
2022

21-
use minreq::{Proxy, Request};
23+
use minreq::{Proxy, Request, Response};
2224

2325
use bitcoin::consensus::{deserialize, serialize, Decodable};
2426
use bitcoin::hashes::{sha256, Hash};
@@ -27,7 +29,10 @@ use bitcoin::{
2729
block::Header as BlockHeader, Block, BlockHash, MerkleBlock, Script, Transaction, Txid,
2830
};
2931

30-
use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
32+
use crate::{
33+
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
34+
RETRYABLE_ERROR_CODES,
35+
};
3136

3237
#[derive(Debug, Clone)]
3338
pub struct BlockingClient {
@@ -39,6 +44,10 @@ pub struct BlockingClient {
3944
pub timeout: Option<u64>,
4045
/// HTTP headers to set on every request made to Esplora server
4146
pub headers: HashMap<String, String>,
47+
/// Backoff
48+
pub backoff: Duration,
49+
/// Max retries
50+
pub max_retries: u32,
4251
}
4352

4453
impl BlockingClient {
@@ -49,6 +58,8 @@ impl BlockingClient {
4958
proxy: builder.proxy,
5059
timeout: builder.timeout,
5160
headers: builder.headers,
61+
backoff: builder.backoff,
62+
max_retries: builder.max_retries,
5263
}
5364
}
5465

@@ -80,20 +91,20 @@ impl BlockingClient {
8091
}
8192

8293
fn get_opt_response<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
83-
match self.get_request(path)?.send() {
94+
match self.get_with_retry(path) {
8495
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
8596
Ok(resp) if !is_status_ok(resp.status_code) => {
8697
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
8798
let message = resp.as_str().unwrap_or_default().to_string();
8899
Err(Error::HttpResponse { status, message })
89100
}
90101
Ok(resp) => Ok(Some(deserialize::<T>(resp.as_bytes())?)),
91-
Err(e) => Err(Error::Minreq(e)),
102+
Err(e) => Err(e),
92103
}
93104
}
94105

95106
fn get_opt_response_txid(&self, path: &str) -> Result<Option<Txid>, Error> {
96-
match self.get_request(path)?.send() {
107+
match self.get_with_retry(path) {
97108
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
98109
Ok(resp) if !is_status_ok(resp.status_code) => {
99110
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
@@ -103,12 +114,12 @@ impl BlockingClient {
103114
Ok(resp) => Ok(Some(
104115
Txid::from_str(resp.as_str().map_err(Error::Minreq)?).map_err(Error::HexToArray)?,
105116
)),
106-
Err(e) => Err(Error::Minreq(e)),
117+
Err(e) => Err(e),
107118
}
108119
}
109120

110121
fn get_opt_response_hex<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
111-
match self.get_request(path)?.send() {
122+
match self.get_with_retry(path) {
112123
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
113124
Ok(resp) if !is_status_ok(resp.status_code) => {
114125
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
@@ -122,12 +133,12 @@ impl BlockingClient {
122133
.map_err(Error::BitcoinEncoding)
123134
.map(|r| Some(r))
124135
}
125-
Err(e) => Err(Error::Minreq(e)),
136+
Err(e) => Err(e),
126137
}
127138
}
128139

129140
fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
130-
match self.get_request(path)?.send() {
141+
match self.get_with_retry(path) {
131142
Ok(resp) if !is_status_ok(resp.status_code) => {
132143
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
133144
let message = resp.as_str().unwrap_or_default().to_string();
@@ -138,51 +149,51 @@ impl BlockingClient {
138149
let hex_vec = Vec::from_hex(hex_str).unwrap();
139150
deserialize::<T>(&hex_vec).map_err(Error::BitcoinEncoding)
140151
}
141-
Err(e) => Err(Error::Minreq(e)),
152+
Err(e) => Err(e),
142153
}
143154
}
144155

145156
fn get_response_json<'a, T: serde::de::DeserializeOwned>(
146157
&'a self,
147158
path: &'a str,
148159
) -> Result<T, Error> {
149-
let response = self.get_request(path)?.send();
160+
let response = self.get_with_retry(path);
150161
match response {
151162
Ok(resp) if !is_status_ok(resp.status_code) => {
152163
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
153164
let message = resp.as_str().unwrap_or_default().to_string();
154165
Err(Error::HttpResponse { status, message })
155166
}
156167
Ok(resp) => Ok(resp.json::<T>().map_err(Error::Minreq)?),
157-
Err(e) => Err(Error::Minreq(e)),
168+
Err(e) => Err(e),
158169
}
159170
}
160171

161172
fn get_opt_response_json<T: serde::de::DeserializeOwned>(
162173
&self,
163174
path: &str,
164175
) -> Result<Option<T>, Error> {
165-
match self.get_request(path)?.send() {
176+
match self.get_with_retry(path) {
166177
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
167178
Ok(resp) if !is_status_ok(resp.status_code) => {
168179
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
169180
let message = resp.as_str().unwrap_or_default().to_string();
170181
Err(Error::HttpResponse { status, message })
171182
}
172183
Ok(resp) => Ok(Some(resp.json::<T>()?)),
173-
Err(e) => Err(Error::Minreq(e)),
184+
Err(e) => Err(e),
174185
}
175186
}
176187

177188
fn get_response_str(&self, path: &str) -> Result<String, Error> {
178-
match self.get_request(path)?.send() {
189+
match self.get_with_retry(path) {
179190
Ok(resp) if !is_status_ok(resp.status_code) => {
180191
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
181192
let message = resp.as_str().unwrap_or_default().to_string();
182193
Err(Error::HttpResponse { status, message })
183194
}
184195
Ok(resp) => Ok(resp.as_str()?.to_string()),
185-
Err(e) => Err(Error::Minreq(e)),
196+
Err(e) => Err(e),
186197
}
187198
}
188199

@@ -339,6 +350,28 @@ impl BlockingClient {
339350
};
340351
self.get_response_json(&path)
341352
}
353+
354+
/// Sends a GET request to the given `url`, retrying failed attempts
355+
/// for retryable error codes until max retries hit.
356+
pub fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
357+
let mut attempts = 0;
358+
let mut delay = self.backoff;
359+
360+
loop {
361+
match self.get_request(url)?.send() {
362+
Ok(resp)
363+
if attempts < self.max_retries
364+
&& RETRYABLE_ERROR_CODES.contains(&(resp.status_code as u16)) =>
365+
{
366+
thread::sleep(delay);
367+
attempts += 1;
368+
delay *= 2;
369+
}
370+
Ok(resp) => return Ok(resp),
371+
Err(e) => return Err(Error::Minreq(e)),
372+
}
373+
}
374+
}
342375
}
343376

344377
fn is_status_ok(status: i32) -> bool {

src/lib.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
use std::collections::HashMap;
7070
use std::fmt;
7171
use std::num::TryFromIntError;
72+
use std::time::Duration;
7273

7374
pub mod api;
7475

@@ -83,6 +84,18 @@ pub use blocking::BlockingClient;
8384
#[cfg(feature = "async")]
8485
pub use r#async::AsyncClient;
8586

87+
/// Response status codes for which the request may be retried.
88+
const RETRYABLE_ERROR_CODES: [u16; 2] = [
89+
429, // TOO_MANY_REQUESTS
90+
503, // SERVICE_UNAVAILABLE
91+
];
92+
93+
/// Default backoff.
94+
const DEFAULT_BACKOFF: Duration = Duration::from_millis(512);
95+
96+
/// Default max retries.
97+
const DEFAULT_MAX_RETRIES: u32 = 3;
98+
8699
/// Get a fee value in sats/vbytes from the estimates
87100
/// that matches the confirmation target set as parameter.
88101
///
@@ -117,6 +130,10 @@ pub struct Builder {
117130
pub timeout: Option<u64>,
118131
/// HTTP headers to set on every request made to Esplora server.
119132
pub headers: HashMap<String, String>,
133+
/// Backoff
134+
pub backoff: Duration,
135+
/// Max retries
136+
pub max_retries: u32,
120137
}
121138

122139
impl Builder {
@@ -127,6 +144,8 @@ impl Builder {
127144
proxy: None,
128145
timeout: None,
129146
headers: HashMap::new(),
147+
backoff: DEFAULT_BACKOFF,
148+
max_retries: DEFAULT_MAX_RETRIES,
130149
}
131150
}
132151

@@ -148,6 +167,18 @@ impl Builder {
148167
self
149168
}
150169

170+
/// Set backoff.
171+
pub fn backoff(mut self, duration: Duration) -> Self {
172+
self.backoff = duration;
173+
self
174+
}
175+
176+
/// Set max retries.
177+
pub fn max_retries(mut self, count: u32) -> Self {
178+
self.max_retries = count;
179+
self
180+
}
181+
151182
/// Build a blocking client from builder
152183
#[cfg(feature = "blocking")]
153184
pub fn build_blocking(self) -> BlockingClient {

0 commit comments

Comments
 (0)