Skip to content

Commit b877d20

Browse files
committed
Formatting and isolating rayon feature completely
1 parent 5581f7d commit b877d20

File tree

3 files changed

+75
-45
lines changed

3 files changed

+75
-45
lines changed

src/decoder.rs

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
1-
use alloc::borrow::ToOwned;
2-
use alloc::sync::Arc;
3-
use alloc::vec::Vec;
4-
use alloc::{format, vec};
5-
use core::cmp;
6-
use core::mem;
7-
use core::ops::Range;
8-
use std::convert::TryInto;
9-
use std::io::Read;
10-
use crate::read_u8;
111
use crate::error::{Error, Result, UnsupportedFeature};
122
use crate::huffman::{fill_default_mjpeg_tables, HuffmanDecoder, HuffmanTable};
133
use crate::marker::Marker;
@@ -16,8 +6,18 @@ use crate::parser::{
166
AdobeColorTransform, AppData, CodingProcess, Component, Dimensions, EntropyCoding, FrameInfo,
177
IccChunk, ScanInfo,
188
};
9+
use crate::read_u8;
1910
use crate::upsampler::Upsampler;
20-
use crate::worker::{PreferWorkerKind, RowData, Worker, with_worker};
11+
use crate::worker::{with_worker, PreferWorkerKind, RowData, Worker};
12+
use alloc::borrow::ToOwned;
13+
use alloc::sync::Arc;
14+
use alloc::vec::Vec;
15+
use alloc::{format, vec};
16+
use core::cmp;
17+
use core::mem;
18+
use core::ops::Range;
19+
use std::convert::TryInto;
20+
use std::io::Read;
2121

2222
pub const MAX_COMPONENTS: usize = 4;
2323

@@ -202,7 +202,8 @@ impl<R: Read> Decoder<R> {
202202
pub fn read_info(&mut self) -> Result<()> {
203203
with_worker(PreferWorkerKind::Multithreaded, |worker| {
204204
self.decode_internal(true, worker)
205-
}).map(|_| ())
205+
})
206+
.map(|_| ())
206207
}
207208

208209
/// Configure the decoder to scale the image during decoding.
@@ -395,8 +396,7 @@ impl<R: Read> Decoder<R> {
395396
}
396397
}
397398

398-
let (marker, data) =
399-
self.decode_scan(&frame, &scan, worker, &finished)?;
399+
let (marker, data) = self.decode_scan(&frame, &scan, worker, &finished)?;
400400

401401
if let Some(data) = data {
402402
for (i, plane) in data
@@ -542,12 +542,16 @@ impl<R: Read> Decoder<R> {
542542
let frame = self.frame.as_ref().unwrap();
543543

544544
if {
545-
let required_mem = frame.components.len()
545+
let required_mem = frame
546+
.components
547+
.len()
546548
.checked_mul(frame.output_size.width.into())
547549
.and_then(|m| m.checked_mul(frame.output_size.height.into()));
548550
required_mem.map_or(true, |m| self.decoding_buffer_size_limit < m)
549551
} {
550-
return Err(Error::Format("size of decoded image exceeds maximum allowed size".to_owned()));
552+
return Err(Error::Format(
553+
"size of decoded image exceeds maximum allowed size".to_owned(),
554+
));
551555
}
552556

553557
// If we're decoding a progressive jpeg and a component is unfinished, render what we've got
@@ -579,12 +583,12 @@ impl<R: Read> Decoder<R> {
579583
* usize::from(component.vertical_sampling_factor)
580584
* 64;
581585

582-
let mut tasks = (0..frame.mcu_size.height)
583-
.map(|mcu_y| {
584-
let offset = usize::from(mcu_y) * coefficients_per_mcu_row;
585-
let row_coefficients = self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec();
586-
(i, row_coefficients)
587-
});
586+
let mut tasks = (0..frame.mcu_size.height).map(|mcu_y| {
587+
let offset = usize::from(mcu_y) * coefficients_per_mcu_row;
588+
let row_coefficients =
589+
self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec();
590+
(i, row_coefficients)
591+
});
588592

589593
// FIXME: additional potential work stealing opportunities for rayon case if we
590594
// also internally can parallelize over components.
@@ -825,7 +829,9 @@ impl<R: Read> Decoder<R> {
825829
&mut mcu_row_coefficients[i][block_offset..block_offset + 64]
826830
} else {
827831
&mut dummy_block[..64]
828-
}.try_into().unwrap();
832+
}
833+
.try_into()
834+
.unwrap();
829835

830836
if scan.successive_approximation_high == 0 {
831837
decode_block(
@@ -1174,7 +1180,10 @@ fn compute_image(
11741180
}
11751181
}
11761182

1177-
#[cfg(feature = "rayon")]
1183+
#[cfg(all(
1184+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
1185+
feature = "rayon"
1186+
))]
11781187
fn compute_image_parallel(
11791188
components: &[Component],
11801189
data: Vec<Vec<u8>>,
@@ -1206,7 +1215,10 @@ fn compute_image_parallel(
12061215
Ok(image)
12071216
}
12081217

1209-
#[cfg(not(feature = "rayon"))]
1218+
#[cfg(any(
1219+
any(target_arch = "asmjs", target_arch = "wasm32"),
1220+
not(feature = "rayon")
1221+
))]
12101222
fn compute_image_parallel(
12111223
components: &[Component],
12121224
data: Vec<Vec<u8>>,
@@ -1255,7 +1267,7 @@ fn choose_color_convert_func(
12551267
None => {
12561268
// Assume CMYK because no APP14 marker was found
12571269
Ok(color_convert_line_cmyk)
1258-
},
1270+
}
12591271
}
12601272
}
12611273
_ => panic!(),

src/worker/mod.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
mod immediate;
22
mod multithreaded;
3-
#[cfg(feature = "rayon")]
3+
#[cfg(all(
4+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
5+
feature = "rayon"
6+
))]
47
mod rayon;
58

6-
use alloc::sync::Arc;
7-
use alloc::vec::Vec;
89
use crate::error::Result;
910
use crate::parser::Component;
11+
use alloc::sync::Arc;
12+
use alloc::vec::Vec;
1013

1114
pub struct RowData {
1215
pub index: usize,
@@ -19,30 +22,31 @@ pub trait Worker {
1922
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
2023
fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
2124
/// Default implementation for spawning multiple tasks.
22-
fn append_rows(&mut self, row: &mut dyn Iterator<Item=(usize, Vec<i16>)>)
23-
-> Result<()>
24-
{
25+
fn append_rows(&mut self, row: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
2526
for item in row {
2627
self.append_row(item)?;
2728
}
2829
Ok(())
2930
}
3031
}
3132

33+
#[allow(dead_code)]
3234
pub enum PreferWorkerKind {
3335
Immediate,
3436
Multithreaded,
3537
}
3638

37-
3839
/// Execute something with a worker system.
3940
pub fn with_worker<T>(prefer: PreferWorkerKind, f: impl FnOnce(&mut dyn Worker) -> T) -> T {
4041
match prefer {
41-
#[cfg(all(not(any(target_arch = "asmjs", target_arch = "wasm32")), feature = "rayon"))]
42+
#[cfg(all(
43+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
44+
feature = "rayon"
45+
))]
4246
PreferWorkerKind::Multithreaded => self::rayon::with_rayon(f),
47+
#[allow(unreachable_patterns)]
4348
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
4449
PreferWorkerKind::Multithreaded => self::multithreaded::with_multithreading(f),
4550
_ => self::immediate::with_immediate(f),
4651
}
4752
}
48-

src/worker/multithreaded.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
1+
#![cfg(all(
2+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
3+
feature = "rayon"
4+
))]
5+
16
//! This module implements per-component parallelism.
27
//! It should be possible to implement per-row parallelism as well,
38
//! which should also boost performance of grayscale images
49
//! and allow scaling to more cores.
510
//! However, that would be more complex, so we use this as a starting point.
611
7-
use std::{mem, sync::mpsc::{self, Receiver, Sender}};
12+
use super::immediate::ImmediateWorker;
13+
use super::{RowData, Worker};
814
use crate::decoder::MAX_COMPONENTS;
915
use crate::error::Result;
10-
use super::{RowData, Worker};
11-
use super::immediate::ImmediateWorker;
16+
use std::{
17+
mem,
18+
sync::mpsc::{self, Receiver, Sender},
19+
};
1220

1321
pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
1422
let mut worker = MpscWorker::default();
@@ -23,7 +31,7 @@ enum WorkerMsg {
2331

2432
#[derive(Default)]
2533
pub struct MpscWorker {
26-
senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS]
34+
senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS],
2735
}
2836

2937
impl MpscWorker {
@@ -43,14 +51,18 @@ impl MpscWorker {
4351
// and in all other message-passing methods because there's not that many rows
4452
// and this should be cheaper than spawning MAX_COMPONENTS many threads up front
4553
let sender = self.senders[component].as_mut().unwrap();
46-
sender.send(WorkerMsg::Start(row_data)).expect("jpeg-decoder worker thread error");
54+
sender
55+
.send(WorkerMsg::Start(row_data))
56+
.expect("jpeg-decoder worker thread error");
4757
Ok(())
4858
}
4959

5060
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
5161
let component = row.0;
5262
let sender = self.senders[component].as_mut().unwrap();
53-
sender.send(WorkerMsg::AppendRow(row.1)).expect("jpeg-decoder worker thread error");
63+
sender
64+
.send(WorkerMsg::AppendRow(row.1))
65+
.expect("jpeg-decoder worker thread error");
5466
Ok(())
5567
}
5668

@@ -61,7 +73,9 @@ impl MpscWorker {
6173
) -> Result<Vec<u8>> {
6274
let (tx, rx) = mpsc::channel();
6375
let sender = mem::take(&mut self.senders[index]).unwrap();
64-
sender.send(WorkerMsg::GetResult(tx)).expect("jpeg-decoder worker thread error");
76+
sender
77+
.send(WorkerMsg::GetResult(tx))
78+
.expect("jpeg-decoder worker thread error");
6579
Ok(collect(rx))
6680
}
6781
}
@@ -91,14 +105,14 @@ fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
91105
// to attempt to access nonexistent components
92106
data.index = 0;
93107
worker.start_immediate(data);
94-
},
108+
}
95109
WorkerMsg::AppendRow(row) => {
96110
worker.append_row_immediate((0, row));
97-
},
111+
}
98112
WorkerMsg::GetResult(chan) => {
99113
let _ = chan.send(worker.get_result_immediate(0));
100114
break;
101-
},
115+
}
102116
}
103117
}
104118
};

0 commit comments

Comments
 (0)