Skip to content

Commit aa376fa

Browse files
author
HeroicKatora
authored
Merge pull request #240 from gents83/master
Formatting and isolating rayon feature completely
2 parents 5581f7d + 7d843f1 commit aa376fa

File tree

4 files changed

+138
-105
lines changed

4 files changed

+138
-105
lines changed

src/decoder.rs

Lines changed: 31 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
1-
use alloc::borrow::ToOwned;
2-
use alloc::sync::Arc;
3-
use alloc::vec::Vec;
4-
use alloc::{format, vec};
5-
use core::cmp;
6-
use core::mem;
7-
use core::ops::Range;
8-
use std::convert::TryInto;
9-
use std::io::Read;
10-
use crate::read_u8;
111
use crate::error::{Error, Result, UnsupportedFeature};
122
use crate::huffman::{fill_default_mjpeg_tables, HuffmanDecoder, HuffmanTable};
133
use crate::marker::Marker;
@@ -16,8 +6,18 @@ use crate::parser::{
166
AdobeColorTransform, AppData, CodingProcess, Component, Dimensions, EntropyCoding, FrameInfo,
177
IccChunk, ScanInfo,
188
};
9+
use crate::read_u8;
1910
use crate::upsampler::Upsampler;
20-
use crate::worker::{PreferWorkerKind, RowData, Worker, with_worker};
11+
use crate::worker::{compute_image_parallel, with_worker, PreferWorkerKind, RowData, Worker};
12+
use alloc::borrow::ToOwned;
13+
use alloc::sync::Arc;
14+
use alloc::vec::Vec;
15+
use alloc::{format, vec};
16+
use core::cmp;
17+
use core::mem;
18+
use core::ops::Range;
19+
use std::convert::TryInto;
20+
use std::io::Read;
2121

2222
pub const MAX_COMPONENTS: usize = 4;
2323

@@ -202,7 +202,8 @@ impl<R: Read> Decoder<R> {
202202
pub fn read_info(&mut self) -> Result<()> {
203203
with_worker(PreferWorkerKind::Multithreaded, |worker| {
204204
self.decode_internal(true, worker)
205-
}).map(|_| ())
205+
})
206+
.map(|_| ())
206207
}
207208

208209
/// Configure the decoder to scale the image during decoding.
@@ -395,8 +396,7 @@ impl<R: Read> Decoder<R> {
395396
}
396397
}
397398

398-
let (marker, data) =
399-
self.decode_scan(&frame, &scan, worker, &finished)?;
399+
let (marker, data) = self.decode_scan(&frame, &scan, worker, &finished)?;
400400

401401
if let Some(data) = data {
402402
for (i, plane) in data
@@ -542,12 +542,16 @@ impl<R: Read> Decoder<R> {
542542
let frame = self.frame.as_ref().unwrap();
543543

544544
if {
545-
let required_mem = frame.components.len()
545+
let required_mem = frame
546+
.components
547+
.len()
546548
.checked_mul(frame.output_size.width.into())
547549
.and_then(|m| m.checked_mul(frame.output_size.height.into()));
548550
required_mem.map_or(true, |m| self.decoding_buffer_size_limit < m)
549551
} {
550-
return Err(Error::Format("size of decoded image exceeds maximum allowed size".to_owned()));
552+
return Err(Error::Format(
553+
"size of decoded image exceeds maximum allowed size".to_owned(),
554+
));
551555
}
552556

553557
// If we're decoding a progressive jpeg and a component is unfinished, render what we've got
@@ -579,12 +583,12 @@ impl<R: Read> Decoder<R> {
579583
* usize::from(component.vertical_sampling_factor)
580584
* 64;
581585

582-
let mut tasks = (0..frame.mcu_size.height)
583-
.map(|mcu_y| {
584-
let offset = usize::from(mcu_y) * coefficients_per_mcu_row;
585-
let row_coefficients = self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec();
586-
(i, row_coefficients)
587-
});
586+
let mut tasks = (0..frame.mcu_size.height).map(|mcu_y| {
587+
let offset = usize::from(mcu_y) * coefficients_per_mcu_row;
588+
let row_coefficients =
589+
self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec();
590+
(i, row_coefficients)
591+
});
588592

589593
// FIXME: additional potential work stealing opportunities for rayon case if we
590594
// also internally can parallelize over components.
@@ -825,7 +829,9 @@ impl<R: Read> Decoder<R> {
825829
&mut mcu_row_coefficients[i][block_offset..block_offset + 64]
826830
} else {
827831
&mut dummy_block[..64]
828-
}.try_into().unwrap();
832+
}
833+
.try_into()
834+
.unwrap();
829835

830836
if scan.successive_approximation_high == 0 {
831837
decode_block(
@@ -1174,65 +1180,7 @@ fn compute_image(
11741180
}
11751181
}
11761182

1177-
#[cfg(feature = "rayon")]
1178-
fn compute_image_parallel(
1179-
components: &[Component],
1180-
data: Vec<Vec<u8>>,
1181-
output_size: Dimensions,
1182-
is_jfif: bool,
1183-
color_transform: Option<AdobeColorTransform>,
1184-
) -> Result<Vec<u8>> {
1185-
use rayon::prelude::*;
1186-
1187-
let color_convert_func = choose_color_convert_func(components.len(), is_jfif, color_transform)?;
1188-
let upsampler = Upsampler::new(components, output_size.width, output_size.height)?;
1189-
let line_size = output_size.width as usize * components.len();
1190-
let mut image = vec![0u8; line_size * output_size.height as usize];
1191-
1192-
image
1193-
.par_chunks_mut(line_size)
1194-
.with_max_len(1)
1195-
.enumerate()
1196-
.for_each(|(row, line)| {
1197-
upsampler.upsample_and_interleave_row(
1198-
&data,
1199-
row,
1200-
output_size.width as usize,
1201-
line,
1202-
color_convert_func,
1203-
);
1204-
});
1205-
1206-
Ok(image)
1207-
}
1208-
1209-
#[cfg(not(feature = "rayon"))]
1210-
fn compute_image_parallel(
1211-
components: &[Component],
1212-
data: Vec<Vec<u8>>,
1213-
output_size: Dimensions,
1214-
is_jfif: bool,
1215-
color_transform: Option<AdobeColorTransform>,
1216-
) -> Result<Vec<u8>> {
1217-
let color_convert_func = choose_color_convert_func(components.len(), is_jfif, color_transform)?;
1218-
let upsampler = Upsampler::new(components, output_size.width, output_size.height)?;
1219-
let line_size = output_size.width as usize * components.len();
1220-
let mut image = vec![0u8; line_size * output_size.height as usize];
1221-
1222-
for (row, line) in image.chunks_mut(line_size).enumerate() {
1223-
upsampler.upsample_and_interleave_row(
1224-
&data,
1225-
row,
1226-
output_size.width as usize,
1227-
line,
1228-
color_convert_func,
1229-
);
1230-
}
1231-
1232-
Ok(image)
1233-
}
1234-
1235-
fn choose_color_convert_func(
1183+
pub(crate) fn choose_color_convert_func(
12361184
component_count: usize,
12371185
_is_jfif: bool,
12381186
color_transform: Option<AdobeColorTransform>,
@@ -1255,7 +1203,7 @@ fn choose_color_convert_func(
12551203
None => {
12561204
// Assume CMYK because no APP14 marker was found
12571205
Ok(color_convert_line_cmyk)
1258-
},
1206+
}
12591207
}
12601208
}
12611209
_ => panic!(),

src/worker/mod.rs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
mod immediate;
22
mod multithreaded;
3-
#[cfg(feature = "rayon")]
3+
#[cfg(all(
4+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
5+
feature = "rayon"
6+
))]
47
mod rayon;
58

9+
use crate::decoder::choose_color_convert_func;
10+
use crate::error::Result;
11+
use crate::parser::{AdobeColorTransform, Component, Dimensions};
12+
use crate::upsampler::Upsampler;
613
use alloc::sync::Arc;
714
use alloc::vec::Vec;
8-
use crate::error::Result;
9-
use crate::parser::Component;
1015

1116
pub struct RowData {
1217
pub index: usize,
@@ -19,30 +24,66 @@ pub trait Worker {
1924
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
2025
fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
2126
/// Default implementation for spawning multiple tasks.
22-
fn append_rows(&mut self, row: &mut dyn Iterator<Item=(usize, Vec<i16>)>)
23-
-> Result<()>
24-
{
27+
fn append_rows(&mut self, row: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
2528
for item in row {
2629
self.append_row(item)?;
2730
}
2831
Ok(())
2932
}
3033
}
3134

35+
#[allow(dead_code)]
3236
pub enum PreferWorkerKind {
3337
Immediate,
3438
Multithreaded,
3539
}
3640

37-
3841
/// Execute something with a worker system.
3942
pub fn with_worker<T>(prefer: PreferWorkerKind, f: impl FnOnce(&mut dyn Worker) -> T) -> T {
4043
match prefer {
41-
#[cfg(all(not(any(target_arch = "asmjs", target_arch = "wasm32")), feature = "rayon"))]
44+
#[cfg(all(
45+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
46+
feature = "rayon"
47+
))]
4248
PreferWorkerKind::Multithreaded => self::rayon::with_rayon(f),
49+
#[allow(unreachable_patterns)]
4350
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
4451
PreferWorkerKind::Multithreaded => self::multithreaded::with_multithreading(f),
4552
_ => self::immediate::with_immediate(f),
4653
}
4754
}
4855

56+
pub fn compute_image_parallel(
57+
components: &[Component],
58+
data: Vec<Vec<u8>>,
59+
output_size: Dimensions,
60+
is_jfif: bool,
61+
color_transform: Option<AdobeColorTransform>,
62+
) -> Result<Vec<u8>> {
63+
#[cfg(all(
64+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
65+
feature = "rayon"
66+
))]
67+
return rayon::compute_image_parallel(components, data, output_size, is_jfif, color_transform);
68+
69+
#[allow(unreachable_code)]
70+
{
71+
let color_convert_func =
72+
choose_color_convert_func(components.len(), is_jfif, color_transform)?;
73+
let upsampler = Upsampler::new(components, output_size.width, output_size.height)?;
74+
let line_size = output_size.width as usize * components.len();
75+
let mut image = vec![0u8; line_size * output_size.height as usize];
76+
77+
for (row, line) in image.chunks_mut(line_size).enumerate() {
78+
upsampler.upsample_and_interleave_row(
79+
&data,
80+
row,
81+
output_size.width as usize,
82+
line,
83+
color_convert_func,
84+
);
85+
}
86+
87+
Ok(image)
88+
}
89+
}

src/worker/multithreaded.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
//! and allow scaling to more cores.
55
//! However, that would be more complex, so we use this as a starting point.
66
7-
use std::{mem, sync::mpsc::{self, Receiver, Sender}};
7+
use super::immediate::ImmediateWorker;
8+
use super::{RowData, Worker};
89
use crate::decoder::MAX_COMPONENTS;
910
use crate::error::Result;
10-
use super::{RowData, Worker};
11-
use super::immediate::ImmediateWorker;
11+
use std::{
12+
mem,
13+
sync::mpsc::{self, Receiver, Sender},
14+
};
1215

16+
#[allow(dead_code)]
1317
pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
1418
let mut worker = MpscWorker::default();
1519
f(&mut worker)
@@ -23,7 +27,7 @@ enum WorkerMsg {
2327

2428
#[derive(Default)]
2529
pub struct MpscWorker {
26-
senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS]
30+
senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS],
2731
}
2832

2933
impl MpscWorker {
@@ -43,14 +47,18 @@ impl MpscWorker {
4347
// and in all other message-passing methods because there's not that many rows
4448
// and this should be cheaper than spawning MAX_COMPONENTS many threads up front
4549
let sender = self.senders[component].as_mut().unwrap();
46-
sender.send(WorkerMsg::Start(row_data)).expect("jpeg-decoder worker thread error");
50+
sender
51+
.send(WorkerMsg::Start(row_data))
52+
.expect("jpeg-decoder worker thread error");
4753
Ok(())
4854
}
4955

5056
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
5157
let component = row.0;
5258
let sender = self.senders[component].as_mut().unwrap();
53-
sender.send(WorkerMsg::AppendRow(row.1)).expect("jpeg-decoder worker thread error");
59+
sender
60+
.send(WorkerMsg::AppendRow(row.1))
61+
.expect("jpeg-decoder worker thread error");
5462
Ok(())
5563
}
5664

@@ -61,7 +69,9 @@ impl MpscWorker {
6169
) -> Result<Vec<u8>> {
6270
let (tx, rx) = mpsc::channel();
6371
let sender = mem::take(&mut self.senders[index]).unwrap();
64-
sender.send(WorkerMsg::GetResult(tx)).expect("jpeg-decoder worker thread error");
72+
sender
73+
.send(WorkerMsg::GetResult(tx))
74+
.expect("jpeg-decoder worker thread error");
6575
Ok(collect(rx))
6676
}
6777
}
@@ -91,14 +101,14 @@ fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
91101
// to attempt to access nonexistent components
92102
data.index = 0;
93103
worker.start_immediate(data);
94-
},
104+
}
95105
WorkerMsg::AppendRow(row) => {
96106
worker.append_row_immediate((0, row));
97-
},
107+
}
98108
WorkerMsg::GetResult(chan) => {
99109
let _ = chan.send(worker.get_result_immediate(0));
100110
break;
101-
},
111+
}
102112
}
103113
}
104114
};

0 commit comments

Comments
 (0)