Skip to content

Commit a0cb14c

Browse files
committed
Auto merge of #707 - Mark-Simulacrum:no-state, r=Mark-Simulacrum
Parallelize log uploads This should drastically speed up report generation.
2 parents 00ec081 + c0882fe commit a0cb14c

File tree

1 file changed

+70
-34
lines changed

1 file changed

+70
-34
lines changed

src/report/mod.rs

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ use mime::{self, Mime};
1212
use percent_encoding::{utf8_percent_encode, AsciiSet};
1313
use std::borrow::Cow;
1414
#[cfg(test)]
15-
use std::cell::RefCell;
16-
#[cfg(test)]
1715
use std::collections::HashMap;
1816
use std::convert::AsRef;
1917
use std::fmt::{self, Display};
@@ -281,40 +279,76 @@ fn write_logs<DB: ReadResults, W: ReportWriter>(
281279
) -> Fallible<()> {
282280
let num_crates = crates.len();
283281
let progress_every = (num_crates / PROGRESS_FRACTION) + 1;
284-
for (i, krate) in crates.iter().enumerate() {
285-
if i % progress_every == 0 {
286-
info!("wrote logs for {}/{} crates", i, num_crates)
287-
}
288282

289-
if config.should_skip(krate) {
290-
continue;
283+
let errors = std::sync::Mutex::new(vec![]);
284+
std::thread::scope(|s| {
285+
let mut channels = vec![];
286+
// This isn't really related to the number of cores on the system, since these threads are
287+
// mostly driving network-related traffic. 8 is a reasonable number to not overwhelm
288+
// systems while keeping things moving much faster than fully serial uploads.
289+
for _ in 0..8 {
290+
let (tx, rx) = std::sync::mpsc::sync_channel::<(PathBuf, Vec<u8>, EncodingType)>(32);
291+
channels.push(tx);
292+
let errors = &errors;
293+
s.spawn(move || {
294+
while let Ok((log_path, data, encoding)) = rx.recv() {
295+
if let Err(e) =
296+
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, encoding)
297+
{
298+
errors.lock().unwrap().push(e);
299+
}
300+
}
301+
});
291302
}
292303

293-
for tc in &ex.toolchains {
294-
let log_path =
295-
crate_to_path_fragment(tc, krate, SanitizationContext::Path).join("log.txt");
296-
let content = db
297-
.load_log(ex, tc, krate)
298-
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
299-
.with_context(|_| format!("failed to read log of {krate} on {tc}"));
300-
let content = match content {
301-
Ok(c) => c,
302-
Err(e) => {
303-
utils::report_failure(&e);
304-
continue;
305-
}
306-
};
304+
for (i, krate) in crates.iter().enumerate() {
305+
if i % progress_every == 0 {
306+
info!("wrote logs for {}/{} crates", i, num_crates)
307+
}
307308

308-
match content {
309-
EncodedLog::Plain(data) => {
310-
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, EncodingType::Plain)
311-
}
312-
EncodedLog::Gzip(data) => {
313-
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, EncodingType::Gzip)
309+
if config.should_skip(krate) {
310+
continue;
311+
}
312+
313+
for tc in &ex.toolchains {
314+
let log_path =
315+
crate_to_path_fragment(tc, krate, SanitizationContext::Path).join("log.txt");
316+
let content = db
317+
.load_log(ex, tc, krate)
318+
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
319+
.with_context(|_| format!("failed to read log of {krate} on {tc}"));
320+
let content = match content {
321+
Ok(c) => c,
322+
Err(e) => {
323+
utils::report_failure(&e);
324+
continue;
325+
}
326+
};
327+
328+
match content {
329+
EncodedLog::Plain(data) => {
330+
channels[i % channels.len()]
331+
.send((log_path, data, EncodingType::Plain))
332+
.unwrap();
333+
}
334+
EncodedLog::Gzip(data) => {
335+
channels[i % channels.len()]
336+
.send((log_path, data, EncodingType::Gzip))
337+
.unwrap();
338+
}
314339
}
315-
}?;
340+
}
316341
}
342+
});
343+
344+
let mut errors = errors.into_inner().unwrap();
345+
for error in errors.iter() {
346+
utils::report_failure(&failure::format_err!("Logging upload failed: {:?}", error));
347+
}
348+
if !errors.is_empty() {
349+
return Err(errors.remove(0));
317350
}
351+
318352
Ok(())
319353
}
320354

@@ -510,7 +544,7 @@ fn compare(
510544
}
511545
}
512546

513-
pub trait ReportWriter {
547+
pub trait ReportWriter: Send + Sync {
514548
fn write_bytes<P: AsRef<Path>>(
515549
&self,
516550
path: P,
@@ -565,14 +599,15 @@ impl Display for FileWriter {
565599
#[cfg(test)]
566600
#[derive(Default)]
567601
pub struct DummyWriter {
568-
results: RefCell<HashMap<(PathBuf, Mime), Vec<u8>>>,
602+
results: std::sync::Mutex<HashMap<(PathBuf, Mime), Vec<u8>>>,
569603
}
570604

571605
#[cfg(test)]
572606
impl DummyWriter {
573607
pub fn get<P: AsRef<Path>>(&self, path: P, mime: &Mime) -> Vec<u8> {
574608
self.results
575-
.borrow()
609+
.lock()
610+
.unwrap()
576611
.get(&(path.as_ref().to_path_buf(), mime.clone()))
577612
.unwrap()
578613
.clone()
@@ -589,13 +624,14 @@ impl ReportWriter for DummyWriter {
589624
_: EncodingType,
590625
) -> Fallible<()> {
591626
self.results
592-
.borrow_mut()
627+
.lock()
628+
.unwrap()
593629
.insert((path.as_ref().to_path_buf(), mime.clone()), b);
594630
Ok(())
595631
}
596632

597633
fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
598-
self.results.borrow_mut().insert(
634+
self.results.lock().unwrap().insert(
599635
(path.as_ref().to_path_buf(), mime.clone()),
600636
s.bytes().collect(),
601637
);

0 commit comments

Comments
 (0)