Skip to content

Commit 97794ff

Browse files
authored
Add jitter support to retry behavior (#41)
According to our streaming spec, the retry behavior should also include some form of jitter. This jitter will cause the delay to be AT MOST halved (e.g. a 30 second delay, post jitter will be in the range of [15, 30] seconds).
1 parent b747f2a commit 97794ff

File tree

3 files changed

+36
-10
lines changed

3 files changed

+36
-10
lines changed

eventsource-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ log = "0.4.6"
2020
pin-project = "1.0.10"
2121
tokio = { version = "1.17.0", features = ["time"] }
2222
hyper-timeout = "0.4.1"
23+
rand = "0.8.5"
2324

2425
[dev-dependencies]
2526
env_logger = "0.7.1"

eventsource-client/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ impl<C> ReconnectingRequest<C> {
318318
reconnect_delay,
319319
delay_max,
320320
backoff_factor,
321+
true,
321322
)),
322323
redirect_count: 0,
323324
current_url: url,

eventsource-client/src/retry.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::time::{Duration, Instant};
22

3+
use rand::{thread_rng, Rng};
4+
35
pub(crate) trait RetryStrategy {
46
/// Return the next amount of time a failed request should delay before re-attempting.
57
fn next_delay(&mut self, current_time: Instant) -> Duration;
@@ -17,18 +19,25 @@ pub(crate) struct BackoffRetry {
1719
base_delay: Duration,
1820
max_delay: Duration,
1921
backoff_factor: u32,
22+
include_jitter: bool,
2023

2124
reset_interval: Duration,
2225
next_delay: Duration,
2326
good_since: Option<Instant>,
2427
}
2528

2629
impl BackoffRetry {
27-
pub fn new(base_delay: Duration, max_delay: Duration, backoff_factor: u32) -> Self {
30+
pub fn new(
31+
base_delay: Duration,
32+
max_delay: Duration,
33+
backoff_factor: u32,
34+
include_jitter: bool,
35+
) -> Self {
2836
Self {
2937
base_delay,
3038
max_delay,
3139
backoff_factor,
40+
include_jitter,
3241
reset_interval: DEFAULT_RESET_RETRY_INTERVAL,
3342
next_delay: base_delay,
3443
good_since: None,
@@ -38,18 +47,22 @@ impl BackoffRetry {
3847

3948
impl RetryStrategy for BackoffRetry {
4049
fn next_delay(&mut self, current_time: Instant) -> Duration {
41-
let mut next_delay = self.next_delay;
50+
let mut current_delay = self.next_delay;
4251

4352
if let Some(good_since) = self.good_since {
4453
if current_time - good_since >= self.reset_interval {
45-
next_delay = self.base_delay;
54+
current_delay = self.base_delay;
4655
}
4756
}
4857

4958
self.good_since = None;
50-
self.next_delay = std::cmp::min(self.max_delay, next_delay * self.backoff_factor);
59+
self.next_delay = std::cmp::min(self.max_delay, current_delay * self.backoff_factor);
5160

52-
next_delay
61+
if self.include_jitter {
62+
thread_rng().gen_range(current_delay / 2..=current_delay)
63+
} else {
64+
current_delay
65+
}
5366
}
5467

5568
fn change_base_delay(&mut self, base_delay: Duration) {
@@ -76,7 +89,7 @@ mod tests {
7689
#[test]
7790
fn test_fixed_retry() {
7891
let base = Duration::from_secs(10);
79-
let mut retry = BackoffRetry::new(base, Duration::from_secs(30), 1);
92+
let mut retry = BackoffRetry::new(base, Duration::from_secs(30), 1, false);
8093
let start = Instant::now() - Duration::from_secs(60);
8194

8295
assert_eq!(retry.next_delay(start), base);
@@ -87,7 +100,7 @@ mod tests {
87100
#[test]
88101
fn test_able_to_reset_base_delay() {
89102
let base = Duration::from_secs(10);
90-
let mut retry = BackoffRetry::new(base, Duration::from_secs(30), 1);
103+
let mut retry = BackoffRetry::new(base, Duration::from_secs(30), 1, false);
91104
let start = Instant::now();
92105

93106
assert_eq!(retry.next_delay(start), base);
@@ -102,7 +115,7 @@ mod tests {
102115
fn test_with_backoff() {
103116
let base = Duration::from_secs(10);
104117
let max = Duration::from_secs(60);
105-
let mut retry = BackoffRetry::new(base, max, 2);
118+
let mut retry = BackoffRetry::new(base, max, 2, false);
106119
let start = Instant::now() - Duration::from_secs(60);
107120

108121
assert_eq!(retry.next_delay(start), base);
@@ -117,12 +130,23 @@ mod tests {
117130
assert_eq!(retry.next_delay(start.add(Duration::from_secs(3))), max);
118131
}
119132

133+
#[test]
134+
fn test_with_jitter() {
135+
let base = Duration::from_secs(10);
136+
let max = Duration::from_secs(60);
137+
let mut retry = BackoffRetry::new(base, max, 1, true);
138+
let start = Instant::now() - Duration::from_secs(60);
139+
140+
let delay = retry.next_delay(start);
141+
assert!(base / 2 <= delay && delay <= base);
142+
}
143+
120144
#[test]
121145
fn test_retry_holds_at_max() {
122146
let base = Duration::from_secs(20);
123147
let max = Duration::from_secs(30);
124148

125-
let mut retry = BackoffRetry::new(base, max, 2);
149+
let mut retry = BackoffRetry::new(base, max, 2, false);
126150
let start = Instant::now();
127151
retry.reset(start);
128152

@@ -146,7 +170,7 @@ mod tests {
146170
let reset_interval = Duration::from_secs(45);
147171

148172
// Prepare a retry strategy that has succeeded at a specific point.
149-
let mut retry = BackoffRetry::new(base, max, 2);
173+
let mut retry = BackoffRetry::new(base, max, 2, false);
150174
retry.reset_interval = reset_interval;
151175
let start = Instant::now() - Duration::from_secs(60);
152176
retry.reset(start);

0 commit comments

Comments
 (0)