Skip to content

Commit 0b14bb1

Browse files
Upload to S3 in parallel with database writing
1 parent 1854295 commit 0b14bb1

File tree

1 file changed

+100
-83
lines changed

1 file changed

+100
-83
lines changed

collector/src/execute.rs

Lines changed: 100 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,6 @@ use std::time::Duration;
1919
use tempfile::TempDir;
2020
use tokio::runtime::Runtime;
2121

22-
fn to_s3(local: &Path, remote: &Path) -> anyhow::Result<()> {
23-
let start = std::time::Instant::now();
24-
let status = Command::new("aws")
25-
.arg("s3")
26-
.arg("cp")
27-
.arg("--only-show-errors")
28-
.arg(local)
29-
.arg(&format!("s3://rustc-perf/{}", remote.to_str().unwrap()))
30-
.status()
31-
.with_context(|| {
32-
format!(
33-
"upload {:?} to s3://rustc-perf/{}",
34-
local,
35-
remote.to_str().unwrap()
36-
)
37-
})?;
38-
if !status.success() {
39-
anyhow::bail!(
40-
"upload {:?} to s3://rustc-perf/{}: {:?}",
41-
local,
42-
remote.to_str().unwrap(),
43-
status
44-
);
45-
}
46-
log::trace!("uploaded {:?} to S3 in {:?}", local, start.elapsed());
47-
Ok(())
48-
}
49-
5022
fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> anyhow::Result<()> {
5123
let (from, to) = (from.as_ref(), to.as_ref());
5224
if fs::rename(from, to).is_err() {
@@ -527,6 +499,7 @@ pub struct MeasureProcessor<'a> {
527499
krate: &'a BenchmarkName,
528500
conn: &'a mut dyn database::Connection,
529501
cid: database::ArtifactIdNumber,
502+
upload: Option<Upload>,
530503
is_first_collection: bool,
531504
self_profile: bool,
532505
tries: u8,
@@ -546,6 +519,7 @@ impl<'a> MeasureProcessor<'a> {
546519

547520
MeasureProcessor {
548521
rt,
522+
upload: None,
549523
conn,
550524
krate,
551525
cid,
@@ -581,6 +555,36 @@ impl<'a> MeasureProcessor<'a> {
581555
BuildKind::Doc => database::Profile::Doc,
582556
BuildKind::Opt => database::Profile::Opt,
583557
};
558+
559+
if let Some(files) = stats.2 {
560+
if env::var_os("RUSTC_PERF_UPLOAD_TO_S3").is_some() {
561+
// We can afford to have the uploads run concurrently with
562+
// rustc. Generally speaking, they take up almost no CPU time
563+
// (just copying data into the network). Plus, during
564+
// self-profile data timing noise doesn't matter as much. (We'll
565+
// be migrating to instructions soon, hopefully, where the
566+
// upload will cause even less noise). We may also opt at some
567+
// point to defer these uploads entirely to the *end* or
568+
// something like that. For now though this works quite well.
569+
if let Some(u) = self.upload.take() {
570+
u.wait();
571+
}
572+
let prefix = PathBuf::from("self-profile")
573+
.join(self.cid.0.to_string())
574+
.join(self.krate.0.as_str())
575+
.join(profile.to_string())
576+
.join(cache.to_id());
577+
self.upload = Some(Upload::new(prefix, collection, files));
578+
self.rt.block_on(self.conn.record_raw_self_profile(
579+
collection,
580+
self.cid,
581+
self.krate.0.as_str(),
582+
profile,
583+
cache,
584+
));
585+
}
586+
}
587+
584588
let mut buf = FuturesUnordered::new();
585589
for (stat, value) in stats.0.iter() {
586590
buf.push(self.conn.record_statistic(
@@ -617,64 +621,77 @@ impl<'a> MeasureProcessor<'a> {
617621
}
618622
}
619623

620-
if let Some(files) = &stats.2 {
621-
if env::var_os("RUSTC_PERF_UPLOAD_TO_S3").is_some() {
622-
// Files are placed at
623-
// * self-profile/<artifact id>/<krate>/<profile>/<cache>
624-
// /self-profile-<collection-id>.{extension}
625-
let prefix = PathBuf::from("self-profile")
626-
.join(self.cid.0.to_string())
627-
.join(self.krate.0.as_str())
628-
.join(profile.to_string())
629-
.join(cache.to_id());
630-
let tarball = snap::write::FrameEncoder::new(Vec::new());
631-
let mut builder = tar::Builder::new(tarball);
632-
builder.mode(tar::HeaderMode::Deterministic);
633-
634-
let append_file = |builder: &mut tar::Builder<_>,
635-
file: &Path,
636-
name: &str|
637-
-> anyhow::Result<()> {
638-
builder.append_path_with_name(file, name)?;
639-
Ok(())
640-
};
641-
append_file(&mut builder, &files.string_index, "string_index")
642-
.expect("append string index");
643-
append_file(&mut builder, &files.string_data, "string_data")
644-
.expect("append string data");
645-
append_file(&mut builder, &files.events, "events").expect("append events");
646-
647-
let upload = tempfile::NamedTempFile::new()
648-
.context("create temporary file")
649-
.unwrap();
650-
builder.finish().expect("complete tarball");
651-
std::fs::write(
652-
upload.path(),
653-
builder
654-
.into_inner()
655-
.expect("get")
656-
.into_inner()
657-
.expect("snap success"),
658-
)
659-
.expect("wrote tarball");
660-
to_s3(
661-
upload.path(),
662-
&prefix.join(format!("self-profile-{}.tar.sz", collection)),
663-
)
664-
.expect("s3 upload succeeded");
624+
self.rt
625+
.block_on(async move { while let Some(()) = buf.next().await {} });
626+
}
627+
}
665628

666-
self.rt.block_on(self.conn.record_raw_self_profile(
667-
collection,
668-
self.cid,
669-
self.krate.0.as_str(),
670-
profile,
671-
cache,
672-
));
673-
}
629+
struct Upload(std::process::Child, tempfile::NamedTempFile);
630+
631+
impl Upload {
632+
fn new(prefix: PathBuf, collection: database::CollectionId, files: SelfProfileFiles) -> Upload {
633+
// Files are placed at
634+
// * self-profile/<artifact id>/<krate>/<profile>/<cache>
635+
// /self-profile-<collection-id>.{extension}
636+
let tarball = snap::write::FrameEncoder::new(Vec::new());
637+
let mut builder = tar::Builder::new(tarball);
638+
builder.mode(tar::HeaderMode::Deterministic);
639+
640+
let append_file =
641+
|builder: &mut tar::Builder<_>, file: &Path, name: &str| -> anyhow::Result<()> {
642+
builder.append_path_with_name(file, name)?;
643+
Ok(())
644+
};
645+
append_file(
646+
&mut builder,
647+
&files.string_index,
648+
"self-profile.string_index",
649+
)
650+
.expect("append string index");
651+
append_file(&mut builder, &files.string_data, "self-profile.string_data")
652+
.expect("append string data");
653+
append_file(&mut builder, &files.events, "self-profile.events").expect("append events");
654+
655+
let upload = tempfile::NamedTempFile::new()
656+
.context("create temporary file")
657+
.unwrap();
658+
builder.finish().expect("complete tarball");
659+
std::fs::write(
660+
upload.path(),
661+
builder
662+
.into_inner()
663+
.expect("get")
664+
.into_inner()
665+
.expect("snap success"),
666+
)
667+
.expect("wrote tarball");
668+
669+
let child = Command::new("aws")
670+
.arg("s3")
671+
.arg("cp")
672+
.arg("--only-show-errors")
673+
.arg(upload.path())
674+
.arg(&format!(
675+
"s3://rustc-perf/{}",
676+
&prefix
677+
.join(format!("self-profile-{}.tar.sz", collection))
678+
.to_str()
679+
.unwrap()
680+
))
681+
.spawn()
682+
.expect("spawn aws");
683+
684+
Upload(child, upload)
685+
}
686+
687+
fn wait(mut self) {
688+
let start = std::time::Instant::now();
689+
let status = self.0.wait().expect("waiting for child");
690+
if !status.success() {
691+
panic!("S3 upload failed: {:?}", status);
674692
}
675693

676-
self.rt
677-
.block_on(async move { while let Some(()) = buf.next().await {} });
694+
log::trace!("uploaded to S3, additional wait: {:?}", start.elapsed());
678695
}
679696
}
680697

0 commit comments

Comments
 (0)