Skip to content

Commit beb46c0

Browse files
author
HeroicKatora
authored
Merge pull request #247 from image-rs/worker-choice
Choose worker based on image size
2 parents 09a2c2d + 49139b7 commit beb46c0

File tree

5 files changed

+112
-37
lines changed

5 files changed

+112
-37
lines changed

src/decoder.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::parser::{
88
};
99
use crate::read_u8;
1010
use crate::upsampler::Upsampler;
11-
use crate::worker::{compute_image_parallel, with_worker, PreferWorkerKind, RowData, Worker};
11+
use crate::worker::{compute_image_parallel, PreferWorkerKind, RowData, Worker, WorkerScope};
1212
use alloc::borrow::ToOwned;
1313
use alloc::sync::Arc;
1414
use alloc::vec::Vec;
@@ -196,11 +196,30 @@ impl<R: Read> Decoder<R> {
196196
Some(data)
197197
}
198198

199+
/// Heuristic to avoid starting thread, synchronization if we expect a small amount of
200+
/// parallelism to be utilized.
201+
fn select_worker(frame: &FrameInfo, worker_preference: PreferWorkerKind) -> PreferWorkerKind {
202+
const PARALLELISM_THRESHOLD: u64 = 128 * 128;
203+
204+
match worker_preference {
205+
PreferWorkerKind::Immediate => PreferWorkerKind::Immediate,
206+
PreferWorkerKind::Multithreaded => {
207+
let width: u64 = frame.output_size.width.into();
208+
let height: u64 = frame.output_size.width.into();
209+
if width * height > PARALLELISM_THRESHOLD {
210+
PreferWorkerKind::Multithreaded
211+
} else {
212+
PreferWorkerKind::Immediate
213+
}
214+
},
215+
}
216+
}
217+
199218
/// Tries to read metadata from the image without decoding it.
200219
///
201220
/// If successful, the metadata can be obtained using the `info` method.
202221
pub fn read_info(&mut self) -> Result<()> {
203-
with_worker(PreferWorkerKind::Multithreaded, |worker| {
222+
WorkerScope::with(|worker| {
204223
self.decode_internal(true, worker)
205224
})
206225
.map(|_| ())
@@ -231,15 +250,15 @@ impl<R: Read> Decoder<R> {
231250

232251
/// Decodes the image and returns the decoded pixels if successful.
233252
pub fn decode(&mut self) -> Result<Vec<u8>> {
234-
with_worker(PreferWorkerKind::Multithreaded, |worker| {
253+
WorkerScope::with(|worker| {
235254
self.decode_internal(false, worker)
236255
})
237256
}
238257

239258
fn decode_internal(
240259
&mut self,
241260
stop_after_metadata: bool,
242-
worker: &mut dyn Worker,
261+
worker_scope: &WorkerScope,
243262
) -> Result<Vec<u8>> {
244263
if stop_after_metadata && self.frame.is_some() {
245264
// The metadata has already been read.
@@ -396,7 +415,11 @@ impl<R: Read> Decoder<R> {
396415
}
397416
}
398417

399-
let (marker, data) = self.decode_scan(&frame, &scan, worker, &finished)?;
418+
let preference = Self::select_worker(&frame, PreferWorkerKind::Multithreaded);
419+
420+
let (marker, data) = worker_scope.get_or_init_worker(
421+
preference,
422+
|worker| self.decode_scan(&frame, &scan, worker, &finished))?;
400423

401424
if let Some(data) = data {
402425
for (i, plane) in data
@@ -409,6 +432,7 @@ impl<R: Read> Decoder<R> {
409432
}
410433
}
411434
}
435+
412436
pending_marker = marker;
413437
}
414438

@@ -539,6 +563,27 @@ impl<R: Read> Decoder<R> {
539563
));
540564
}
541565

566+
let frame = self.frame.as_ref().unwrap();
567+
let preference = Self::select_worker(&frame, PreferWorkerKind::Multithreaded);
568+
569+
worker_scope.get_or_init_worker(
570+
preference,
571+
|worker| self.decode_planes(worker, planes, planes_u16)
572+
)
573+
}
574+
575+
fn decode_planes(
576+
&mut self,
577+
worker: &mut dyn Worker,
578+
mut planes: Vec<Vec<u8>>,
579+
planes_u16: Vec<Vec<u16>>,
580+
) -> Result<Vec<u8>> {
581+
if self.frame.is_none() {
582+
return Err(Error::Format(
583+
"end of image encountered before frame".to_owned(),
584+
));
585+
}
586+
542587
let frame = self.frame.as_ref().unwrap();
543588

544589
if {

src/worker/immediate.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,18 @@ pub struct ImmediateWorker {
1616
quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
1717
}
1818

19-
pub fn with_immediate<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
20-
let mut worker = ImmediateWorker::new_immediate();
21-
f(&mut worker)
22-
}
23-
24-
impl ImmediateWorker {
25-
pub fn new_immediate() -> ImmediateWorker {
19+
impl Default for ImmediateWorker {
20+
fn default() -> Self {
2621
ImmediateWorker {
2722
offsets: [0; MAX_COMPONENTS],
2823
results: vec![Vec::new(); MAX_COMPONENTS],
2924
components: vec![None; MAX_COMPONENTS],
3025
quantization_tables: vec![None; MAX_COMPONENTS],
3126
}
3227
}
28+
}
3329

30+
impl ImmediateWorker {
3431
pub fn start_immediate(&mut self, data: RowData) {
3532
assert!(self.results[data.index].is_empty());
3633

src/worker/mod.rs

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use crate::decoder::choose_color_convert_func;
1010
use crate::error::Result;
1111
use crate::parser::{AdobeColorTransform, Component, Dimensions};
1212
use crate::upsampler::Upsampler;
13+
14+
use core::cell::RefCell;
1315
use alloc::sync::Arc;
1416
use alloc::vec::Vec;
1517

@@ -38,18 +40,59 @@ pub enum PreferWorkerKind {
3840
Multithreaded,
3941
}
4042

41-
/// Execute something with a worker system.
42-
pub fn with_worker<T>(prefer: PreferWorkerKind, f: impl FnOnce(&mut dyn Worker) -> T) -> T {
43-
match prefer {
44-
#[cfg(all(
45-
not(any(target_arch = "asmjs", target_arch = "wasm32")),
46-
feature = "rayon"
47-
))]
48-
PreferWorkerKind::Multithreaded => self::rayon::with_rayon(f),
49-
#[allow(unreachable_patterns)]
50-
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
51-
PreferWorkerKind::Multithreaded => self::multithreaded::with_multithreading(f),
52-
_ => self::immediate::with_immediate(f),
43+
#[derive(Default)]
44+
pub struct WorkerScope {
45+
inner: core::cell::RefCell<Option<WorkerScopeInner>>,
46+
}
47+
48+
enum WorkerScopeInner {
49+
#[cfg(all(
50+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
51+
feature = "rayon"
52+
))]
53+
Rayon(rayon::Scoped),
54+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
55+
Multithreaded(multithreaded::MpscWorker),
56+
Immediate(immediate::ImmediateWorker),
57+
}
58+
59+
impl WorkerScope {
60+
pub fn with<T>(with: impl FnOnce(&Self) -> T) -> T {
61+
with(&WorkerScope {
62+
inner: RefCell::default(),
63+
})
64+
}
65+
66+
pub fn get_or_init_worker<T>(
67+
&self,
68+
prefer: PreferWorkerKind,
69+
f: impl FnOnce(&mut dyn Worker) -> T
70+
) -> T {
71+
let mut inner = self.inner.borrow_mut();
72+
let inner = inner.get_or_insert_with(move || {
73+
match prefer {
74+
#[cfg(all(
75+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
76+
feature = "rayon"
77+
))]
78+
PreferWorkerKind::Multithreaded => WorkerScopeInner::Rayon(Default::default()),
79+
#[allow(unreachable_patterns)]
80+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
81+
PreferWorkerKind::Multithreaded => WorkerScopeInner::Multithreaded(Default::default()),
82+
_ => WorkerScopeInner::Immediate(Default::default()),
83+
}
84+
});
85+
86+
f(match &mut *inner {
87+
#[cfg(all(
88+
not(any(target_arch = "asmjs", target_arch = "wasm32")),
89+
feature = "rayon"
90+
))]
91+
WorkerScopeInner::Rayon(worker) => worker,
92+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
93+
WorkerScopeInner::Multithreaded(worker) => worker,
94+
WorkerScopeInner::Immediate(worker) => worker,
95+
})
5396
}
5497
}
5598

src/worker/multithreaded.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@ use std::{
1313
sync::mpsc::{self, Receiver, Sender},
1414
};
1515

16-
#[allow(dead_code)]
17-
pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
18-
let mut worker = MpscWorker::default();
19-
f(&mut worker)
20-
}
21-
2216
enum WorkerMsg {
2317
Start(RowData),
2418
AppendRow(Vec<i16>),
@@ -91,7 +85,7 @@ impl Worker for MpscWorker {
9185
fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
9286
let (tx, rx) = mpsc::channel();
9387
let closure = move || {
94-
let mut worker = ImmediateWorker::new_immediate();
88+
let mut worker = ImmediateWorker::default();
9589

9690
while let Ok(message) = rx.recv() {
9791
match message {

src/worker/rayon.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,11 @@ struct ComponentMetadata {
3333
dct_scale: usize,
3434
}
3535

36+
#[derive(Default)]
3637
pub struct Scoped {
3738
inner: ImmediateWorker,
3839
}
3940

40-
pub fn with_rayon<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
41-
let inner = ImmediateWorker::default();
42-
f(&mut Scoped { inner })
43-
}
44-
4541
impl ImmediateWorker {
4642
pub fn start_immediate(&mut self, data: RowData) {
4743
let elements = data.component.block_size.width as usize
@@ -118,7 +114,7 @@ impl ImmediateWorker {
118114
}
119115
}
120116

121-
impl super::Worker for Scoped {
117+
impl Worker for Scoped {
122118
fn start(&mut self, row_data: RowData) -> Result<()> {
123119
self.inner.start_immediate(row_data);
124120
Ok(())

0 commit comments

Comments
 (0)