 //! finish in a reasonable amount of time.
 
 use crate::Context;
-use rayon::prelude::*;
 use std::fmt::Write as FmtWrite;
 use std::fs::{self, File};
 use std::io::{self, Read, Write};
@@ -24,7 +23,7 @@ impl Context {
         println!(
             "starting to recompress {} files across {} threads",
             to_recompress.len(),
-            to_recompress.len().min(rayon::current_num_threads()),
+            to_recompress.len().min(self.config.num_threads),
         );
         println!(
             "gz recompression enabled: {} (note: may occur anyway for missing gz artifacts)",
@@ -38,135 +37,154 @@ impl Context {
         let compression_level = flate2::Compression::new(self.config.gzip_compression_level);
 
         // Query the length of each file, and sort by length. This puts the smallest files
-        // toward the end of the array, which will generally deprioritize them in the parallel
-        // next parallel loop, avoiding as much of a long-tail on the compression work
-        // (smallest files are fastest to recompress typically).
-        //
-        // FIXME: Rayon's documentation on par_iter isn't very detailed in terms of whether this
-        // does any good. We may want to replace this with our own manual thread pool
-        // implementation that guarantees this property - each task is large enough that just
-        // popping from a single Mutex<Vec<...>> will be plenty fast enough.
-        to_recompress.sort_by_cached_key(|path| {
-            std::cmp::Reverse(fs::metadata(path).map(|m| m.len()).unwrap_or(0))
-        });
-
-        to_recompress
-            .par_iter()
-            .map(|xz_path| {
-                println!("recompressing {}...", xz_path.display());
-                let file_start = Instant::now();
-                let gz_path = xz_path.with_extension("gz");
-
-                let mut destinations: Vec<(&str, Box<dyn io::Write>)> = Vec::new();
-
-                // Produce gzip if explicitly enabled or the destination file doesn't exist.
-                if recompress_gz || !gz_path.is_file() {
-                    let gz = File::create(gz_path)?;
-                    destinations.push((
-                        "gz",
-                        Box::new(flate2::write::GzEncoder::new(gz, compression_level)),
-                    ));
-                }
-
-                // xz recompression with more aggressive settings than we want to take the time
-                // for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
-                //
-                // Note that this is using a single-threaded compressor as we're parallelizing
-                // via rayon already. In rust-lang/rust we were trying to use parallel
-                // compression, but the default block size for that is 3*dict_size so we
-                // weren't actually using more than one core in most of the builders with
-                // <192MB uncompressed tarballs. In promote-release since we're recompressing
-                // 100s of tarballs there's no need for each individual compression to be
-                // parallel.
-                let xz_recompressed = xz_path.with_extension("xz_recompressed");
-                if recompress_xz {
-                    let mut filters = xz2::stream::Filters::new();
-                    let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
-                    // This sets the overall dictionary size, which is also how much memory (baseline)
-                    // is needed for decompression.
-                    lzma_ops.dict_size(64 * 1024 * 1024);
-                    // Use the best match finder for compression ratio.
-                    lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
-                    lzma_ops.mode(xz2::stream::Mode::Normal);
-                    // Set nice len to the maximum for best compression ratio
-                    lzma_ops.nice_len(273);
-                    // Set depth to a reasonable value, 0 means auto, 1000 is somwhat high but gives
-                    // good results.
-                    lzma_ops.depth(1000);
-                    // 2 is the default and does well for most files
-                    lzma_ops.position_bits(2);
-                    // 0 is the default and does well for most files
-                    lzma_ops.literal_position_bits(0);
-                    // 3 is the default and does well for most files
-                    lzma_ops.literal_context_bits(3);
-
-                    filters.lzma2(&lzma_ops);
-
-                    // FIXME: Do we want a checksum as part of compression?
-                    let stream =
-                        xz2::stream::Stream::new_stream_encoder(&filters, xz2::stream::Check::None)
+        // toward the start of the array, which will make us pop them last. Smaller units of work
+        // are less likely to lead to a long tail of a single thread doing work while others are
+        // idle, so we want to schedule them last (i.e., in the tail of the build).
+        to_recompress.sort_by_cached_key(|path| fs::metadata(path).map(|m| m.len()).unwrap_or(0));
+
+        let total_length = to_recompress.len();
+
+        // Manually parallelize across freshly spawned worker threads. rayon is nice, but since we
+        // care about the scheduling order and have very large units of work (>500ms, typically 10s
+        // of seconds) the more efficient parallelism in rayon isn't desirable. (Scheduling order
+        // is the particular problem for us).
+        let to_recompress = std::sync::Mutex::new(to_recompress);
+        std::thread::scope(|s| {
+            // Spawn num_threads workers...
+            let mut tasks = Vec::new();
+            for _ in 0..self.config.num_threads {
+                tasks.push(s.spawn(|| {
+                    while let Some(xz_path) = {
+                        // Extra block is needed to make sure the lock guard drops before we enter the
+                        // loop iteration, because while-let is desugared to a loop + match, and match
+                        // scopes live until the end of the match.
+                        let path = to_recompress.lock().unwrap().pop();
+                        path
+                    } {
+                        println!("recompressing {}...", xz_path.display());
+                        let file_start = Instant::now();
+                        let gz_path = xz_path.with_extension("gz");
+
+                        let mut destinations: Vec<(&str, Box<dyn io::Write>)> = Vec::new();
+
+                        // Produce gzip if explicitly enabled or the destination file doesn't exist.
+                        if recompress_gz || !gz_path.is_file() {
+                            let gz = File::create(gz_path)?;
+                            destinations.push((
+                                "gz",
+                                Box::new(flate2::write::GzEncoder::new(gz, compression_level)),
+                            ));
+                        }
+
+                        // xz recompression with more aggressive settings than we want to take the time
+                        // for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
+                        //
+                        // Note that this is using a single-threaded compressor as we're already
+                        // parallelizing across files. In rust-lang/rust we were trying to use parallel
+                        // compression, but the default block size for that is 3*dict_size so we
+                        // weren't actually using more than one core in most of the builders with
+                        // <192MB uncompressed tarballs. In promote-release since we're recompressing
+                        // 100s of tarballs there's no need for each individual compression to be
+                        // parallel.
+                        let xz_recompressed = xz_path.with_extension("xz_recompressed");
+                        if recompress_xz {
+                            let mut filters = xz2::stream::Filters::new();
+                            let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
+                            // This sets the overall dictionary size, which is also how much memory (baseline)
+                            // is needed for decompression.
+                            lzma_ops.dict_size(64 * 1024 * 1024);
+                            // Use the best match finder for compression ratio.
+                            lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
+                            lzma_ops.mode(xz2::stream::Mode::Normal);
+                            // Set nice len to the maximum for best compression ratio
+                            lzma_ops.nice_len(273);
+                            // Set depth to a reasonable value, 0 means auto, 1000 is somewhat high but gives
+                            // good results.
+                            lzma_ops.depth(1000);
+                            // 2 is the default and does well for most files
+                            lzma_ops.position_bits(2);
+                            // 0 is the default and does well for most files
+                            lzma_ops.literal_position_bits(0);
+                            // 3 is the default and does well for most files
+                            lzma_ops.literal_context_bits(3);
+
+                            filters.lzma2(&lzma_ops);
+
+                            // FIXME: Do we want a checksum as part of compression?
+                            let stream = xz2::stream::Stream::new_stream_encoder(
+                                &filters,
+                                xz2::stream::Check::None,
+                            )
                             .unwrap();
-                    let xz_out = File::create(&xz_recompressed)?;
-                    destinations.push((
-                        "xz",
-                        Box::new(xz2::write::XzEncoder::new_stream(
-                            std::io::BufWriter::new(xz_out),
-                            stream,
-                        )),
-                    ));
-                }
-
-                // We only decompress once and then write into each of the compressors before
-                // moving on.
-                //
-                // This code assumes that compression with `write_all` will never fail (i.e., we
-                // can take arbitrary amounts of data as input). That seems like a reasonable
-                // assumption though.
-                let mut decompressor = XzDecoder::new(File::open(xz_path)?);
-                let mut buffer = vec![0u8; 4 * 1024 * 1024];
-                let mut decompress_time = Duration::ZERO;
-                let mut time_by_dest = vec![Duration::ZERO; destinations.len()];
-                loop {
-                    let start = Instant::now();
-                    let length = decompressor.read(&mut buffer)?;
-                    decompress_time += start.elapsed();
-                    if length == 0 {
-                        break;
-                    }
-                    for (idx, (_, destination)) in destinations.iter_mut().enumerate() {
-                        let start = std::time::Instant::now();
-                        destination.write_all(&buffer[..length])?;
-                        time_by_dest[idx] += start.elapsed();
+                            let xz_out = File::create(&xz_recompressed)?;
+                            destinations.push((
+                                "xz",
+                                Box::new(xz2::write::XzEncoder::new_stream(
+                                    std::io::BufWriter::new(xz_out),
+                                    stream,
+                                )),
+                            ));
+                        }
+
+                        // We only decompress once and then write into each of the compressors before
+                        // moving on.
+                        //
+                        // This code assumes that compression with `write_all` will never fail (i.e., we
+                        // can take arbitrary amounts of data as input). That seems like a reasonable
+                        // assumption though.
+                        let mut decompressor = XzDecoder::new(File::open(&xz_path)?);
+                        let mut buffer = vec![0u8; 4 * 1024 * 1024];
+                        let mut decompress_time = Duration::ZERO;
+                        let mut time_by_dest = vec![Duration::ZERO; destinations.len()];
+                        loop {
+                            let start = Instant::now();
+                            let length = decompressor.read(&mut buffer)?;
+                            decompress_time += start.elapsed();
+                            if length == 0 {
+                                break;
+                            }
+                            for (idx, (_, destination)) in destinations.iter_mut().enumerate() {
+                                let start = std::time::Instant::now();
+                                destination.write_all(&buffer[..length])?;
+                                time_by_dest[idx] += start.elapsed();
+                            }
+                        }
+
+                        let mut compression_times = String::new();
+                        for (idx, (name, _)) in destinations.iter().enumerate() {
+                            write!(
+                                compression_times,
+                                ", {:.2?} {} compression",
+                                time_by_dest[idx], name
+                            )?;
+                        }
+                        println!(
+                            "recompressed {}: {:.2?} total, {:.2?} decompression{}",
+                            xz_path.display(),
+                            file_start.elapsed(),
+                            decompress_time,
+                            compression_times
+                        );
+
+                        if recompress_xz {
+                            fs::rename(&xz_recompressed, xz_path)?;
+                        }
                     }
-                }
-
-                let mut compression_times = String::new();
-                for (idx, (name, _)) in destinations.iter().enumerate() {
-                    write!(
-                        compression_times,
-                        ", {:.2?} {} compression",
-                        time_by_dest[idx], name
-                    )?;
-                }
-                println!(
-                    "recompressed {}: {:.2?} total, {:.2?} decompression{}",
-                    xz_path.display(),
-                    file_start.elapsed(),
-                    decompress_time,
-                    compression_times
-                );
-
-                if recompress_xz {
-                    fs::rename(&xz_recompressed, xz_path)?;
-                }
-
-                Ok::<(), anyhow::Error>(())
-            })
-            .collect::<anyhow::Result<Vec<()>>>()?;
+
+                    Ok::<_, anyhow::Error>(())
+                }));
+            }
+
+            for task in tasks {
+                task.join().expect("no panics")?;
+            }
+
+            Ok::<_, anyhow::Error>(())
+        })?;
 
         println!(
             "finished recompressing {} files in {:.2?}",
-            to_recompress.len(),
+            total_length,
             recompress_start.elapsed(),
         );
         Ok(())
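
The scheduling pattern this commit introduces (sort ascending by size, then let workers pop the largest remaining item from a shared `Mutex<Vec<_>>` inside `std::thread::scope`) reduces to a small standalone sketch. This is an illustration, not code from the commit; `run_pool`, `process`, and the `num_threads` parameter are hypothetical stand-ins:

```rust
use std::path::{Path, PathBuf};
use std::sync::Mutex;

fn run_pool(mut work: Vec<PathBuf>, num_threads: usize) {
    // Smallest first, so `pop()` hands out the largest remaining item and the
    // cheap items land at the end of the run, shortening the long tail.
    work.sort_by_cached_key(|path| std::fs::metadata(path).map(|m| m.len()).unwrap_or(0));

    let work = Mutex::new(work);
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                // The inner block forces the MutexGuard to drop before the loop
                // body runs: `while let` desugars to `loop` + `match`, and the
                // scrutinee's temporaries live until the end of the match, so
                // without the block the lock would be held while an item is
                // processed, serializing all workers.
                while let Some(path) = {
                    let item = work.lock().unwrap().pop();
                    item
                } {
                    process(&path);
                }
            });
        }
    });
}

// Hypothetical per-item work function.
fn process(path: &Path) {
    println!("processing {}", path.display());
}
```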
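The aggressive xz settings can likewise be factored into a helper for illustration; `aggressive_xz_stream` is a hypothetical name, but the option values and API calls mirror the diff:

```rust
use xz2::stream::{Check, Filters, LzmaOptions, MatchFinder, Mode, Stream};

fn aggressive_xz_stream() -> Stream {
    let mut lzma_ops = LzmaOptions::new_preset(9).unwrap();
    // The dictionary size is also the baseline memory needed to decompress.
    lzma_ops.dict_size(64 * 1024 * 1024);
    // Best match finder for compression ratio.
    lzma_ops.match_finder(MatchFinder::BinaryTree4);
    lzma_ops.mode(Mode::Normal);
    // 273 is the maximum nice_len; favors ratio over speed.
    lzma_ops.nice_len(273);
    // 0 means auto; 1000 is somewhat high but gives good results.
    lzma_ops.depth(1000);
    // position_bits (2), literal_position_bits (0), and literal_context_bits (3)
    // are set to their defaults in the diff, so they are omitted here.

    let mut filters = Filters::new();
    filters.lzma2(&lzma_ops);
    // No integrity check, matching the diff's Check::None (see its FIXME).
    Stream::new_stream_encoder(&filters, Check::None).unwrap()
}
```

The resulting `Stream` is what `xz2::write::XzEncoder::new_stream` wraps around the buffered output file in the diff.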
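Finally, the decompress-once fan-out loop, sketched with plain `io::Result` error handling (`fan_out` is a hypothetical helper; the real code also records per-destination timings):

```rust
use std::fs::File;
use std::io::{self, Read, Write};
use std::path::Path;
use xz2::read::XzDecoder;

fn fan_out(source: &Path, destinations: &mut [Box<dyn Write>]) -> io::Result<()> {
    let mut decompressor = XzDecoder::new(File::open(source)?);
    // 4 MiB chunks, as in the diff.
    let mut buffer = vec![0u8; 4 * 1024 * 1024];
    loop {
        let length = decompressor.read(&mut buffer)?;
        if length == 0 {
            // EOF from the decompressor; every destination has seen all bytes.
            return Ok(());
        }
        // Each chunk is written to every encoder, so the source is only
        // decompressed once no matter how many outputs are produced.
        for destination in destinations.iter_mut() {
            destination.write_all(&buffer[..length])?;
        }
    }
}
```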