Skip to content

Commit 59c3f86

Browse files
committed
Replace rayon locking with lifetime logic
Through very careful use of borrows, performing the right operations in the right sequence, we can use the borrow checker to assert that all tasks write to disjoint result slices. This avoids the need for runtime borrow checking, which required synchronization.
1 parent 7be07d1 commit 59c3f86

File tree

1 file changed

+63
-21
lines changed

1 file changed

+63
-21
lines changed

src/worker/rayon.rs

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ struct ImmediateWorker {
2020
quantization_tables: [Option<Arc<[u16; 64]>>; MAX_COMPONENTS],
2121
}
2222

23+
#[derive(Clone, Copy)]
2324
struct ComponentMetadata {
25+
block_width: usize,
2426
block_count: usize,
2527
line_stride: usize,
2628
dct_scale: usize,
@@ -53,38 +55,39 @@ impl ImmediateWorker {
5355
core::mem::take(&mut self.results[index])
5456
}
5557

56-
pub fn component_metadata(&self, index: usize) -> ComponentMetadata {
57-
let component = self.components[index].as_ref().unwrap();
58+
pub fn component_metadata(&self, index: usize) -> Option<ComponentMetadata> {
59+
let component = self.components[index].as_ref()?;
5860
let block_size = component.block_size;
61+
let block_width = block_size.width as usize;
5962
let block_count = block_size.width as usize * component.vertical_sampling_factor as usize;
6063
let line_stride = block_size.width as usize * component.dct_scale;
6164
let dct_scale = component.dct_scale;
6265

63-
ComponentMetadata {
66+
Some(ComponentMetadata {
67+
block_width,
6468
block_count,
6569
line_stride,
6670
dct_scale,
67-
}
71+
})
6872
}
6973

7074
pub fn append_row_locked(
71-
mutex: &Mutex<ImmediateWorker>,
75+
quantization_table: Arc<[u16; 64]>,
76+
metadata: ComponentMetadata,
7277
(index, data): (usize, Vec<i16>),
7378
result_offset: usize,
79+
result_block: &mut [u8],
7480
) {
7581
// Convert coefficients from a MCU row to samples.
76-
let quantization_table;
7782
let block_count;
7883
let line_stride;
79-
let block_size;
84+
let block_width;
8085
let dct_scale;
8186

8287
{
83-
let inner = mutex.lock().unwrap();
84-
quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
85-
block_size = inner.components[index].as_ref().unwrap().block_size;
86-
let metadata = inner.component_metadata(index);
88+
let metadata = metadata;
8789

90+
block_width = metadata.block_width;
8891
block_count = metadata.block_count;
8992
line_stride = metadata.line_stride;
9093
dct_scale = metadata.dct_scale;
@@ -94,8 +97,8 @@ impl ImmediateWorker {
9497

9598
let mut output_buffer = [0; 64];
9699
for i in 0..block_count {
97-
let x = (i % block_size.width as usize) * dct_scale;
98-
let y = (i / block_size.width as usize) * dct_scale;
100+
let x = (i % block_width) * dct_scale;
101+
let y = (i / block_width) * dct_scale;
99102

100103
let coefficients: &[i16; 64] = &data[i * 64..(i + 1) * 64].try_into().unwrap();
101104

@@ -111,8 +114,7 @@ impl ImmediateWorker {
111114
// Lock the mutex only for this write back, not the main computation.
112115
// FIXME: we are only copying image data. Can we use some atomic backing buffer and a
113116
// `Relaxed` write instead?
114-
let mut write_back = mutex.lock().unwrap();
115-
let write_back = &mut write_back.results[index][result_offset + y * line_stride + x..];
117+
let write_back = &mut result_block[y * line_stride + x..];
116118

117119
let buffered_lines = output_buffer.chunks_mut(8);
118120
let back_lines = write_back.chunks_mut(line_stride);
@@ -131,18 +133,29 @@ impl super::Worker for Scoped {
131133
}
132134

133135
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
136+
let quantization_table;
137+
let metadata;
134138
let (index, data) = row;
135139
let result_offset;
140+
let result_block;
136141

137142
{
138143
let mut inner = self.inner.get_mut().unwrap();
139-
let metadata = inner.component_metadata(index);
144+
quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
145+
metadata = inner.component_metadata(index).unwrap();
140146

141147
result_offset = inner.offsets[index];
148+
result_block = &mut inner.results[index][inner.offsets[index]..];
142149
inner.offsets[index] += metadata.bytes_used();
143150
}
144151

145-
ImmediateWorker::append_row_locked(&self.inner, (index, data), result_offset);
152+
ImmediateWorker::append_row_locked(
153+
quantization_table,
154+
metadata,
155+
(index, data),
156+
result_offset,
157+
result_block,
158+
);
146159
Ok(())
147160
}
148161

@@ -153,18 +166,47 @@ impl super::Worker for Scoped {
153166

154167
// Magic sauce, these _may_ run in parallel.
155168
fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
169+
let inner = self.inner.get_mut().unwrap();
156170
rayon::in_place_scope(|scope| {
157-
let mut inner = self.inner.lock().unwrap();
171+
let metadatas = [
172+
inner.component_metadata(0),
173+
inner.component_metadata(1),
174+
inner.component_metadata(2),
175+
inner.component_metadata(3),
176+
];
177+
178+
let inner = &mut *inner;
179+
let [res0, res1, res2, res3] = &mut inner.results;
180+
181+
// Lazily get the blocks. Note: if we've already collected results from a component
182+
// then the result vector has already been deallocated/taken. But no more tasks should
183+
// be created for it.
184+
let mut result_blocks = [
185+
res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []),
186+
res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []),
187+
res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []),
188+
res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []),
189+
];
190+
158191
// First we schedule everything, making sure their index is right etc.
159192
for (index, data) in iter {
160-
let metadata = inner.component_metadata(index);
193+
let metadata = metadatas[index].unwrap();
194+
let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
161195

162196
let result_offset = inner.offsets[index];
197+
let (result_block, tail) = core::mem::replace(&mut result_blocks[index], &mut [])
198+
.split_at_mut(metadata.bytes_used());
199+
result_blocks[index] = tail;
163200
inner.offsets[index] += metadata.bytes_used();
164201

165-
let mutex = &self.inner;
166202
scope.spawn(move |_| {
167-
ImmediateWorker::append_row_locked(mutex, (index, data), result_offset)
203+
ImmediateWorker::append_row_locked(
204+
quantization_table,
205+
metadata,
206+
(index, data),
207+
result_offset,
208+
result_block,
209+
)
168210
});
169211
}
170212

0 commit comments

Comments
 (0)