Skip to content

Commit 94f2965

Browse files
authored
Merge pull request #60 from Mark-Simulacrum/refactor-compression
Refactor compression
2 parents 18e3608 + ebb3999 commit 94f2965

File tree

2 files changed

+156
-102
lines changed

2 files changed

+156
-102
lines changed

src/main.rs

Lines changed: 9 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ mod config;
66
mod curl_helper;
77
mod discourse;
88
mod github;
9+
mod recompress;
910
mod sign;
1011
mod smoke_test;
1112

1213
use std::fs::{self, File, OpenOptions};
13-
use std::io::{self, Read};
14+
use std::io::Read;
1415
use std::path::{Path, PathBuf};
1516
use std::process::Command;
16-
use std::time::{Duration, Instant};
17+
use std::time::Duration;
1718
use std::{collections::HashSet, env};
1819

1920
use crate::build_manifest::BuildManifest;
@@ -24,8 +25,6 @@ use chrono::Utc;
2425
use curl::easy::Easy;
2526
use fs2::FileExt;
2627
use github::{CreateTag, Github};
27-
use rayon::prelude::*;
28-
use xz2::read::XzDecoder;
2928

3029
use crate::config::{Channel, Config};
3130

@@ -376,115 +375,23 @@ impl Context {
376375
let file = file?;
377376
let path = file.path();
378377
match path.extension().and_then(|s| s.to_str()) {
379-
// Delete signature/hash files...
380-
Some("asc") | Some("sha256") => {
381-
fs::remove_file(&path)?;
382-
}
383378
// Store off the input files for potential recompression.
384379
Some("xz") => {
385380
to_recompress.push(path.to_path_buf());
386381
}
382+
// Delete signature/hash files...
383+
Some("asc") | Some("sha256") => {
384+
fs::remove_file(&path)?;
385+
}
387386
Some("gz") if self.config.recompress_gz => {
388387
fs::remove_file(&path)?;
389388
}
390389
_ => {}
391390
}
392391
}
393392

394-
// Also, generate *.gz from *.xz if the former is missing. Since the gz
395-
// and xz tarballs have the same content, we did not deploy the gz files
396-
// from the CI. But rustup users may still expect to get gz files, so we
397-
// are recompressing the xz files as gz here.
398-
if !to_recompress.is_empty() {
399-
println!(
400-
"starting to recompress {} files across {} threads",
401-
to_recompress.len(),
402-
to_recompress.len().min(rayon::current_num_threads()),
403-
);
404-
println!(
405-
"gz recompression enabled: {} (note: may occur anyway for missing gz artifacts)",
406-
self.config.recompress_gz
407-
);
408-
println!("xz recompression enabled: {}", self.config.recompress_xz);
409-
let recompress_start = Instant::now();
410-
411-
let recompress_gz = self.config.recompress_gz;
412-
let recompress_xz = self.config.recompress_xz;
413-
let compression_level = flate2::Compression::new(self.config.gzip_compression_level);
414-
to_recompress
415-
.par_iter()
416-
.map(|xz_path| {
417-
println!("recompressing {}...", xz_path.display());
418-
let gz_path = xz_path.with_extension("gz");
419-
420-
// Produce gzip if explicitly enabled or the destination file doesn't exist.
421-
if recompress_gz || !gz_path.is_file() {
422-
let mut xz_orig = XzDecoder::new(File::open(xz_path)?);
423-
let gz = File::create(gz_path)?;
424-
let mut gz = flate2::write::GzEncoder::new(gz, compression_level);
425-
io::copy(&mut xz_orig, &mut gz)?;
426-
}
427-
428-
// xz recompression with more aggressive settings than we want to take the time
429-
// for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
430-
//
431-
// Note that this is using a single-threaded compressor as we're parallelizing
432-
// via rayon already. In rust-lang/rust we were trying to use parallel
433-
// compression, but the default block size for that is 3*dict_size so we
434-
// weren't actually using more than one core in most of the builders with
435-
// <192MB uncompressed tarballs. In promote-release since we're recompressing
436-
// 100s of tarballs there's no need for each individual compression to be
437-
// parallel.
438-
if recompress_xz {
439-
let mut filters = xz2::stream::Filters::new();
440-
let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
441-
// This sets the overall dictionary size, which is also how much memory (baseline)
442-
// is needed for decompression.
443-
lzma_ops.dict_size(64 * 1024 * 1024);
444-
// Use the best match finder for compression ratio.
445-
lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
446-
lzma_ops.mode(xz2::stream::Mode::Normal);
447-
// Set nice len to the maximum for best compression ratio
448-
lzma_ops.nice_len(273);
449-
// Set depth to a reasonable value, 0 means auto, 1000 is somwhat high but gives
450-
// good results.
451-
lzma_ops.depth(1000);
452-
// 2 is the default and does well for most files
453-
lzma_ops.position_bits(2);
454-
// 0 is the default and does well for most files
455-
lzma_ops.literal_position_bits(0);
456-
// 3 is the default and does well for most files
457-
lzma_ops.literal_context_bits(3);
458-
459-
filters.lzma2(&lzma_ops);
460-
461-
// FIXME: Do we want a checksum as part of compression?
462-
let stream = xz2::stream::Stream::new_stream_encoder(
463-
&filters,
464-
xz2::stream::Check::None,
465-
)
466-
.unwrap();
467-
let xz_recompressed = xz_path.with_extension("xz_recompressed");
468-
let xz_out = File::create(&xz_recompressed)?;
469-
let mut xz_out = xz2::write::XzEncoder::new_stream(
470-
std::io::BufWriter::new(xz_out),
471-
stream,
472-
);
473-
let mut xz_orig = XzDecoder::new(File::open(xz_path)?);
474-
io::copy(&mut xz_orig, &mut xz_out)?;
475-
fs::rename(&xz_recompressed, xz_path)?;
476-
}
477-
478-
Ok::<(), Error>(())
479-
})
480-
.collect::<Result<Vec<()>, Error>>()?;
481-
482-
println!(
483-
"finished recompressing {} files in {:.2?}",
484-
to_recompress.len(),
485-
recompress_start.elapsed(),
486-
);
487-
}
393+
// Generate recompressed artifacts from the input set.
394+
self.recompress(to_recompress)?;
488395

489396
Ok(())
490397
}

src/recompress.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//! This takes care of mapping our input set of tarballs to the output set of tarballs.
2+
//!
3+
//! Currently rust-lang/rust CI produces .xz tarballs with moderate compression, and this module
4+
//! maps that into the following:
5+
//!
6+
//! * gzip tarballs, with compression=9
7+
//! * xz tarballs, with manually tuned compression settings
8+
//!
9+
//! We have ~500 tarballs as of March 2023, and this recompression takes a considerable amount of
10+
//! time, particularly for the xz outputs. In our infrastructure this runs on a 72 vCPU container to
11+
//! finish in a reasonable amount of time.
12+
13+
use crate::Context;
14+
use rayon::prelude::*;
15+
use std::fs::{self, File};
16+
use std::io::{self, Read, Write};
17+
use std::path::PathBuf;
18+
use std::time::Instant;
19+
use xz2::read::XzDecoder;
20+
21+
impl Context {
22+
pub fn recompress(&self, mut to_recompress: Vec<PathBuf>) -> anyhow::Result<()> {
23+
println!(
24+
"starting to recompress {} files across {} threads",
25+
to_recompress.len(),
26+
to_recompress.len().min(rayon::current_num_threads()),
27+
);
28+
println!(
29+
"gz recompression enabled: {} (note: may occur anyway for missing gz artifacts)",
30+
self.config.recompress_gz
31+
);
32+
println!("xz recompression enabled: {}", self.config.recompress_xz);
33+
let recompress_start = Instant::now();
34+
35+
let recompress_gz = self.config.recompress_gz;
36+
let recompress_xz = self.config.recompress_xz;
37+
let compression_level = flate2::Compression::new(self.config.gzip_compression_level);
38+
39+
// Query the length of each file, and sort by length. This puts the smallest files
40+
// toward the end of the array, which will generally deprioritize them in the
41+
// next parallel loop, avoiding as much of a long-tail on the compression work
42+
// (smallest files are fastest to recompress typically).
43+
//
44+
// FIXME: Rayon's documentation on par_iter isn't very detailed in terms of whether this
45+
// does any good. We may want to replace this with our own manual thread pool
46+
// implementation that guarantees this property - each task is large enough that just
47+
// popping from a single Mutex<Vec<...>> will be plenty fast enough.
48+
to_recompress.sort_by_cached_key(|path| {
49+
std::cmp::Reverse(fs::metadata(path).map(|m| m.len()).unwrap_or(0))
50+
});
51+
52+
to_recompress
53+
.par_iter()
54+
.map(|xz_path| {
55+
println!("recompressing {}...", xz_path.display());
56+
let gz_path = xz_path.with_extension("gz");
57+
58+
let mut destinations: Vec<Box<dyn io::Write>> = Vec::new();
59+
60+
// Produce gzip if explicitly enabled or the destination file doesn't exist.
61+
if recompress_gz || !gz_path.is_file() {
62+
let gz = File::create(gz_path)?;
63+
destinations.push(Box::new(flate2::write::GzEncoder::new(
64+
gz,
65+
compression_level,
66+
)));
67+
}
68+
69+
// xz recompression with more aggressive settings than we want to take the time
70+
// for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
71+
//
72+
// Note that this is using a single-threaded compressor as we're parallelizing
73+
// via rayon already. In rust-lang/rust we were trying to use parallel
74+
// compression, but the default block size for that is 3*dict_size so we
75+
// weren't actually using more than one core in most of the builders with
76+
// <192MB uncompressed tarballs. In promote-release since we're recompressing
77+
// 100s of tarballs there's no need for each individual compression to be
78+
// parallel.
79+
let xz_recompressed = xz_path.with_extension("xz_recompressed");
80+
if recompress_xz {
81+
let mut filters = xz2::stream::Filters::new();
82+
let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
83+
// This sets the overall dictionary size, which is also how much memory (baseline)
84+
// is needed for decompression.
85+
lzma_ops.dict_size(64 * 1024 * 1024);
86+
// Use the best match finder for compression ratio.
87+
lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
88+
lzma_ops.mode(xz2::stream::Mode::Normal);
89+
// Set nice len to the maximum for best compression ratio
90+
lzma_ops.nice_len(273);
91+
// Set depth to a reasonable value, 0 means auto, 1000 is somewhat high but gives
92+
// good results.
93+
lzma_ops.depth(1000);
94+
// 2 is the default and does well for most files
95+
lzma_ops.position_bits(2);
96+
// 0 is the default and does well for most files
97+
lzma_ops.literal_position_bits(0);
98+
// 3 is the default and does well for most files
99+
lzma_ops.literal_context_bits(3);
100+
101+
filters.lzma2(&lzma_ops);
102+
103+
// FIXME: Do we want a checksum as part of compression?
104+
let stream =
105+
xz2::stream::Stream::new_stream_encoder(&filters, xz2::stream::Check::None)
106+
.unwrap();
107+
let xz_out = File::create(&xz_recompressed)?;
108+
destinations.push(Box::new(xz2::write::XzEncoder::new_stream(
109+
std::io::BufWriter::new(xz_out),
110+
stream,
111+
)));
112+
}
113+
114+
// We only decompress once and then write into each of the compressors before
115+
// moving on.
116+
//
117+
// This code assumes that compression with `write_all` will never fail (i.e., we
118+
// can take arbitrary amounts of data as input). That seems like a reasonable
119+
// assumption though.
120+
let mut decompressor = XzDecoder::new(File::open(xz_path)?);
121+
let mut buffer = vec![0u8; 4 * 1024 * 1024];
122+
loop {
123+
let length = decompressor.read(&mut buffer)?;
124+
if length == 0 {
125+
break;
126+
}
127+
for destination in destinations.iter_mut() {
128+
destination.write_all(&buffer[..length])?;
129+
}
130+
}
131+
132+
if recompress_xz {
133+
fs::rename(&xz_recompressed, xz_path)?;
134+
}
135+
136+
Ok::<(), anyhow::Error>(())
137+
})
138+
.collect::<anyhow::Result<Vec<()>>>()?;
139+
140+
println!(
141+
"finished recompressing {} files in {:.2?}",
142+
to_recompress.len(),
143+
recompress_start.elapsed(),
144+
);
145+
Ok(())
146+
}
147+
}

0 commit comments

Comments
 (0)