Skip to content

Commit 0e474cf

Browse files
committed
Auto merge of #11881 - ehuss:http-retry, r=epage
Add delays to network retries. This adds a delay to network retries to help guard against short-lived transient errors. The overview of how this works is: Requests that fail are placed into a queue with a timestamp of when they should be retried. The download code then checks for any requests that need to be retried, and re-injects them back into curl. Care must be taken in the download loops to track which requests are currently in flight *plus* which requests are waiting to be retried. This also adds jitter to the retry times so that multiple failed requests don't all flood the server when they are retried. This is a very primitive form of jitter which suffers from clumping, but I think it should be fine. The max number of retries is raised to 3 in order to bring the total retry time to around 10 seconds. This is intended to address CloudFront's default negative TTL of 10 seconds. The retry schedule looks like 0.5 seconds ± 1 second, then 3.5 seconds, then 6.5 seconds, for a total of 10.5 to 11.5 seconds. If the user configures additional retries, each attempt afterwards has a max delay of 10 seconds. The retry times are currently not user-configurable. This could potentially be added in the future, but I don't think it is particularly necessary for now. There is an unfortunate amount of duplication between the crate download and HTTP index code. I think in the near future we should work on consolidating that code. That might be challenging, since their use patterns are different, but I think it is feasible.
2 parents e589a5f + 4fdea65 commit 0e474cf

File tree

12 files changed

+585
-136
lines changed

12 files changed

+585
-136
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ os_info = "3.5.0"
5858
pasetors = { version = "0.6.4", features = ["v3", "paserk", "std", "serde"] }
5959
pathdiff = "0.2"
6060
pretty_env_logger = { version = "0.4", optional = true }
61+
rand = "0.8.5"
6162
rustfix = "0.6.0"
6263
semver = { version = "1.0.3", features = ["serde"] }
6364
serde = { version = "1.0.123", features = ["derive"] }

crates/cargo-test-support/src/registry.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub struct RegistryBuilder {
9797
/// Write the registry in configuration.
9898
configure_registry: bool,
9999
/// API responders.
100-
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
100+
custom_responders: HashMap<String, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
101101
/// If nonzero, the git index update to be delayed by the given number of seconds.
102102
delayed_index_update: usize,
103103
}
@@ -167,10 +167,11 @@ impl RegistryBuilder {
167167
#[must_use]
168168
pub fn add_responder<R: 'static + Send + Fn(&Request, &HttpServer) -> Response>(
169169
mut self,
170-
url: &'static str,
170+
url: impl Into<String>,
171171
responder: R,
172172
) -> Self {
173-
self.custom_responders.insert(url, Box::new(responder));
173+
self.custom_responders
174+
.insert(url.into(), Box::new(responder));
174175
self
175176
}
176177

@@ -601,7 +602,7 @@ pub struct HttpServer {
601602
addr: SocketAddr,
602603
token: Token,
603604
auth_required: bool,
604-
custom_responders: HashMap<&'static str, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
605+
custom_responders: HashMap<String, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
605606
delayed_index_update: usize,
606607
}
607608

@@ -621,10 +622,7 @@ impl HttpServer {
621622
api_path: PathBuf,
622623
token: Token,
623624
auth_required: bool,
624-
api_responders: HashMap<
625-
&'static str,
626-
Box<dyn Send + Fn(&Request, &HttpServer) -> Response>,
627-
>,
625+
api_responders: HashMap<String, Box<dyn Send + Fn(&Request, &HttpServer) -> Response>>,
628626
delayed_index_update: usize,
629627
) -> HttpServerHandle {
630628
let listener = TcpListener::bind("127.0.0.1:0").unwrap();

src/cargo/core/package.rs

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ use crate::ops;
2828
use crate::util::config::PackageCacheLock;
2929
use crate::util::errors::{CargoResult, HttpNotSuccessful};
3030
use crate::util::interning::InternedString;
31-
use crate::util::network::Retry;
31+
use crate::util::network::retry::{Retry, RetryResult};
32+
use crate::util::network::sleep::SleepTracker;
3233
use crate::util::{self, internal, Config, Progress, ProgressStyle};
3334

3435
pub const MANIFEST_PREAMBLE: &str = "\
@@ -319,6 +320,8 @@ pub struct Downloads<'a, 'cfg> {
319320
/// Set of packages currently being downloaded. This should stay in sync
320321
/// with `pending`.
321322
pending_ids: HashSet<PackageId>,
323+
/// Downloads that have failed and are waiting to retry again later.
324+
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
322325
/// The final result of each download. A pair `(token, result)`. This is a
323326
/// temporary holding area, needed because curl can report multiple
324327
/// downloads at once, but the main loop (`wait`) is written to only
@@ -442,6 +445,7 @@ impl<'cfg> PackageSet<'cfg> {
442445
next: 0,
443446
pending: HashMap::new(),
444447
pending_ids: HashSet::new(),
448+
sleeping: SleepTracker::new(),
445449
results: Vec::new(),
446450
progress: RefCell::new(Some(Progress::with_style(
447451
"Downloading",
@@ -800,7 +804,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
800804

801805
/// Returns the number of crates that are still downloading.
802806
pub fn remaining(&self) -> usize {
803-
self.pending.len()
807+
self.pending.len() + self.sleeping.len()
804808
}
805809

806810
/// Blocks the current thread waiting for a package to finish downloading.
@@ -831,51 +835,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
831835
let ret = {
832836
let timed_out = &dl.timed_out;
833837
let url = &dl.url;
834-
dl.retry
835-
.r#try(|| {
836-
if let Err(e) = result {
837-
// If this error is "aborted by callback" then that's
838-
// probably because our progress callback aborted due to
839-
// a timeout. We'll find out by looking at the
840-
// `timed_out` field, looking for a descriptive message.
841-
// If one is found we switch the error code (to ensure
842-
// it's flagged as spurious) and then attach our extra
843-
// information to the error.
844-
if !e.is_aborted_by_callback() {
845-
return Err(e.into());
846-
}
838+
dl.retry.r#try(|| {
839+
if let Err(e) = result {
840+
// If this error is "aborted by callback" then that's
841+
// probably because our progress callback aborted due to
842+
// a timeout. We'll find out by looking at the
843+
// `timed_out` field, looking for a descriptive message.
844+
// If one is found we switch the error code (to ensure
845+
// it's flagged as spurious) and then attach our extra
846+
// information to the error.
847+
if !e.is_aborted_by_callback() {
848+
return Err(e.into());
849+
}
847850

848-
return Err(match timed_out.replace(None) {
849-
Some(msg) => {
850-
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
851-
let mut err = curl::Error::new(code);
852-
err.set_extra(msg);
853-
err
854-
}
855-
None => e,
851+
return Err(match timed_out.replace(None) {
852+
Some(msg) => {
853+
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
854+
let mut err = curl::Error::new(code);
855+
err.set_extra(msg);
856+
err
856857
}
857-
.into());
858+
None => e,
858859
}
860+
.into());
861+
}
859862

860-
let code = handle.response_code()?;
861-
if code != 200 && code != 0 {
862-
let url = handle.effective_url()?.unwrap_or(url);
863-
return Err(HttpNotSuccessful {
864-
code,
865-
url: url.to_string(),
866-
body: data,
867-
}
868-
.into());
863+
let code = handle.response_code()?;
864+
if code != 200 && code != 0 {
865+
let url = handle.effective_url()?.unwrap_or(url);
866+
return Err(HttpNotSuccessful {
867+
code,
868+
url: url.to_string(),
869+
body: data,
869870
}
870-
Ok(data)
871-
})
872-
.with_context(|| format!("failed to download from `{}`", dl.url))?
871+
.into());
872+
}
873+
Ok(data)
874+
})
873875
};
874876
match ret {
875-
Some(data) => break (dl, data),
876-
None => {
877-
self.pending_ids.insert(dl.id);
878-
self.enqueue(dl, handle)?
877+
RetryResult::Success(data) => break (dl, data),
878+
RetryResult::Err(e) => {
879+
return Err(e.context(format!("failed to download from `{}`", dl.url)))
880+
}
881+
RetryResult::Retry(sleep) => {
882+
debug!("download retry {} for {sleep}ms", dl.url);
883+
self.sleeping.push(sleep, (dl, handle));
879884
}
880885
}
881886
};
@@ -963,6 +968,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
963968
// actually block waiting for I/O to happen, which we achieve with the
964969
// `wait` method on `multi`.
965970
loop {
971+
self.add_sleepers()?;
966972
let n = tls::set(self, || {
967973
self.set
968974
.multi
@@ -985,17 +991,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
985991
if let Some(pair) = results.pop() {
986992
break Ok(pair);
987993
}
988-
assert!(!self.pending.is_empty());
989-
let min_timeout = Duration::new(1, 0);
990-
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
991-
let timeout = timeout.min(min_timeout);
992-
self.set
993-
.multi
994-
.wait(&mut [], timeout)
995-
.with_context(|| "failed to wait on curl `Multi`")?;
994+
assert_ne!(self.remaining(), 0);
995+
if self.pending.is_empty() {
996+
let delay = self.sleeping.time_to_next().unwrap();
997+
debug!("sleeping main thread for {delay:?}");
998+
std::thread::sleep(delay);
999+
} else {
1000+
let min_timeout = Duration::new(1, 0);
1001+
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
1002+
let timeout = timeout.min(min_timeout);
1003+
self.set
1004+
.multi
1005+
.wait(&mut [], timeout)
1006+
.with_context(|| "failed to wait on curl `Multi`")?;
1007+
}
9961008
}
9971009
}
9981010

1011+
fn add_sleepers(&mut self) -> CargoResult<()> {
1012+
for (dl, handle) in self.sleeping.to_retry() {
1013+
self.pending_ids.insert(dl.id);
1014+
self.enqueue(dl, handle)?;
1015+
}
1016+
Ok(())
1017+
}
1018+
9991019
fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
10001020
let dl = &self.pending[&token].0;
10011021
dl.total.set(total);
@@ -1061,7 +1081,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
10611081
return Ok(());
10621082
}
10631083
}
1064-
let pending = self.pending.len();
1084+
let pending = self.remaining();
10651085
let mut msg = if pending == 1 {
10661086
format!("{} crate", pending)
10671087
} else {

src/cargo/sources/git/oxide.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub fn with_retry_and_progress(
2929
) -> CargoResult<()> {
3030
std::thread::scope(|s| {
3131
let mut progress_bar = Progress::new("Fetch", config);
32-
network::with_retry(config, || {
32+
network::retry::with_retry(config, || {
3333
let progress_root: Arc<gix::progress::tree::Root> =
3434
gix::progress::tree::root::Options {
3535
initial_capacity: 10,

src/cargo/sources/git/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,7 @@ pub fn with_fetch_options(
739739
let ssh_config = config.net_config()?.ssh.as_ref();
740740
let config_known_hosts = ssh_config.and_then(|ssh| ssh.known_hosts.as_ref());
741741
let diagnostic_home_config = config.diagnostic_home_config();
742-
network::with_retry(config, || {
742+
network::retry::with_retry(config, || {
743743
with_authentication(config, url, git_config, |f| {
744744
let port = Url::parse(url).ok().and_then(|url| url.port());
745745
let mut last_update = Instant::now();

0 commit comments

Comments
 (0)