Skip to content

Commit 2a48044

Browse files
Avoid keeping multiple log archives in memory
1 parent bf29e25 commit 2a48044

File tree

3 files changed

+153
-15
lines changed

3 files changed

+153
-15
lines changed

src/report/archives.rs

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,89 @@ pub struct Archive {
1414
path: String,
1515
}
1616

17+
fn write_all_archive<DB: ReadResults, W: ReportWriter>(
18+
db: &DB,
19+
ex: &Experiment,
20+
crates: &[Crate],
21+
dest: &W,
22+
config: &Config,
23+
) -> Fallible<Archive> {
24+
for i in 1..=RETRIES {
25+
let mut all = TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()));
26+
for krate in crates {
27+
if config.should_skip(krate) {
28+
continue;
29+
}
30+
31+
let res1 = db.load_test_result(ex, &ex.toolchains[0], krate)?;
32+
let res2 = db.load_test_result(ex, &ex.toolchains[1], krate)?;
33+
let comparison = compare(config, krate, res1.as_ref(), res2.as_ref());
34+
35+
for tc in &ex.toolchains {
36+
let log = db
37+
.load_log(ex, tc, krate)
38+
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
39+
.with_context(|_| format!("failed to read log of {} on {}", krate, tc));
40+
41+
let log_bytes: EncodedLog = match log {
42+
Ok(l) => l,
43+
Err(e) => {
44+
crate::utils::report_failure(&e);
45+
continue;
46+
}
47+
};
48+
49+
let log_bytes = log_bytes.to_plain()?;
50+
let log_bytes = log_bytes.as_slice();
51+
52+
let path = format!(
53+
"{}/{}/{}.txt",
54+
comparison,
55+
krate.id(),
56+
tc.to_path_component(),
57+
);
58+
59+
let mut header = TarHeader::new_gnu();
60+
header.set_size(log_bytes.len() as u64);
61+
header.set_mode(0o644);
62+
header.set_cksum();
63+
64+
all.append_data(&mut header, &path, log_bytes)?;
65+
}
66+
}
67+
68+
let data = all.into_inner()?.finish()?;
69+
let len = data.len();
70+
match dest.write_bytes_once(
71+
"logs-archives/all.tar.gz",
72+
data,
73+
&"application/gzip".parse().unwrap(),
74+
EncodingType::Plain,
75+
) {
76+
Ok(()) => break,
77+
Err(e) => {
78+
if i == RETRIES {
79+
return Err(e);
80+
} else {
81+
std::thread::sleep(std::time::Duration::from_secs(2));
82+
warn!(
83+
"retry ({}/{}) writing logs-archives/all.tar.gz ({} bytes)",
84+
i, RETRIES, len,
85+
);
86+
continue;
87+
}
88+
}
89+
}
90+
}
91+
92+
Ok(Archive {
93+
name: "All the crates".to_string(),
94+
path: "logs-archives/all.tar.gz".to_string(),
95+
})
96+
}
97+
98+
/// Maximum number of attempts `write_all_archive` makes to write
/// `logs-archives/all.tar.gz` before returning the last write error.
const RETRIES: usize = 4;
99+
17100
pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
18101
db: &DB,
19102
ex: &Experiment,
@@ -22,9 +105,10 @@ pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
22105
config: &Config,
23106
) -> Fallible<Vec<Archive>> {
24107
let mut archives = Vec::new();
25-
let mut all = TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()));
26108
let mut by_comparison = IndexMap::new();
27109

110+
archives.push(write_all_archive(db, ex, crates, dest, config)?);
111+
28112
for krate in crates {
29113
if config.should_skip(krate) {
30114
continue;
@@ -63,7 +147,6 @@ pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
63147
header.set_mode(0o644);
64148
header.set_cksum();
65149

66-
all.append_data(&mut header, &path, log_bytes)?;
67150
by_comparison
68151
.entry(comparison)
69152
.or_insert_with(|| {
@@ -73,19 +156,6 @@ pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
73156
}
74157
}
75158

76-
let data = all.into_inner()?.finish()?;
77-
dest.write_bytes(
78-
"logs-archives/all.tar.gz",
79-
data,
80-
&"application/gzip".parse().unwrap(),
81-
EncodingType::Plain,
82-
)?;
83-
84-
archives.push(Archive {
85-
name: "All the crates".to_string(),
86-
path: "logs-archives/all.tar.gz".to_string(),
87-
});
88-
89159
for (comparison, archive) in by_comparison.drain(..) {
90160
let data = archive.into_inner()?.finish()?;
91161
dest.write_bytes(

src/report/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,17 @@ pub trait ReportWriter {
512512
mime: &Mime,
513513
encoding_type: EncodingType,
514514
) -> Fallible<()>;
515+
// Writes the bytes in `b` to `path`, tagged with the given MIME type and
// encoding type.
//
// This doesn't retry writing -- it lets the caller handle failures.
516+
//
517+
// This is done to avoid copying the bytes buffer, since rusoto (at least
518+
// currently) requires ownership of it: a caller that wants to retry has to
// produce a fresh buffer for each attempt.
519+
fn write_bytes_once<P: AsRef<Path>>(
520+
&self,
521+
path: P,
522+
b: Vec<u8>,
523+
mime: &Mime,
524+
encoding_type: EncodingType,
525+
) -> Fallible<()>;
515526
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()>;
516527
fn copy<P: AsRef<Path>, R: Read>(&self, r: &mut R, path: P, mime: &Mime) -> Fallible<()>;
517528
}
@@ -544,6 +555,16 @@ impl ReportWriter for FileWriter {
544555
Ok(())
545556
}
546557

558+
fn write_bytes_once<P: AsRef<Path>>(
559+
&self,
560+
path: P,
561+
b: Vec<u8>,
562+
mime: &Mime,
563+
encoding: EncodingType,
564+
) -> Fallible<()> {
565+
self.write_bytes(path, b, mime, encoding)
566+
}
567+
547568
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, _: &Mime) -> Fallible<()> {
548569
self.create_prefix(path.as_ref())?;
549570
fs::write(&self.0.join(path.as_ref()), s.as_ref().as_bytes())?;
@@ -595,6 +616,19 @@ impl ReportWriter for DummyWriter {
595616
Ok(())
596617
}
597618

619+
fn write_bytes_once<P: AsRef<Path>>(
620+
&self,
621+
path: P,
622+
b: Vec<u8>,
623+
mime: &Mime,
624+
_: EncodingType,
625+
) -> Fallible<()> {
626+
self.results
627+
.borrow_mut()
628+
.insert((path.as_ref().to_path_buf(), mime.clone()), b);
629+
Ok(())
630+
}
631+
598632
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
599633
self.results.borrow_mut().insert(
600634
(path.as_ref().to_path_buf(), mime.clone()),

src/report/s3.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,40 @@ impl ReportWriter for S3Writer {
144144
}
145145
}
146146

147+
fn write_bytes_once<P: AsRef<Path>>(
148+
&self,
149+
path: P,
150+
s: Vec<u8>,
151+
mime: &Mime,
152+
encoding_type: EncodingType,
153+
) -> Fallible<()> {
154+
let req = PutObjectRequest {
155+
acl: Some("public-read".into()),
156+
body: Some(s.into()),
157+
bucket: self.prefix.bucket.clone(),
158+
key: self
159+
.prefix
160+
.prefix
161+
.join(path.as_ref())
162+
.to_string_lossy()
163+
.into(),
164+
content_type: Some(mime.to_string()),
165+
content_encoding: match encoding_type {
166+
EncodingType::Plain => None,
167+
EncodingType::Gzip => Some("gzip".into()),
168+
},
169+
..Default::default()
170+
};
171+
let r = self.client.put_object(req).sync();
172+
if let Err(::rusoto_s3::PutObjectError::Unknown(ref resp)) = r {
173+
error!("S3 request status: {}", resp.status);
174+
error!("S3 request body: {}", String::from_utf8_lossy(&resp.body));
175+
error!("S3 request headers: {:?}", resp.headers);
176+
}
177+
r.with_context(|_| format!("S3 failure to upload {:?}", path.as_ref()))?;
178+
Ok(())
179+
}
180+
147181
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
148182
self.write_bytes(path, s.into_owned().into_bytes(), mime, EncodingType::Plain)
149183
}

0 commit comments

Comments
 (0)