Skip to content

Commit afd3f03

Browse files
committed
Inline ImmediateWorker to rayon
Sharing of the structure is, at the moment, purely incidental. This causes a small performance hit in the implementation of `append_row` (singular), because it now also goes through the mutex locking phases.
1 parent 6e17452 commit afd3f03

File tree

1 file changed

+89
-22
lines changed

1 file changed

+89
-22
lines changed

src/worker/rayon.rs

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,74 @@
11
use core::convert::TryInto;
2+
3+
use crate::decoder::MAX_COMPONENTS;
24
use crate::error::Result;
35
use crate::idct::dequantize_and_idct_block;
6+
use crate::parser::Component;
47

5-
use std::sync::Mutex;
8+
use std::sync::{Arc, Mutex};
69

710
use super::{RowData, Worker};
8-
use crate::worker::immediate::ImmediateWorker;
11+
12+
/// Technically similar to `immediate::ImmediateWorker` but we copy it since we may prefer
13+
/// a different style of managing the memory allocation, something that multiple actors can access in
14+
/// parallel.
15+
#[derive(Default)]
16+
struct ImmediateWorker {
17+
offsets: [usize; MAX_COMPONENTS],
18+
results: [Vec<u8>; MAX_COMPONENTS],
19+
components: [Option<Component>; MAX_COMPONENTS],
20+
quantization_tables: [Option<Arc<[u16; 64]>>; MAX_COMPONENTS],
21+
}
22+
23+
struct ComponentMetadata {
24+
block_count: usize,
25+
line_stride: usize,
26+
dct_scale: usize,
27+
}
928

1029
pub struct Scoped {
1130
inner: Mutex<ImmediateWorker>,
1231
}
1332

1433
pub fn with_rayon<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
1534
rayon::in_place_scope(|_| {
16-
let inner = ImmediateWorker::new_immediate();
17-
f(&mut Scoped { inner: Mutex::new(inner) })
35+
let inner = ImmediateWorker::default();
36+
f(&mut Scoped {
37+
inner: Mutex::new(inner),
38+
})
1839
})
1940
}
2041

21-
impl Scoped {
42+
impl ImmediateWorker {
43+
pub fn start_immediate(&mut self, data: RowData) {
44+
let elements = data.component.block_size.width as usize
45+
* data.component.block_size.height as usize
46+
* data.component.dct_scale
47+
* data.component.dct_scale;
48+
self.offsets[data.index] = 0;
49+
self.results[data.index].resize(elements, 0u8);
50+
self.components[data.index] = Some(data.component);
51+
self.quantization_tables[data.index] = Some(data.quantization_table);
52+
}
53+
54+
pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> {
55+
core::mem::take(&mut self.results[index])
56+
}
57+
58+
pub fn component_metadata(&self, index: usize) -> ComponentMetadata {
59+
let component = self.components[index].as_ref().unwrap();
60+
let block_size = component.block_size;
61+
let block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
62+
let line_stride = block_size.width as usize * component.dct_scale;
63+
let dct_scale = component.dct_scale;
64+
65+
ComponentMetadata {
66+
block_count,
67+
line_stride,
68+
dct_scale,
69+
}
70+
}
71+
2272
pub fn append_row_locked(
2373
mutex: &Mutex<ImmediateWorker>,
2474
(index, data): (usize, Vec<i16>),
@@ -33,13 +83,13 @@ impl Scoped {
3383

3484
{
3585
let inner = mutex.lock().unwrap();
36-
let component = inner.components[index].as_ref().unwrap();
3786
quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
87+
block_size = inner.components[index].as_ref().unwrap().block_size;
88+
let metadata = inner.component_metadata(index);
3889

39-
block_size = component.block_size;
40-
block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
41-
line_stride = block_size.width as usize * component.dct_scale;
42-
dct_scale = component.dct_scale;
90+
block_count = metadata.block_count;
91+
line_stride = metadata.line_stride;
92+
dct_scale = metadata.dct_scale;
4393
}
4494

4595
assert_eq!(data.len(), block_count * 64);
@@ -52,7 +102,13 @@ impl Scoped {
52102
let coefficients: &[i16; 64] = &data[i * 64..(i + 1) * 64].try_into().unwrap();
53103

54104
// Write to a temporary intermediate buffer, a 8x8 'image'.
55-
dequantize_and_idct_block(dct_scale, coefficients, &*quantization_table, 8, &mut output_buffer);
105+
dequantize_and_idct_block(
106+
dct_scale,
107+
coefficients,
108+
&*quantization_table,
109+
8,
110+
&mut output_buffer,
111+
);
56112

57113
// Lock the mutex only for this write back, not the main computation.
58114
// FIXME: we are only copying image data. Can we use some atomic backing buffer and a
@@ -77,7 +133,18 @@ impl super::Worker for Scoped {
77133
}
78134

79135
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
80-
self.inner.get_mut().unwrap().append_row_immediate(row);
136+
let (index, data) = row;
137+
let result_offset;
138+
139+
{
140+
let mut inner = self.inner.get_mut().unwrap();
141+
let metadata = inner.component_metadata(index);
142+
143+
result_offset = inner.offsets[index];
144+
inner.offsets[index] += metadata.bytes_used();
145+
}
146+
147+
ImmediateWorker::append_row_locked(&self.inner, (index, data), result_offset);
81148
Ok(())
82149
}
83150

@@ -87,25 +154,19 @@ impl super::Worker for Scoped {
87154
}
88155

89156
// Magic sauce, these _may_ run in parallel.
90-
fn append_rows(&mut self, iter: &mut dyn Iterator<Item=(usize, Vec<i16>)>)
91-
-> Result<()>
92-
{
157+
fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
93158
rayon::in_place_scope(|scope| {
94159
let mut inner = self.inner.lock().unwrap();
95160
// First we schedule everything, making sure their index is right etc.
96161
for (index, data) in iter {
97-
let component = inner.components[index].as_ref().unwrap();
98-
99-
let block_size = component.block_size;
100-
let block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
101-
let dct_scale = component.dct_scale;
162+
let metadata = inner.component_metadata(index);
102163

103164
let result_offset = inner.offsets[index];
104-
inner.offsets[index] += block_count * dct_scale * dct_scale;
165+
inner.offsets[index] += metadata.bytes_used();
105166

106167
let mutex = &self.inner;
107168
scope.spawn(move |_| {
108-
Scoped::append_row_locked(mutex, (index, data), result_offset)
169+
ImmediateWorker::append_row_locked(mutex, (index, data), result_offset)
109170
});
110171
}
111172

@@ -115,3 +176,9 @@ impl super::Worker for Scoped {
115176
Ok(())
116177
}
117178
}
179+
180+
impl ComponentMetadata {
181+
fn bytes_used(&self) -> usize {
182+
self.block_count * self.dct_scale * self.dct_scale
183+
}
184+
}

0 commit comments

Comments
 (0)