
Commit b55c4b3

Merge pull request #64 from Mark-Simulacrum/custom-recompress
Replace rayon with manual parallelism
2 parents 151315f + 3ff132d

File tree

1 file changed: +144 -126 lines


src/recompress.rs

Lines changed: 144 additions & 126 deletions
@@ -11,7 +11,6 @@
 //! finish in a reasonable amount of time.
 
 use crate::Context;
-use rayon::prelude::*;
 use std::fmt::Write as FmtWrite;
 use std::fs::{self, File};
 use std::io::{self, Read, Write};
@@ -24,7 +23,7 @@ impl Context {
         println!(
             "starting to recompress {} files across {} threads",
             to_recompress.len(),
-            to_recompress.len().min(rayon::current_num_threads()),
+            to_recompress.len().min(self.config.num_threads),
         );
         println!(
             "gz recompression enabled: {} (note: may occur anyway for missing gz artifacts)",
@@ -38,135 +37,154 @@
         let compression_level = flate2::Compression::new(self.config.gzip_compression_level);
 
         // Query the length of each file, and sort by length. This puts the smallest files
-        // toward the end of the array, which will generally deprioritize them in the next
-        // parallel loop, avoiding as much of a long-tail on the compression work
-        // (smallest files are fastest to recompress typically).
-        //
-        // FIXME: Rayon's documentation on par_iter isn't very detailed in terms of whether this
-        // does any good. We may want to replace this with our own manual thread pool
-        // implementation that guarantees this property - each task is large enough that just
-        // popping from a single Mutex<Vec<...>> will be plenty fast enough.
-        to_recompress.sort_by_cached_key(|path| {
-            std::cmp::Reverse(fs::metadata(path).map(|m| m.len()).unwrap_or(0))
-        });
-
-        to_recompress
-            .par_iter()
-            .map(|xz_path| {
-                println!("recompressing {}...", xz_path.display());
-                let file_start = Instant::now();
-                let gz_path = xz_path.with_extension("gz");
-
-                let mut destinations: Vec<(&str, Box<dyn io::Write>)> = Vec::new();
-
-                // Produce gzip if explicitly enabled or the destination file doesn't exist.
-                if recompress_gz || !gz_path.is_file() {
-                    let gz = File::create(gz_path)?;
-                    destinations.push((
-                        "gz",
-                        Box::new(flate2::write::GzEncoder::new(gz, compression_level)),
-                    ));
-                }
-
-                // xz recompression with more aggressive settings than we want to take the time
-                // for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
-                //
-                // Note that this is using a single-threaded compressor as we're parallelizing
-                // via rayon already. In rust-lang/rust we were trying to use parallel
-                // compression, but the default block size for that is 3*dict_size so we
-                // weren't actually using more than one core in most of the builders with
-                // <192MB uncompressed tarballs. In promote-release since we're recompressing
-                // 100s of tarballs there's no need for each individual compression to be
-                // parallel.
-                let xz_recompressed = xz_path.with_extension("xz_recompressed");
-                if recompress_xz {
-                    let mut filters = xz2::stream::Filters::new();
-                    let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
-                    // This sets the overall dictionary size, which is also how much memory (baseline)
-                    // is needed for decompression.
-                    lzma_ops.dict_size(64 * 1024 * 1024);
-                    // Use the best match finder for compression ratio.
-                    lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
-                    lzma_ops.mode(xz2::stream::Mode::Normal);
-                    // Set nice len to the maximum for best compression ratio
-                    lzma_ops.nice_len(273);
-                    // Set depth to a reasonable value, 0 means auto, 1000 is somewhat high but gives
-                    // good results.
-                    lzma_ops.depth(1000);
-                    // 2 is the default and does well for most files
-                    lzma_ops.position_bits(2);
-                    // 0 is the default and does well for most files
-                    lzma_ops.literal_position_bits(0);
-                    // 3 is the default and does well for most files
-                    lzma_ops.literal_context_bits(3);
-
-                    filters.lzma2(&lzma_ops);
-
-                    // FIXME: Do we want a checksum as part of compression?
-                    let stream =
-                        xz2::stream::Stream::new_stream_encoder(&filters, xz2::stream::Check::None)
+        // toward the start of the array, which will make us pop them last. Smaller units of work
+        // are less likely to lead to a long tail of a single thread doing work while others are
+        // idle, so we want to schedule them last (i.e., in the tail of the build).
+        to_recompress.sort_by_cached_key(|path| fs::metadata(path).map(|m| m.len()).unwrap_or(0));
+
+        let total_length = to_recompress.len();
+
+        // Manually parallelize across freshly spawned worker threads. rayon is nice, but since we
+        // care about the scheduling order and have very large units of work (>500ms, typically 10s
+        // of seconds) the more efficient parallelism in rayon isn't desirable. (Scheduling order
+        // is the particular problem for us.)
+        let to_recompress = std::sync::Mutex::new(to_recompress);
+        std::thread::scope(|s| {
+            // Spawn num_threads workers...
+            let mut tasks = Vec::new();
+            for _ in 0..self.config.num_threads {
+                tasks.push(s.spawn(|| {
+                    while let Some(xz_path) = {
+                        // Extra block is needed to make sure the lock guard drops before we enter the
+                        // loop iteration, because while-let is desugared to a loop + match, and match
+                        // scopes live until the end of the match.
+                        let path = to_recompress.lock().unwrap().pop();
+                        path
+                    } {
+                        println!("recompressing {}...", xz_path.display());
+                        let file_start = Instant::now();
+                        let gz_path = xz_path.with_extension("gz");
+
+                        let mut destinations: Vec<(&str, Box<dyn io::Write>)> = Vec::new();
+
+                        // Produce gzip if explicitly enabled or the destination file doesn't exist.
+                        if recompress_gz || !gz_path.is_file() {
+                            let gz = File::create(gz_path)?;
+                            destinations.push((
+                                "gz",
+                                Box::new(flate2::write::GzEncoder::new(gz, compression_level)),
+                            ));
+                        }
+
+                        // xz recompression with more aggressive settings than we want to take the time
+                        // for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
+                        //
+                        // Note that this is using a single-threaded compressor as we're parallelizing
+                        // across files already. In rust-lang/rust we were trying to use parallel
+                        // compression, but the default block size for that is 3*dict_size so we
+                        // weren't actually using more than one core in most of the builders with
+                        // <192MB uncompressed tarballs. In promote-release since we're recompressing
+                        // 100s of tarballs there's no need for each individual compression to be
+                        // parallel.
+                        let xz_recompressed = xz_path.with_extension("xz_recompressed");
+                        if recompress_xz {
+                            let mut filters = xz2::stream::Filters::new();
+                            let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
+                            // This sets the overall dictionary size, which is also how much memory (baseline)
+                            // is needed for decompression.
+                            lzma_ops.dict_size(64 * 1024 * 1024);
+                            // Use the best match finder for compression ratio.
+                            lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
+                            lzma_ops.mode(xz2::stream::Mode::Normal);
+                            // Set nice len to the maximum for best compression ratio
+                            lzma_ops.nice_len(273);
+                            // Set depth to a reasonable value, 0 means auto, 1000 is somewhat high but gives
+                            // good results.
+                            lzma_ops.depth(1000);
+                            // 2 is the default and does well for most files
+                            lzma_ops.position_bits(2);
+                            // 0 is the default and does well for most files
+                            lzma_ops.literal_position_bits(0);
+                            // 3 is the default and does well for most files
+                            lzma_ops.literal_context_bits(3);
+
+                            filters.lzma2(&lzma_ops);
+
+                            // FIXME: Do we want a checksum as part of compression?
+                            let stream = xz2::stream::Stream::new_stream_encoder(
+                                &filters,
+                                xz2::stream::Check::None,
+                            )
                             .unwrap();
-                    let xz_out = File::create(&xz_recompressed)?;
-                    destinations.push((
-                        "xz",
-                        Box::new(xz2::write::XzEncoder::new_stream(
-                            std::io::BufWriter::new(xz_out),
-                            stream,
-                        )),
-                    ));
-                }
-
-                // We only decompress once and then write into each of the compressors before
-                // moving on.
-                //
-                // This code assumes that compression with `write_all` will never fail (i.e., we
-                // can take arbitrary amounts of data as input). That seems like a reasonable
-                // assumption though.
-                let mut decompressor = XzDecoder::new(File::open(xz_path)?);
-                let mut buffer = vec![0u8; 4 * 1024 * 1024];
-                let mut decompress_time = Duration::ZERO;
-                let mut time_by_dest = vec![Duration::ZERO; destinations.len()];
-                loop {
-                    let start = Instant::now();
-                    let length = decompressor.read(&mut buffer)?;
-                    decompress_time += start.elapsed();
-                    if length == 0 {
-                        break;
-                    }
-                    for (idx, (_, destination)) in destinations.iter_mut().enumerate() {
-                        let start = std::time::Instant::now();
-                        destination.write_all(&buffer[..length])?;
-                        time_by_dest[idx] += start.elapsed();
+                            let xz_out = File::create(&xz_recompressed)?;
+                            destinations.push((
+                                "xz",
+                                Box::new(xz2::write::XzEncoder::new_stream(
+                                    std::io::BufWriter::new(xz_out),
+                                    stream,
+                                )),
+                            ));
+                        }
+
+                        // We only decompress once and then write into each of the compressors before
+                        // moving on.
+                        //
+                        // This code assumes that compression with `write_all` will never fail (i.e., we
+                        // can take arbitrary amounts of data as input). That seems like a reasonable
+                        // assumption though.
+                        let mut decompressor = XzDecoder::new(File::open(&xz_path)?);
+                        let mut buffer = vec![0u8; 4 * 1024 * 1024];
+                        let mut decompress_time = Duration::ZERO;
+                        let mut time_by_dest = vec![Duration::ZERO; destinations.len()];
+                        loop {
+                            let start = Instant::now();
+                            let length = decompressor.read(&mut buffer)?;
+                            decompress_time += start.elapsed();
+                            if length == 0 {
+                                break;
+                            }
+                            for (idx, (_, destination)) in destinations.iter_mut().enumerate() {
+                                let start = std::time::Instant::now();
+                                destination.write_all(&buffer[..length])?;
+                                time_by_dest[idx] += start.elapsed();
+                            }
+                        }
+
+                        let mut compression_times = String::new();
+                        for (idx, (name, _)) in destinations.iter().enumerate() {
+                            write!(
+                                compression_times,
+                                ", {:.2?} {} compression",
+                                time_by_dest[idx], name
+                            )?;
+                        }
+                        println!(
+                            "recompressed {}: {:.2?} total, {:.2?} decompression{}",
+                            xz_path.display(),
+                            file_start.elapsed(),
+                            decompress_time,
+                            compression_times
+                        );
+
+                        if recompress_xz {
+                            fs::rename(&xz_recompressed, xz_path)?;
+                        }
                     }
-                }
-
-                let mut compression_times = String::new();
-                for (idx, (name, _)) in destinations.iter().enumerate() {
-                    write!(
-                        compression_times,
-                        ", {:.2?} {} compression",
-                        time_by_dest[idx], name
-                    )?;
-                }
-                println!(
-                    "recompressed {}: {:.2?} total, {:.2?} decompression{}",
-                    xz_path.display(),
-                    file_start.elapsed(),
-                    decompress_time,
-                    compression_times
-                );
-
-                if recompress_xz {
-                    fs::rename(&xz_recompressed, xz_path)?;
-                }
-
-                Ok::<(), anyhow::Error>(())
-            })
-            .collect::<anyhow::Result<Vec<()>>>()?;
+
+                    Ok::<_, anyhow::Error>(())
+                }));
+            }
+
+            for task in tasks {
+                task.join().expect("no panics")?;
+            }
+
+            Ok::<_, anyhow::Error>(())
+        })?;
 
         println!(
             "finished recompressing {} files in {:.2?}",
-            to_recompress.len(),
+            total_length,
             recompress_start.elapsed(),
         );
         Ok(())
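
The scheduling trick at the heart of this diff is worth spelling out: the queue is sorted ascending by file size and workers take items with `Vec::pop`, so the largest (slowest) files are claimed first and the small ones fill in the tail of the build. Below is a minimal standalone sketch of that pattern; the `process` function and the `run` signature are hypothetical stand-ins for the recompression work, not code from this repository.

```rust
use std::path::PathBuf;
use std::sync::Mutex;

// Hypothetical stand-in for the per-file work (decompress + recompress).
fn process(path: &PathBuf) -> anyhow::Result<()> {
    println!("processing {}", path.display());
    Ok(())
}

fn run(mut queue: Vec<PathBuf>, num_threads: usize) -> anyhow::Result<()> {
    // Ascending sort: Vec::pop removes from the end, so workers claim the
    // largest files first and the smallest land in the tail of the build.
    queue.sort_by_cached_key(|path| std::fs::metadata(path).map(|m| m.len()).unwrap_or(0));

    let queue = Mutex::new(queue);
    std::thread::scope(|s| {
        let mut tasks = Vec::new();
        for _ in 0..num_threads {
            tasks.push(s.spawn(|| -> anyhow::Result<()> {
                // Pop inside a block so the MutexGuard drops before the
                // long-running body; see the desugaring note below.
                while let Some(path) = {
                    let next = queue.lock().unwrap().pop();
                    next
                } {
                    process(&path)?;
                }
                Ok(())
            }));
        }
        // join() surfaces worker panics; `?` propagates their errors.
        for task in tasks {
            task.join().expect("worker panicked")?;
        }
        Ok(())
    })
}
```

Contention on the single Mutex is negligible here because, as the diff's comment notes, each unit of work runs for hundreds of milliseconds to tens of seconds.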
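The "extra block" comment in the diff points at a real footgun: `while let` desugars to a `loop` plus a `match`, and a temporary created in the match scrutinee (here, the `MutexGuard` returned by `lock()`) lives until the end of the whole match. A sketch of the difference, using a toy `Mutex<Vec<u32>>` queue:

```rust
use std::sync::Mutex;

fn drain(queue: &Mutex<Vec<u32>>) {
    // `while let Some(x) = queue.lock().unwrap().pop() { ... }` desugars to
    // roughly this; the temporary MutexGuard from lock() lives until the end
    // of the match, so the lock stays held across the entire loop body and
    // every other worker blocks in the meantime.
    loop {
        match queue.lock().unwrap().pop() {
            Some(x) => {
                // ...long-running work here, with the lock STILL held...
                let _ = x;
            }
            None => break,
        }
    }

    // Binding the popped value in an inner block ends the guard's lifetime
    // at the end of that block, so the lock is released before the body runs.
    while let Some(x) = {
        let item = queue.lock().unwrap().pop();
        item // the guard is already dropped here
    } {
        // ...long-running work here, lock NOT held...
        let _ = x;
    }
}
```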
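The xz encoder settings can also be exercised in isolation. The following is a hypothetical round-trip check (not part of the commit) that mirrors the diff's filter chain using the same `xz2` calls; note that `Check::None` means the output embeds no integrity checksum, which is what the FIXME is about:

```rust
use std::io::{Read, Write};

fn xz_roundtrip(data: &[u8]) -> anyhow::Result<Vec<u8>> {
    let mut filters = xz2::stream::Filters::new();
    let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9)?;
    // Same aggressive settings as the diff: a 64MB dictionary (the baseline
    // memory needed to decompress) and the maximum nice_len.
    lzma_ops.dict_size(64 * 1024 * 1024);
    lzma_ops.nice_len(273);
    filters.lzma2(&lzma_ops);

    let stream = xz2::stream::Stream::new_stream_encoder(&filters, xz2::stream::Check::None)?;
    let mut encoder = xz2::write::XzEncoder::new_stream(Vec::new(), stream);
    encoder.write_all(data)?;
    let compressed = encoder.finish()?;

    // Verify the stream decodes back to the original bytes.
    let mut decompressed = Vec::new();
    xz2::read::XzDecoder::new(&compressed[..]).read_to_end(&mut decompressed)?;
    assert_eq!(data, &decompressed[..]);
    Ok(compressed)
}
```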
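Finally, the read loop is a decompress-once, write-to-many fan-out. Factored out as a sketch (the `fan_out` helper is ours, not the repo's), the shape is:

```rust
use std::io::{self, Read, Write};

/// Stream `reader` into every writer in `writers`, reading the input once.
fn fan_out<R: Read>(mut reader: R, writers: &mut [Box<dyn Write>]) -> io::Result<()> {
    // Same 4MB chunk size as the diff.
    let mut buffer = vec![0u8; 4 * 1024 * 1024];
    loop {
        let length = reader.read(&mut buffer)?;
        if length == 0 {
            return Ok(());
        }
        for writer in writers.iter_mut() {
            // write_all retries short writes internally, matching the diff's
            // assumption that the compressors accept arbitrarily sized input.
            writer.write_all(&buffer[..length])?;
        }
    }
}
```

One caveat when reusing this shape with the diff's encoders: the write-side encoders in flate2 and xz2 finish their streams on drop and discard any error at that point, so calling an explicit finish() is the safer way to surface failures.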
