Skip to content

Commit b747f2a

Browse files
authored
Reset retry backoff after max retry interval (#40)
According to our streaming spec, default retry behavior should continue with a backoff calculation until 60 seconds of continuous failures have occurred, after which we can reset the retry interval. While working on this task, I extracted the retry strategy into a separate struct. This keeps the client a little cleaner, allows us to actually unit test the retry policy, and will hopefully more easily allow us to control the retry behavior for future unit tests.
1 parent 2b9eb55 commit b747f2a

File tree

3 files changed

+207
-27
lines changed

3 files changed

+207
-27
lines changed

eventsource-client/src/client.rs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@ use std::{
1818
fmt::{self, Debug, Display, Formatter},
1919
future::Future,
2020
io::ErrorKind,
21-
mem,
2221
pin::Pin,
2322
str::FromStr,
2423
task::{Context, Poll},
25-
time::Duration,
24+
time::{Duration, Instant},
2625
};
2726

2827
use tokio::{
@@ -39,6 +38,7 @@ use hyper_timeout::TimeoutConnector;
3938
use crate::event_parser::EventParser;
4039
use crate::event_parser::SSE;
4140

41+
use crate::retry::{BackoffRetry, RetryStrategy};
4242
use std::error::Error as StdError;
4343

4444
#[cfg(feature = "rustls")]
@@ -292,7 +292,7 @@ pub struct ReconnectingRequest<C> {
292292
props: RequestProps,
293293
#[pin]
294294
state: State,
295-
next_reconnect_delay: Duration,
295+
retry_strategy: Box<dyn RetryStrategy + Send + Sync>,
296296
current_url: Uri,
297297
redirect_count: u32,
298298
event_parser: EventParser,
@@ -306,12 +306,19 @@ impl<C> ReconnectingRequest<C> {
306306
last_event_id: Option<String>,
307307
) -> ReconnectingRequest<C> {
308308
let reconnect_delay = props.reconnect_opts.delay;
309+
let delay_max = props.reconnect_opts.delay_max;
310+
let backoff_factor = props.reconnect_opts.backoff_factor;
311+
309312
let url = props.url.clone();
310313
ReconnectingRequest {
311314
props,
312315
http,
313316
state: State::New,
314-
next_reconnect_delay: reconnect_delay,
317+
retry_strategy: Box::new(BackoffRetry::new(
318+
reconnect_delay,
319+
delay_max,
320+
backoff_factor,
321+
)),
315322
redirect_count: 0,
316323
current_url: url,
317324
event_parser: EventParser::new(),
@@ -352,23 +359,6 @@ impl<C> ReconnectingRequest<C> {
352359
Ok(self.http.request(request))
353360
}
354361

355-
fn backoff(mut self: Pin<&mut Self>) -> Duration {
356-
let delay = self.next_reconnect_delay;
357-
let this = self.as_mut().project();
358-
let mut next_reconnect_delay = std::cmp::min(
359-
this.props.reconnect_opts.delay_max,
360-
*this.next_reconnect_delay * this.props.reconnect_opts.backoff_factor,
361-
);
362-
mem::swap(this.next_reconnect_delay, &mut next_reconnect_delay);
363-
delay
364-
}
365-
366-
fn reset_backoff(self: Pin<&mut Self>) {
367-
let mut delay = self.props.reconnect_opts.delay;
368-
let this = self.project();
369-
mem::swap(this.next_reconnect_delay, &mut delay);
370-
}
371-
372362
fn reset_redirects(self: Pin<&mut Self>) {
373363
let url = self.props.url.clone();
374364
let this = self.project();
@@ -402,8 +392,8 @@ where
402392
*this.last_event_id = evt.id.clone();
403393

404394
if let Some(retry) = evt.retry {
405-
this.props.reconnect_opts.delay = Duration::from_millis(retry);
406-
self.as_mut().reset_backoff();
395+
this.retry_strategy
396+
.change_base_delay(Duration::from_millis(retry));
407397
}
408398
Poll::Ready(Some(Ok(event)))
409399
}
@@ -441,7 +431,7 @@ where
441431
debug!("HTTP response: {:#?}", resp);
442432

443433
if resp.status().is_success() {
444-
self.as_mut().reset_backoff();
434+
self.as_mut().project().retry_strategy.reset(Instant::now());
445435
self.as_mut().reset_redirects();
446436
self.as_mut()
447437
.project()
@@ -482,7 +472,11 @@ where
482472
self.as_mut().project().state.set(State::New);
483473
return Poll::Ready(Some(Err(Error::HttpStream(Box::new(e)))));
484474
}
485-
let duration = self.as_mut().backoff();
475+
let duration = self
476+
.as_mut()
477+
.project()
478+
.retry_strategy
479+
.next_delay(Instant::now());
486480
self.as_mut()
487481
.project()
488482
.state
@@ -506,7 +500,11 @@ where
506500
}
507501
Some(Err(e)) => {
508502
if self.props.reconnect_opts.reconnect {
509-
let duration = self.as_mut().backoff();
503+
let duration = self
504+
.as_mut()
505+
.project()
506+
.retry_strategy
507+
.next_delay(Instant::now());
510508
self.as_mut()
511509
.project()
512510
.state
@@ -524,7 +522,11 @@ where
524522
}
525523
}
526524
None => {
527-
let duration = self.as_mut().backoff();
525+
let duration = self
526+
.as_mut()
527+
.project()
528+
.retry_strategy
529+
.next_delay(Instant::now());
528530
self.as_mut()
529531
.project()
530532
.state

eventsource-client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod client;
3030
mod config;
3131
mod error;
3232
mod event_parser;
33+
mod retry;
3334

3435
pub use client::*;
3536
pub use config::*;

eventsource-client/src/retry.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use std::time::{Duration, Instant};
2+
3+
pub(crate) trait RetryStrategy {
4+
/// Return the next amount of time a failed request should delay before re-attempting.
5+
fn next_delay(&mut self, current_time: Instant) -> Duration;
6+
7+
/// Modify the strategy's default base delay.
8+
fn change_base_delay(&mut self, base_delay: Duration);
9+
10+
/// Used to indicate to the strategy that it can reset as a successful connection has been made.
11+
fn reset(&mut self, current_time: Instant);
12+
}
13+
14+
const DEFAULT_RESET_RETRY_INTERVAL: Duration = Duration::from_secs(60);
15+
16+
pub(crate) struct BackoffRetry {
17+
base_delay: Duration,
18+
max_delay: Duration,
19+
backoff_factor: u32,
20+
21+
reset_interval: Duration,
22+
next_delay: Duration,
23+
good_since: Option<Instant>,
24+
}
25+
26+
impl BackoffRetry {
27+
pub fn new(base_delay: Duration, max_delay: Duration, backoff_factor: u32) -> Self {
28+
Self {
29+
base_delay,
30+
max_delay,
31+
backoff_factor,
32+
reset_interval: DEFAULT_RESET_RETRY_INTERVAL,
33+
next_delay: base_delay,
34+
good_since: None,
35+
}
36+
}
37+
}
38+
39+
impl RetryStrategy for BackoffRetry {
40+
fn next_delay(&mut self, current_time: Instant) -> Duration {
41+
let mut next_delay = self.next_delay;
42+
43+
if let Some(good_since) = self.good_since {
44+
if current_time - good_since >= self.reset_interval {
45+
next_delay = self.base_delay;
46+
}
47+
}
48+
49+
self.good_since = None;
50+
self.next_delay = std::cmp::min(self.max_delay, next_delay * self.backoff_factor);
51+
52+
next_delay
53+
}
54+
55+
fn change_base_delay(&mut self, base_delay: Duration) {
56+
self.base_delay = base_delay;
57+
self.next_delay = self.base_delay;
58+
}
59+
60+
fn reset(&mut self, current_time: Instant) {
61+
// While the external application has indicated success, we don't actually want to reset the
62+
// retry policy just yet. Instead, we want to record the time it was successful. Then when
63+
// we calculate the next delay, we can reset the strategy ONLY when it has been at least
64+
// DEFAULT_RESET_RETRY_INTERVAL seconds.
65+
self.good_since = Some(current_time);
66+
}
67+
}
68+
69+
#[cfg(test)]
70+
mod tests {
71+
use std::ops::Add;
72+
use std::time::{Duration, Instant};
73+
74+
use crate::retry::{BackoffRetry, RetryStrategy};
75+
76+
#[test]
77+
fn test_fixed_retry() {
78+
let base = Duration::from_secs(10);
79+
let mut retry = BackoffRetry::new(base, Duration::from_secs(30), 1);
80+
let start = Instant::now() - Duration::from_secs(60);
81+
82+
assert_eq!(retry.next_delay(start), base);
83+
assert_eq!(retry.next_delay(start.add(Duration::from_secs(1))), base);
84+
assert_eq!(retry.next_delay(start.add(Duration::from_secs(2))), base);
85+
}
86+
87+
#[test]
88+
fn test_able_to_reset_base_delay() {
89+
let base = Duration::from_secs(10);
90+
let mut retry = BackoffRetry::new(base, Duration::from_secs(30), 1);
91+
let start = Instant::now();
92+
93+
assert_eq!(retry.next_delay(start), base);
94+
assert_eq!(retry.next_delay(start.add(Duration::from_secs(1))), base);
95+
96+
let base = Duration::from_secs(3);
97+
retry.change_base_delay(base);
98+
assert_eq!(retry.next_delay(start.add(Duration::from_secs(2))), base);
99+
}
100+
101+
#[test]
102+
fn test_with_backoff() {
103+
let base = Duration::from_secs(10);
104+
let max = Duration::from_secs(60);
105+
let mut retry = BackoffRetry::new(base, max, 2);
106+
let start = Instant::now() - Duration::from_secs(60);
107+
108+
assert_eq!(retry.next_delay(start), base);
109+
assert_eq!(
110+
retry.next_delay(start.add(Duration::from_secs(1))),
111+
base * 2
112+
);
113+
assert_eq!(
114+
retry.next_delay(start.add(Duration::from_secs(2))),
115+
base * 4
116+
);
117+
assert_eq!(retry.next_delay(start.add(Duration::from_secs(3))), max);
118+
}
119+
120+
#[test]
121+
fn test_retry_holds_at_max() {
122+
let base = Duration::from_secs(20);
123+
let max = Duration::from_secs(30);
124+
125+
let mut retry = BackoffRetry::new(base, max, 2);
126+
let start = Instant::now();
127+
retry.reset(start);
128+
129+
let time = start.add(Duration::from_secs(20));
130+
let delay = retry.next_delay(time);
131+
assert_eq!(delay, base);
132+
133+
let time = time.add(Duration::from_secs(20));
134+
let delay = retry.next_delay(time);
135+
assert_eq!(delay, max);
136+
137+
let time = time.add(Duration::from_secs(20));
138+
let delay = retry.next_delay(time);
139+
assert_eq!(delay, max);
140+
}
141+
142+
#[test]
143+
fn test_reset_interval() {
144+
let base = Duration::from_secs(10);
145+
let max = Duration::from_secs(60);
146+
let reset_interval = Duration::from_secs(45);
147+
148+
// Prepare a retry strategy that has succeeded at a specific point.
149+
let mut retry = BackoffRetry::new(base, max, 2);
150+
retry.reset_interval = reset_interval;
151+
let start = Instant::now() - Duration::from_secs(60);
152+
retry.reset(start);
153+
154+
// Verify that calculating the next delay returns as expected
155+
let time = start.add(Duration::from_secs(1));
156+
let delay = retry.next_delay(time);
157+
assert_eq!(delay, base);
158+
159+
// Verify resetting the last known good time doesn't change the retry policy since it hasn't
160+
// exceeded the retry interval.
161+
let time = time.add(delay);
162+
retry.reset(time);
163+
164+
let time = time.add(Duration::from_secs(10));
165+
let delay = retry.next_delay(time);
166+
assert_eq!(delay, base * 2);
167+
168+
// And finally check that if we exceed the reset interval, the retry strategy will default
169+
// back to base.
170+
let time = time.add(delay);
171+
retry.reset(time);
172+
173+
let time = time.add(reset_interval);
174+
let delay = retry.next_delay(time);
175+
assert_eq!(delay, base);
176+
}
177+
}

0 commit comments

Comments
 (0)