Skip to content

Commit 3ea629d

Browse files
committed
Auto merge of #613 - Mark-Simulacrum:opt, r=Mark-Simulacrum
Reduce memory pressure when writing out log archives. The S3 archive writing was previously forced to keep two copies of the archive in memory; this commit reduces that to a single copy. We may wish to incrementally write out the report in the future, but that will take a good bit more work.
2 parents 703374f + 9a85380 commit 3ea629d

File tree

3 files changed

+200
-58
lines changed

3 files changed

+200
-58
lines changed

src/report/archives.rs

Lines changed: 132 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::config::Config;
22
use crate::crates::Crate;
33
use crate::experiments::Experiment;
44
use crate::prelude::*;
5-
use crate::report::{compare, ReportWriter};
5+
use crate::report::{compare, Comparison, ReportWriter};
66
use crate::results::{EncodedLog, EncodingType, ReadResults};
77
use flate2::{write::GzEncoder, Compression};
88
use indexmap::IndexMap;
@@ -14,77 +14,151 @@ pub struct Archive {
1414
path: String,
1515
}
1616

17-
pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
17+
struct LogEntry {
18+
path: String,
19+
comparison: Comparison,
20+
log_bytes: Vec<u8>,
21+
}
22+
23+
impl LogEntry {
24+
fn header(&self) -> TarHeader {
25+
let mut header = TarHeader::new_gnu();
26+
header.set_size(self.log_bytes.len() as u64);
27+
header.set_mode(0o644);
28+
header.set_cksum();
29+
header
30+
}
31+
}
32+
33+
fn iterate<'a, DB: ReadResults + 'a>(
34+
db: &'a DB,
35+
ex: &'a Experiment,
36+
crates: &'a [Crate],
37+
config: &'a Config,
38+
) -> impl Iterator<Item = Fallible<LogEntry>> + 'a {
39+
let mut iter = crates
40+
.iter()
41+
.filter(move |krate| !config.should_skip(krate))
42+
.map(move |krate| -> Fallible<Vec<LogEntry>> {
43+
let res1 = db.load_test_result(ex, &ex.toolchains[0], krate)?;
44+
let res2 = db.load_test_result(ex, &ex.toolchains[1], krate)?;
45+
let comparison = compare(config, krate, res1.as_ref(), res2.as_ref());
46+
47+
ex.toolchains
48+
.iter()
49+
.filter_map(move |tc| {
50+
let log = db
51+
.load_log(ex, tc, krate)
52+
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
53+
.with_context(|_| format!("failed to read log of {} on {}", krate, tc));
54+
55+
let log_bytes: EncodedLog = match log {
56+
Ok(l) => l,
57+
Err(e) => {
58+
crate::utils::report_failure(&e);
59+
return None;
60+
}
61+
};
62+
63+
let log_bytes = match log_bytes.to_plain() {
64+
Ok(it) => it,
65+
Err(err) => return Some(Err(err)),
66+
};
67+
68+
let path = format!(
69+
"{}/{}/{}.txt",
70+
comparison,
71+
krate.id(),
72+
tc.to_path_component(),
73+
);
74+
Some(Ok(LogEntry {
75+
path,
76+
comparison,
77+
log_bytes,
78+
}))
79+
})
80+
.collect()
81+
});
82+
83+
let mut in_progress = vec![].into_iter();
84+
std::iter::from_fn(move || loop {
85+
if let Some(next) = in_progress.next() {
86+
return Some(Ok(next));
87+
}
88+
match iter.next()? {
89+
Ok(list) => in_progress = list.into_iter(),
90+
Err(err) => return Some(Err(err)),
91+
}
92+
})
93+
}
94+
95+
fn write_all_archive<DB: ReadResults, W: ReportWriter>(
1896
db: &DB,
1997
ex: &Experiment,
2098
crates: &[Crate],
2199
dest: &W,
22100
config: &Config,
23-
) -> Fallible<Vec<Archive>> {
24-
let mut archives = Vec::new();
25-
let mut all = TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()));
26-
let mut by_comparison = IndexMap::new();
27-
28-
for krate in crates {
29-
if config.should_skip(krate) {
30-
continue;
101+
) -> Fallible<Archive> {
102+
for i in 1..=RETRIES {
103+
let mut all = TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()));
104+
for entry in iterate(db, ex, crates, config) {
105+
let entry = entry?;
106+
let mut header = entry.header();
107+
all.append_data(&mut header, &entry.path, &entry.log_bytes[..])?;
31108
}
32109

33-
let res1 = db.load_test_result(ex, &ex.toolchains[0], krate)?;
34-
let res2 = db.load_test_result(ex, &ex.toolchains[1], krate)?;
35-
let comparison = compare(config, krate, res1.as_ref(), res2.as_ref());
36-
37-
for tc in &ex.toolchains {
38-
let log = db
39-
.load_log(ex, tc, krate)
40-
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
41-
.with_context(|_| format!("failed to read log of {} on {}", krate, tc));
42-
43-
let log_bytes: EncodedLog = match log {
44-
Ok(l) => l,
45-
Err(e) => {
46-
crate::utils::report_failure(&e);
110+
let data = all.into_inner()?.finish()?;
111+
let len = data.len();
112+
match dest.write_bytes_once(
113+
"logs-archives/all.tar.gz",
114+
data,
115+
&"application/gzip".parse().unwrap(),
116+
EncodingType::Plain,
117+
) {
118+
Ok(()) => break,
119+
Err(e) => {
120+
if i == RETRIES {
121+
return Err(e);
122+
} else {
123+
std::thread::sleep(std::time::Duration::from_secs(2));
124+
warn!(
125+
"retry ({}/{}) writing logs-archives/all.tar.gz ({} bytes)",
126+
i, RETRIES, len,
127+
);
47128
continue;
48129
}
49-
};
50-
51-
let log_bytes = log_bytes.to_plain()?;
52-
let log_bytes = log_bytes.as_slice();
53-
54-
let path = format!(
55-
"{}/{}/{}.txt",
56-
comparison,
57-
krate.id(),
58-
tc.to_path_component(),
59-
);
60-
61-
let mut header = TarHeader::new_gnu();
62-
header.set_size(log_bytes.len() as u64);
63-
header.set_mode(0o644);
64-
header.set_cksum();
65-
66-
all.append_data(&mut header, &path, log_bytes)?;
67-
by_comparison
68-
.entry(comparison)
69-
.or_insert_with(|| {
70-
TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()))
71-
})
72-
.append_data(&mut header, &path, log_bytes)?;
130+
}
73131
}
74132
}
75133

76-
let data = all.into_inner()?.finish()?;
77-
dest.write_bytes(
78-
"logs-archives/all.tar.gz",
79-
data,
80-
&"application/gzip".parse().unwrap(),
81-
EncodingType::Plain,
82-
)?;
83-
84-
archives.push(Archive {
134+
Ok(Archive {
85135
name: "All the crates".to_string(),
86136
path: "logs-archives/all.tar.gz".to_string(),
87-
});
137+
})
138+
}
139+
140+
const RETRIES: usize = 4;
141+
142+
pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
143+
db: &DB,
144+
ex: &Experiment,
145+
crates: &[Crate],
146+
dest: &W,
147+
config: &Config,
148+
) -> Fallible<Vec<Archive>> {
149+
let mut archives = Vec::new();
150+
let mut by_comparison = IndexMap::new();
151+
152+
archives.push(write_all_archive(db, ex, crates, dest, config)?);
153+
154+
for entry in iterate(db, ex, crates, config) {
155+
let entry = entry?;
156+
157+
by_comparison
158+
.entry(entry.comparison)
159+
.or_insert_with(|| TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default())))
160+
.append_data(&mut entry.header(), &entry.path, &entry.log_bytes[..])?;
161+
}
88162

89163
for (comparison, archive) in by_comparison.drain(..) {
90164
let data = archive.into_inner()?.finish()?;

src/report/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,17 @@ pub trait ReportWriter {
512512
mime: &Mime,
513513
encoding_type: EncodingType,
514514
) -> Fallible<()>;
515+
// This doesn't retry writing -- lets the caller handle failures.
516+
//
517+
// This is done to avoid copying the bytes buffer, since rusoto at least
518+
// currently requires ownership.
519+
fn write_bytes_once<P: AsRef<Path>>(
520+
&self,
521+
path: P,
522+
b: Vec<u8>,
523+
mime: &Mime,
524+
encoding_type: EncodingType,
525+
) -> Fallible<()>;
515526
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()>;
516527
fn copy<P: AsRef<Path>, R: Read>(&self, r: &mut R, path: P, mime: &Mime) -> Fallible<()>;
517528
}
@@ -544,6 +555,16 @@ impl ReportWriter for FileWriter {
544555
Ok(())
545556
}
546557

558+
fn write_bytes_once<P: AsRef<Path>>(
559+
&self,
560+
path: P,
561+
b: Vec<u8>,
562+
mime: &Mime,
563+
encoding: EncodingType,
564+
) -> Fallible<()> {
565+
self.write_bytes(path, b, mime, encoding)
566+
}
567+
547568
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, _: &Mime) -> Fallible<()> {
548569
self.create_prefix(path.as_ref())?;
549570
fs::write(&self.0.join(path.as_ref()), s.as_ref().as_bytes())?;
@@ -595,6 +616,19 @@ impl ReportWriter for DummyWriter {
595616
Ok(())
596617
}
597618

619+
fn write_bytes_once<P: AsRef<Path>>(
620+
&self,
621+
path: P,
622+
b: Vec<u8>,
623+
mime: &Mime,
624+
_: EncodingType,
625+
) -> Fallible<()> {
626+
self.results
627+
.borrow_mut()
628+
.insert((path.as_ref().to_path_buf(), mime.clone()), b);
629+
Ok(())
630+
}
631+
598632
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
599633
self.results.borrow_mut().insert(
600634
(path.as_ref().to_path_buf(), mime.clone()),

src/report/s3.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,40 @@ impl ReportWriter for S3Writer {
144144
}
145145
}
146146

147+
fn write_bytes_once<P: AsRef<Path>>(
148+
&self,
149+
path: P,
150+
s: Vec<u8>,
151+
mime: &Mime,
152+
encoding_type: EncodingType,
153+
) -> Fallible<()> {
154+
let req = PutObjectRequest {
155+
acl: Some("public-read".into()),
156+
body: Some(s.into()),
157+
bucket: self.prefix.bucket.clone(),
158+
key: self
159+
.prefix
160+
.prefix
161+
.join(path.as_ref())
162+
.to_string_lossy()
163+
.into(),
164+
content_type: Some(mime.to_string()),
165+
content_encoding: match encoding_type {
166+
EncodingType::Plain => None,
167+
EncodingType::Gzip => Some("gzip".into()),
168+
},
169+
..Default::default()
170+
};
171+
let r = self.client.put_object(req).sync();
172+
if let Err(::rusoto_s3::PutObjectError::Unknown(ref resp)) = r {
173+
error!("S3 request status: {}", resp.status);
174+
error!("S3 request body: {}", String::from_utf8_lossy(&resp.body));
175+
error!("S3 request headers: {:?}", resp.headers);
176+
}
177+
r.with_context(|_| format!("S3 failure to upload {:?}", path.as_ref()))?;
178+
Ok(())
179+
}
180+
147181
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
148182
self.write_bytes(path, s.into_owned().into_bytes(), mime, EncodingType::Plain)
149183
}

0 commit comments

Comments
 (0)