Skip to content

Commit 19e271d

Browse files
committed
Do not load the entire artifact in memory when uploading (#618)
1 parent bc16255 commit 19e271d

File tree

4 files changed

+425
-37
lines changed

4 files changed

+425
-37
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ tar = "0.4.40"
3737
tempfile = "3.10.0"
3838
text-stub-library = "0.9.0"
3939
tokio = "1.43.1"
40+
tokio-util = "0.7.13"
4041
url = "2.5.0"
4142
version-compare = "0.1.1"
4243
zip = "0.6.6"

src/github.rs

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ use {
1717
},
1818
rayon::prelude::*,
1919
reqwest::{Client, StatusCode},
20-
reqwest_middleware::{self, ClientWithMiddleware},
2120
reqwest_retry::{
22-
default_on_request_failure, policies::ExponentialBackoff, RetryTransientMiddleware,
23-
Retryable, RetryableStrategy,
21+
default_on_request_failure, policies::ExponentialBackoff, RetryPolicy, Retryable,
22+
RetryableStrategy,
2423
},
2524
sha2::{Digest, Sha256},
2625
std::{
2726
collections::{BTreeMap, BTreeSet, HashMap},
2827
io::Read,
2928
path::PathBuf,
3029
str::FromStr,
30+
time::{Duration, SystemTime},
3131
},
3232
url::Url,
3333
zip::ZipArchive,
@@ -65,12 +65,19 @@ async fn fetch_artifact(
6565
Ok(res)
6666
}
6767

68+
enum UploadSource {
69+
Filename(PathBuf),
70+
Data(Bytes),
71+
}
72+
6873
async fn upload_release_artifact(
69-
client: &ClientWithMiddleware,
74+
client: &Client,
75+
retry_policy: &impl RetryPolicy,
76+
retryable_strategy: &impl RetryableStrategy,
7077
auth_token: String,
7178
release: &Release,
7279
filename: String,
73-
data: Bytes,
80+
body: UploadSource,
7481
dry_run: bool,
7582
) -> Result<()> {
7683
if release.assets.iter().any(|asset| asset.name == filename) {
@@ -93,17 +100,52 @@ async fn upload_release_artifact(
93100
return Ok(());
94101
}
95102

96-
// Octocrab doesn't yet support release artifact upload. And the low-level HTTP API
97-
// forces the use of strings on us. So we have to make our own HTTP client.
98-
99-
let response = client
100-
.put(url)
101-
.header("Authorization", format!("Bearer {auth_token}"))
102-
.header("Content-Length", data.len())
103-
.header("Content-Type", "application/x-tar")
104-
.body(data)
105-
.send()
106-
.await?;
103+
// Octocrab's high-level API for uploading release artifacts doesn't yet support streaming
104+
// bodies, and their low-level API isn't more helpful than using our own HTTP client.
105+
//
106+
// Because we are streaming the body, we can't use the standard retry middleware for reqwest
107+
// (see e.g. https://github.com/seanmonstar/reqwest/issues/2416), so we have to recreate the
108+
// request on each retry and handle the retry logic ourself. This logic is inspired by
109+
// uv/crates/uv-publish/src/lib.rs (which has the same problem), which in turn is inspired by
110+
// reqwest-middleware/reqwest-retry/src/middleware.rs.
111+
//
112+
// (While Octocrab's API would work fine for the non-streaming case, we just use this function
113+
// for both cases so that we can make a homogeneous Vec<impl Future> later in the file.)
114+
115+
let mut n_past_retries = 0;
116+
let start_time = SystemTime::now();
117+
let response = loop {
118+
let request = client
119+
.put(url.clone())
120+
.timeout(Duration::from_secs(60))
121+
.header("Authorization", format!("Bearer {auth_token}"))
122+
.header("Content-Type", "application/octet-stream");
123+
let request = match body {
124+
UploadSource::Filename(ref path) => {
125+
let file = tokio::fs::File::open(&path).await?;
126+
let len = file.metadata().await?.len();
127+
request.header("Content-Length", len).body(file)
128+
}
129+
UploadSource::Data(ref bytes) => request
130+
.header("Content-Length", bytes.len())
131+
.body(bytes.clone()),
132+
};
133+
let result = request.send().await.map_err(|e| e.into());
134+
135+
if retryable_strategy.handle(&result) == Some(Retryable::Transient) {
136+
let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
137+
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
138+
println!("retrying upload to {url} after {result:?}");
139+
let duration = execute_after
140+
.duration_since(SystemTime::now())
141+
.unwrap_or_else(|_| Duration::default());
142+
tokio::time::sleep(duration).await;
143+
n_past_retries += 1;
144+
continue;
145+
}
146+
}
147+
break result?;
148+
};
107149

108150
if !response.status().is_success() {
109151
return Err(anyhow!("HTTP {}", response.status()));
@@ -215,10 +257,8 @@ pub async fn command_fetch_release_distributions(args: &ArgMatches) -> Result<()
215257
.await?;
216258

217259
for artifact in artifacts {
218-
if matches!(
219-
artifact.name.as_str(),
220-
"pythonbuild" | "toolchain"
221-
) || artifact.name.contains("install-only")
260+
if matches!(artifact.name.as_str(), "pythonbuild" | "toolchain")
261+
|| artifact.name.contains("install-only")
222262
{
223263
continue;
224264
}
@@ -475,12 +515,7 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
475515
let mut digests = BTreeMap::new();
476516

477517
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
478-
let raw_client = reqwest_middleware::ClientBuilder::new(Client::new())
479-
.with(RetryTransientMiddleware::new_with_policy_and_strategy(
480-
retry_policy,
481-
GitHubUploadRetryStrategy,
482-
))
483-
.build();
518+
let raw_client = Client::new();
484519

485520
{
486521
let mut fs = vec![];
@@ -490,23 +525,31 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
490525
continue;
491526
}
492527

493-
let file_data = Bytes::copy_from_slice(&std::fs::read(dist_dir.join(&source))?);
494-
495-
let mut digest = Sha256::new();
496-
digest.update(&file_data);
497-
498-
let digest = hex::encode(digest.finalize());
499-
500-
digests.insert(dest.clone(), digest.clone());
501-
528+
let local_filename = dist_dir.join(&source);
502529
fs.push(upload_release_artifact(
503530
&raw_client,
531+
&retry_policy,
532+
&GitHubUploadRetryStrategy,
504533
token.clone(),
505534
&release,
506535
dest.clone(),
507-
file_data,
536+
UploadSource::Filename(local_filename.clone()),
508537
dry_run,
509538
));
539+
540+
// reqwest wants to take ownership of the body, so it's hard for us to do anything
541+
// clever with reading the file once and calculating the sha256sum while we read.
542+
// So we open and read the file again.
543+
let digest = {
544+
let file = tokio::fs::File::open(local_filename).await?;
545+
let mut stream = tokio_util::io::ReaderStream::with_capacity(file, 1048576);
546+
let mut hasher = Sha256::new();
547+
while let Some(chunk) = stream.next().await {
548+
hasher.update(&chunk?);
549+
}
550+
hex::encode(hasher.finalize())
551+
};
552+
digests.insert(dest.clone(), digest.clone());
510553
}
511554

512555
let mut buffered = futures::stream::iter(fs).buffer_unordered(16);
@@ -526,10 +569,12 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
526569

527570
upload_release_artifact(
528571
&raw_client,
572+
&retry_policy,
573+
&GitHubUploadRetryStrategy,
529574
token.clone(),
530575
&release,
531576
"SHA256SUMS".to_string(),
532-
Bytes::copy_from_slice(shasums.as_bytes()),
577+
UploadSource::Data(Bytes::copy_from_slice(shasums.as_bytes())),
533578
dry_run,
534579
)
535580
.await?;

0 commit comments

Comments
 (0)