@@ -17,6 +17,7 @@ use std::{
17
17
sync:: {
18
18
atomic:: { AtomicBool , Ordering } ,
19
19
mpsc:: { sync_channel, Receiver , SyncSender } ,
20
+ Condvar , LazyLock , Mutex , MutexGuard ,
20
21
} ,
21
22
thread,
22
23
} ;
@@ -48,6 +49,16 @@ enum DocKind {
48
49
Outer ,
49
50
}
50
51
52
+ static N_THREADS : LazyLock < usize > = LazyLock :: new ( || {
53
+ std:: cmp:: max (
54
+ 1 ,
55
+ thread:: available_parallelism ( )
56
+ . unwrap ( )
57
+ . get ( )
58
+ . saturating_sub ( 1 ) ,
59
+ )
60
+ } ) ;
61
+
51
62
static CTRLC : AtomicBool = AtomicBool :: new ( false ) ;
52
63
53
64
fn main ( ) -> Result < ( ) > {
@@ -165,7 +176,7 @@ fn format_file(opts: Options, path: impl AsRef<Path>) -> Result<()> {
165
176
. map ( |chunk| chunk. characteristics )
166
177
. collect :: < Vec < _ > > ( ) ;
167
178
168
- let ( sender, receiver) = sync_channel :: < Child > ( 0 ) ;
179
+ let ( sender, receiver) = sync_channel :: < Child > ( * N_THREADS ) ;
169
180
let handle = thread:: spawn ( move || prettier_spawner ( & opts, characteristics, & sender) ) ;
170
181
171
182
let mut rewriter = Rewriter :: new ( & contents) ;
@@ -265,33 +276,38 @@ fn preprocess_line(line: &str) -> (Option<Characteristics>, &str) {
265
276
///
266
277
/// Note that `characteristics` influences the arguments passed to `prettier`. So the `prettier`
267
278
/// instances must be consumed in the same order in which they were spawned.
279
+ #[ allow( clippy:: unnecessary_wraps) ]
268
280
fn prettier_spawner (
269
281
opts : & Options ,
270
282
characteristics : Vec < Characteristics > ,
271
283
sender : & SyncSender < Child > ,
272
284
) -> Result < ( ) > {
273
- let children = characteristics
274
- . into_iter ( )
275
- . map ( |characteristics| {
276
- let mut command = Command :: new ( "prettier" ) ;
277
- command. arg ( "--parser=markdown" ) ;
278
- if let Some ( max_width) = opts. max_width {
279
- command. arg ( "--prose-wrap=always" ) ;
280
- command. arg ( format ! (
281
- "--print-width={}" ,
282
- max_width. saturating_sub( characteristics. indent + 4 )
283
- ) ) ;
284
- }
285
- command. args ( & opts. args ) ;
286
- command
287
- . stdin ( Stdio :: piped ( ) )
288
- . stdout ( Stdio :: piped ( ) )
289
- . stderr ( Stdio :: piped ( ) ) ;
290
- command. spawn ( ) . expect ( "failed to spawn `prettier`" )
291
- } )
292
- . collect :: < Vec < _ > > ( ) ;
293
- for child in children {
294
- sender. send ( child) ?;
285
+ for characteristics in characteristics {
286
+ let mut used_parallelism = lock_used_parallelism_for_incrementing ( ) ;
287
+ let mut command = Command :: new ( "prettier" ) ;
288
+ command. arg ( "--parser=markdown" ) ;
289
+ if let Some ( max_width) = opts. max_width {
290
+ command. arg ( "--prose-wrap=always" ) ;
291
+ command. arg ( format ! (
292
+ "--print-width={}" ,
293
+ max_width. saturating_sub( characteristics. indent + 4 )
294
+ ) ) ;
295
+ }
296
+ command. args ( & opts. args ) ;
297
+ command
298
+ . stdin ( Stdio :: piped ( ) )
299
+ . stdout ( Stdio :: piped ( ) )
300
+ . stderr ( Stdio :: piped ( ) ) ;
301
+ let child = command. spawn ( ) . expect ( "failed to spawn `prettier`" ) ;
302
+ // smoelius: The next send should never fail. The channel is created with a capacity of
303
+ // `N_THREADS`, and no more than `N_THREADS` children exist at any time.
304
+ sender. try_send ( child) . unwrap_or_else ( |error| {
305
+ panic ! (
306
+ "tried to send more than {} children on channel: {error:?}" ,
307
+ * N_THREADS
308
+ )
309
+ } ) ;
310
+ * used_parallelism += 1 ;
295
311
}
296
312
Ok ( ( ) )
297
313
}
@@ -313,11 +329,31 @@ fn format_chunk(receiver: &Receiver<Child>, chunk: &Chunk) -> Result<String> {
313
329
OutputError :: new( output)
314
330
) ;
315
331
332
+ decrement_used_parallelism ( ) ;
333
+
316
334
let docs = String :: from_utf8 ( output. stdout ) ?;
317
335
318
336
Ok ( postprocess_docs ( chunk. characteristics , & docs) )
319
337
}
320
338
339
+ static USED_PARALLELISM : Mutex < usize > = Mutex :: new ( 0 ) ;
340
+ static USED_PARALLELISM_CONDVAR : Condvar = Condvar :: new ( ) ;
341
+
342
+ fn lock_used_parallelism_for_incrementing ( ) -> MutexGuard < ' static , usize > {
343
+ let used_parallelism = USED_PARALLELISM . lock ( ) . unwrap ( ) ;
344
+ USED_PARALLELISM_CONDVAR
345
+ . wait_while ( used_parallelism, |used_parallelism| {
346
+ * used_parallelism >= * N_THREADS
347
+ } )
348
+ . unwrap ( )
349
+ }
350
+
351
+ fn decrement_used_parallelism ( ) {
352
+ let mut used_parallelism = USED_PARALLELISM . lock ( ) . unwrap ( ) ;
353
+ * used_parallelism -= 1 ;
354
+ USED_PARALLELISM_CONDVAR . notify_one ( ) ;
355
+ }
356
+
321
357
fn postprocess_docs ( characteristics : Characteristics , docs : & str ) -> String {
322
358
let Characteristics { indent, kind, .. } = characteristics;
323
359
docs. lines ( )
0 commit comments