Commit ef1e8d1

Make rayon independent of worker threading
Moves it to a separate module. Rayon now spawns in-place closures with no explicit communication, instead of relying on dedicated worker threads. This can parallelize decoding by spawning tasks for multiple rows of data at the same time.
1 parent 50766ba commit ef1e8d1
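In outline, the new scheduling works like this: every MCU row of coefficients becomes its own closure spawned inside a rayon scope, the heavy per-row work runs without holding any lock, and a shared output buffer is locked only briefly to copy finished samples back. A minimal, self-contained sketch of that pattern follows; the names process_row, decode_rows_in_parallel, rows, and output are illustrative and not part of the crate.

use std::sync::Mutex;

// Illustrative stand-in for the per-row dequantize/IDCT work; the real
// crate operates on MCU-row coefficient buffers.
fn process_row(row: &[i16]) -> Vec<u8> {
    row.iter().map(|&c| c as u8).collect()
}

// Spawn one task per row inside a rayon scope; no channels, no dedicated
// worker threads. Each `row` is assumed to hold exactly `row_len` values.
fn decode_rows_in_parallel(rows: Vec<Vec<i16>>, row_len: usize) -> Vec<u8> {
    let output = Mutex::new(vec![0u8; rows.len() * row_len]);

    rayon::in_place_scope(|scope| {
        for (i, row) in rows.into_iter().enumerate() {
            let output = &output;
            scope.spawn(move |_| {
                let decoded = process_row(&row);      // heavy work, no lock held
                let mut out = output.lock().unwrap(); // lock only for the write-back
                out[i * row_len..(i + 1) * row_len].copy_from_slice(&decoded);
            });
        }
        // The scope waits here until every spawned row task has finished.
    });

    output.into_inner().unwrap()
}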

5 files changed: 147 additions & 84 deletions

src/decoder.rs

Lines changed: 11 additions & 6 deletions
@@ -560,14 +560,17 @@ impl<R: Read> Decoder<R> {
                 let coefficients_per_mcu_row = usize::from(component.block_size.width)
                     * usize::from(component.vertical_sampling_factor)
                     * 64;
-                for mcu_y in 0..frame.mcu_size.height {
-                    let row_coefficients = {
+
+                let mut tasks = (0..frame.mcu_size.height)
+                    .map(|mcu_y| {
                         let offset = usize::from(mcu_y) * coefficients_per_mcu_row;
-                        self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec()
-                    };
+                        let row_coefficients = self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec();
+                        (i, row_coefficients)
+                    });
 
-                    worker.append_row((i, row_coefficients))?;
-                }
+                // FIXME: additional potential work stealing opportunities for rayon case if we
+                // also internally can parallelize over components.
+                worker.append_rows(&mut tasks)?;
                 planes[i] = worker.get_result(i)?;
             }
         }
@@ -871,6 +874,8 @@ impl<R: Read> Decoder<R> {
                     )
                 };
 
+                // FIXME: additional potential work stealing opportunities for rayon case if we
+                // also internally can parallelize over components.
                 worker.append_row((i, row_coefficients))?;
             }
         }

src/worker/immediate.rs

Lines changed: 4 additions & 4 deletions
@@ -10,10 +10,10 @@ use crate::parser::Component;
 use super::{RowData, Worker};
 
 pub struct ImmediateWorker {
-    offsets: [usize; MAX_COMPONENTS],
-    results: Vec<Vec<u8>>,
-    components: Vec<Option<Component>>,
-    quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
+    pub(crate) offsets: [usize; MAX_COMPONENTS],
+    pub(crate) results: Vec<Vec<u8>>,
+    pub(crate) components: Vec<Option<Component>>,
+    pub(crate) quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
 }
 
 pub fn with_immediate<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {

src/worker/mod.rs

Lines changed: 14 additions & 0 deletions
@@ -1,5 +1,7 @@
 mod immediate;
 mod multithreaded;
+#[cfg(feature = "rayon")]
+mod rayon;
 
 use alloc::sync::Arc;
 use alloc::vec::Vec;
@@ -16,6 +18,15 @@ pub trait Worker {
     fn start(&mut self, row_data: RowData) -> Result<()>;
     fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
     fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
+    /// Default implementation for spawning multiple tasks.
+    fn append_rows(&mut self, row: &mut dyn Iterator<Item=(usize, Vec<i16>)>)
+        -> Result<()>
+    {
+        for item in row {
+            self.append_row(item)?;
+        }
+        Ok(())
+    }
 }
 
 pub enum PreferWorkerKind {
@@ -26,6 +37,9 @@ pub enum PreferWorkerKind {
 /// Execute something with a worker system.
 pub fn with_worker<T>(prefer: PreferWorkerKind, f: impl FnOnce(&mut dyn Worker) -> T) -> T {
     match prefer {
+        #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
+        #[cfg(feature = "rayon")]
+        PreferWorkerKind::Multithreaded => self::rayon::with_rayon(f),
         #[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
         PreferWorkerKind::Multithreaded => self::multithreaded::with_multithreading(f),
         _ => self::immediate::with_immediate(f),
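The `append_rows` addition is backwards compatible: its default body just drains the iterator through `append_row`, so the immediate and std-thread workers need no changes, while the rayon worker overrides it to fan rows out in parallel. A toy sketch of that trait shape (the Item alias, the Logger type, and main are hypothetical, not crate code):

// The task type mirrors what the crate passes around: (component index, row coefficients).
type Item = (usize, Vec<i16>);

trait Worker {
    fn append_row(&mut self, row: Item);

    /// Default: hand every task to `append_row`, one at a time.
    fn append_rows(&mut self, rows: &mut dyn Iterator<Item = Item>) {
        for row in rows {
            self.append_row(row);
        }
    }
}

// An implementor that only defines `append_row` gets `append_rows` for free.
struct Logger(Vec<usize>);

impl Worker for Logger {
    fn append_row(&mut self, row: Item) {
        let (index, data) = row;
        println!("component {}: {} coefficients", index, data.len());
        self.0.push(index);
    }
}

fn main() {
    let mut worker = Logger(Vec::new());
    let mut tasks = (0..3usize).map(|i| (i, vec![0i16; 64]));
    worker.append_rows(&mut tasks);
    assert_eq!(worker.0, vec![0, 1, 2]);
}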

src/worker/multithreaded.rs

Lines changed: 1 addition & 74 deletions
@@ -11,13 +11,7 @@ use super::{RowData, Worker};
 use super::immediate::ImmediateWorker;
 
 pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
-    #[cfg(not(feature = "rayon"))]
-    return self::enter_threads(f);
-
-    #[cfg(feature = "rayon")]
-    return jpeg_rayon::enter(|mut worker| {
-        f(&mut worker)
-    });
+    self::enter_threads(f)
 }
 
 enum WorkerMsg {
@@ -133,70 +127,3 @@ fn enter_threads<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
     let mut worker = StdThreadWorker(MpscWorker::default());
     f(&mut worker)
 }
-
-
-#[cfg(feature = "rayon")]
-mod jpeg_rayon {
-    use crate::error::Result;
-    use super::{MpscWorker, RowData};
-
-    pub struct Scoped<'r, 'scope> {
-        fifo: &'r rayon::ScopeFifo<'scope>,
-        inner: MpscWorker,
-    }
-
-    pub fn enter<T>(f: impl FnOnce(Scoped) -> T) -> T {
-        // Note: Must be at least two threads. Otherwise, we may deadlock, due to ordering
-        // constraints that we can not impose properly. Note that `append_row` creates a new task
-        // while in `get_result` we wait for all tasks of a component. The only way for rayon to
-        // impose this wait __and get a result__ is by ending an in_place_scope.
-        //
-        // However, the ordering of tasks is not as FIFO as the name would suggest. Indeed, even
-        // though tasks are spawned in `start` _before_ the task spawned in `get_result`, the
-        // `in_place_scope_fifo` will wait for ITS OWN results in fifo order. This implies, unless
-        // there is some other thread capable of stealing the worker the work task will in fact not
-        // get executed and the result will wait forever. It is impossible to otherwise schedule
-        // the worker tasks specifically (e.g. join handle would be cool *cough* if you read this
-        // and work on rayon) before while yielding from the current thread.
-        //
-        // So: we need at least one more worker thread that is _not_ occupied.
-        let threads = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
-
-        threads.in_place_scope_fifo(|fifo| {
-            f(Scoped { fifo, inner: MpscWorker::default() })
-        })
-    }
-
-    impl super::Worker for Scoped<'_, '_> {
-        fn start(&mut self, row_data: RowData) -> Result<()> {
-            let fifo = &mut self.fifo;
-            self.inner.start_with(row_data, |_| {
-                let (tx, worker) = super::create_worker();
-                fifo.spawn_fifo(move |_| {
-                    worker()
-                });
-                Ok(tx)
-            })
-        }
-
-        fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
-            self.inner.append_row(row)
-        }
-
-        fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
-            self.inner.get_result_with(index, |rx| {
-                let mut result = vec![];
-                let deliver_result = &mut result;
-
-                rayon::in_place_scope_fifo(|scope| {
-                    scope.spawn_fifo(move |_| {
-                        *deliver_result = rx.recv().expect("jpeg-decoder worker thread error");
-                    });
-                });
-
-                result
-            })
-        }
-    }
-}
-

src/worker/rayon.rs

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+use core::convert::TryInto;
+use crate::error::Result;
+use crate::idct::dequantize_and_idct_block;
+
+use std::sync::Mutex;
+
+use super::{RowData, Worker};
+use crate::worker::immediate::ImmediateWorker;
+
+pub struct Scoped {
+    inner: Mutex<ImmediateWorker>,
+}
+
+pub fn with_rayon<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
+    rayon::in_place_scope(|_| {
+        let inner = ImmediateWorker::new_immediate();
+        f(&mut Scoped { inner: Mutex::new(inner) })
+    })
+}
+
+impl Scoped {
+    pub fn append_row_locked(
+        mutex: &Mutex<ImmediateWorker>,
+        (index, data): (usize, Vec<i16>),
+        result_offset: usize,
+    ) {
+        // Convert coefficients from a MCU row to samples.
+        let quantization_table;
+        let block_count;
+        let line_stride;
+        let block_size;
+        let dct_scale;
+
+        {
+            let inner = mutex.lock().unwrap();
+            let component = inner.components[index].as_ref().unwrap();
+            quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
+
+            block_size = component.block_size;
+            block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
+            line_stride = block_size.width as usize * component.dct_scale;
+            dct_scale = component.dct_scale;
+        }
+
+        assert_eq!(data.len(), block_count * 64);
+
+        let mut output_buffer = [0; 64];
+        for i in 0..block_count {
+            let x = (i % block_size.width as usize) * dct_scale;
+            let y = (i / block_size.width as usize) * dct_scale;
+
+            let coefficients: &[i16; 64] = &data[i * 64..(i + 1) * 64].try_into().unwrap();
+
+            // Write to a temporary intermediate buffer, a 8x8 'image'.
+            dequantize_and_idct_block(dct_scale, coefficients, &*quantization_table, 8, &mut output_buffer);
+
+            // Lock the mutex only for this write back, not the main computation.
+            // FIXME: we are only copying image data. Can we use some atomic backing buffer and a
+            // `Relaxed` write instead?
+            let mut write_back = mutex.lock().unwrap();
+            let write_back = &mut write_back.results[index][result_offset + y * line_stride + x..];
+
+            let buffered_lines = output_buffer.chunks_mut(8);
+            let back_lines = write_back.chunks_mut(line_stride);
+
+            for (buf, back) in buffered_lines.zip(back_lines).take(dct_scale) {
+                back[..dct_scale].copy_from_slice(&buf[..dct_scale]);
+            }
+        }
+    }
+}
+
+impl super::Worker for Scoped {
+    fn start(&mut self, row_data: RowData) -> Result<()> {
+        self.inner.get_mut().unwrap().start_immediate(row_data);
+        Ok(())
+    }
+
+    fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+        self.inner.get_mut().unwrap().append_row_immediate(row);
+        Ok(())
+    }
+
+    fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
+        let result = self.inner.get_mut().unwrap().get_result_immediate(index);
+        Ok(result)
+    }
+
+    // Magic sauce, these _may_ run in parallel.
+    fn append_rows(&mut self, iter: &mut dyn Iterator<Item=(usize, Vec<i16>)>)
+        -> Result<()>
+    {
+        rayon::in_place_scope(|scope| {
+            let mut inner = self.inner.lock().unwrap();
+            // First we schedule everything, making sure their index is right etc.
+            for (index, data) in iter {
+                let component = inner.components[index].as_ref().unwrap();
+
+                let block_size = component.block_size;
+                let block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
+                let dct_scale = component.dct_scale;
+
+                let result_offset = inner.offsets[index];
+                inner.offsets[index] += block_count * dct_scale * dct_scale;
+
+                let mutex = &self.inner;
+                scope.spawn(move |_| {
+                    Scoped::append_row_locked(mutex, (index, data), result_offset)
+                });
+            }
+
+            // Then the mutex is released, allowing all tasks to run.
+        });
+
+        Ok(())
+    }
+}
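The write-back at the end of append_row_locked is just a strided copy: the 8x8 (or downscaled) IDCT output lands at result_offset + y * line_stride + x in the component plane and is copied line by line, dct_scale samples per line. A simplified, self-contained sketch of that indexing, with result_offset folded into the slice start and all names illustrative rather than taken from the crate:

/// Copy a `scale`x`scale` sub-block of an 8x8 buffer into a row-major plane.
/// `x`, `y` are the block's top-left corner inside the plane and
/// `line_stride` is the plane's width in samples.
fn write_back_block(
    plane: &mut [u8],
    block: &[u8; 64],
    x: usize,
    y: usize,
    line_stride: usize,
    scale: usize,
) {
    let dst = &mut plane[y * line_stride + x..];
    for (src_line, dst_line) in block.chunks(8).zip(dst.chunks_mut(line_stride)).take(scale) {
        dst_line[..scale].copy_from_slice(&src_line[..scale]);
    }
}

fn main() {
    // A 16x16 plane holding a 2x2 grid of 8x8 blocks (dct_scale = 8).
    let mut plane = vec![0u8; 16 * 16];
    let block = [255u8; 64];
    // Block index 3 in a 2-block-wide grid lands at x = 8, y = 8.
    let (i, width_in_blocks, scale) = (3usize, 2usize, 8usize);
    let x = (i % width_in_blocks) * scale;
    let y = (i / width_in_blocks) * scale;
    write_back_block(&mut plane, &block, x, y, 16, scale);
    assert_eq!(plane[8 * 16 + 8], 255);
    assert_eq!(plane[0], 0);
}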
