
Commit ac9f2e2

Cleanup: review all worker modules
1 parent 321c042 commit ac9f2e2

3 files changed: +15 -31 lines changed

src/worker/immediate.rs

Lines changed: 4 additions & 4 deletions
@@ -10,10 +10,10 @@ use crate::parser::Component;
 use super::{RowData, Worker};
 
 pub struct ImmediateWorker {
-    pub(crate) offsets: [usize; MAX_COMPONENTS],
-    pub(crate) results: Vec<Vec<u8>>,
-    pub(crate) components: Vec<Option<Component>>,
-    pub(crate) quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
+    offsets: [usize; MAX_COMPONENTS],
+    results: Vec<Vec<u8>>,
+    components: Vec<Option<Component>>,
+    quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
 }
 
 pub fn with_immediate<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
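
Note on the immediate.rs change: dropping pub(crate) makes the four fields private to the module that defines ImmediateWorker, so the rest of the crate has to go through its methods instead of reaching into the struct. A minimal sketch of that visibility difference, with illustrative module and field names rather than the crate's real layout:

    mod immediate {
        pub struct ImmediateWorker {
            offsets: [usize; 4], // no `pub`: visible only inside `immediate` and its children
        }

        impl ImmediateWorker {
            pub fn new() -> Self {
                ImmediateWorker { offsets: [0; 4] }
            }

            pub fn offset(&self, i: usize) -> usize {
                self.offsets[i]
            }
        }
    }

    mod other {
        pub fn demo() -> usize {
            let worker = super::immediate::ImmediateWorker::new();
            // worker.offsets[0];   // error[E0616]: field `offsets` is private
            worker.offset(0)        // fine: access goes through a public method
        }
    }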

src/worker/multithreaded.rs

Lines changed: 9 additions & 19 deletions
@@ -11,7 +11,8 @@ use super::{RowData, Worker};
 use super::immediate::ImmediateWorker;
 
 pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
-    self::enter_threads(f)
+    let mut worker = MpscWorker::default();
+    f(&mut worker)
 }
 
 enum WorkerMsg {
@@ -25,8 +26,6 @@ pub struct MpscWorker {
     senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS]
 }
 
-pub struct StdThreadWorker(MpscWorker);
-
 impl MpscWorker {
     fn start_with(
         &mut self,
@@ -43,17 +42,15 @@ impl MpscWorker {
         // we do the "take out value and put it back in once we're done" dance here
         // and in all other message-passing methods because there's not that many rows
        // and this should be cheaper than spawning MAX_COMPONENTS many threads up front
-        let sender = mem::replace(&mut self.senders[component], None).unwrap();
+        let sender = self.senders[component].as_mut().unwrap();
         sender.send(WorkerMsg::Start(row_data)).expect("jpeg-decoder worker thread error");
-        self.senders[component] = Some(sender);
         Ok(())
     }
 
     fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
         let component = row.0;
-        let sender = mem::replace(&mut self.senders[component], None).unwrap();
+        let sender = self.senders[component].as_mut().unwrap();
         sender.send(WorkerMsg::AppendRow(row.1)).expect("jpeg-decoder worker thread error");
-        self.senders[component] = Some(sender);
         Ok(())
     }
 
@@ -63,21 +60,21 @@
         collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>,
     ) -> Result<Vec<u8>> {
         let (tx, rx) = mpsc::channel();
-        let sender = mem::replace(&mut self.senders[index], None).unwrap();
+        let sender = mem::take(&mut self.senders[index]).unwrap();
         sender.send(WorkerMsg::GetResult(tx)).expect("jpeg-decoder worker thread error");
         Ok(collect(rx))
     }
 }
 
-impl Worker for StdThreadWorker {
+impl Worker for MpscWorker {
     fn start(&mut self, row_data: RowData) -> Result<()> {
-        self.0.start_with(row_data, spawn_worker_thread)
+        self.start_with(row_data, spawn_worker_thread)
     }
     fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
-        self.0.append_row(row)
+        MpscWorker::append_row(self, row)
     }
     fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
-        self.0.get_result_with(index, collect_worker_thread)
+        self.get_result_with(index, collect_worker_thread)
     }
 }
 
@@ -117,13 +114,6 @@ fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
     Ok(tx)
 }
 
-
 fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
     rx.recv().expect("jpeg-decoder worker thread error")
 }
-
-#[allow(dead_code)]
-fn enter_threads<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
-    let mut worker = StdThreadWorker(MpscWorker::default());
-    f(&mut worker)
-}
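
The recurring edit in multithreaded.rs swaps the take-out/put-back dance for either an in-place borrow (as_mut, where the sender is only needed to send) or mem::take (in get_result_with, where the sender is not put back afterwards; mem::take(&mut opt) is the idiomatic spelling of mem::replace(&mut opt, None)). A small standalone sketch of the three patterns, using an Option<String> slot instead of the crate's Option<Sender<WorkerMsg>>:

    use std::mem;

    fn main() {
        let mut slot: Option<String> = Some("hello".to_owned());

        // Old pattern: move the value out of the Option, use it, put it back.
        let value = mem::replace(&mut slot, None).unwrap();
        assert_eq!(value.len(), 5);
        slot = Some(value);

        // New pattern for non-consuming uses: borrow in place, nothing to restore.
        let value = slot.as_mut().unwrap();
        assert_eq!(value.len(), 5);

        // New pattern when the value really is consumed: mem::take leaves None
        // behind, with the same effect as mem::replace(&mut slot, None).
        let value = mem::take(&mut slot).unwrap();
        assert_eq!(value.len(), 5);
        assert!(slot.is_none());
    }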

src/worker/rayon.rs

Lines changed: 2 additions & 8 deletions
@@ -101,9 +101,6 @@ impl ImmediateWorker {
             &mut output_buffer,
         );
 
-        // Lock the mutex only for this write back, not the main computation.
-        // FIXME: we are only copying image data. Can we use some atomic backing buffer and a
-        // `Relaxed` write instead?
         let write_back = &mut result_block[y * line_stride + x..];
 
         let buffered_lines = output_buffer.chunks_mut(8);
@@ -123,7 +120,7 @@ impl super::Worker for Scoped {
     }
 
     fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
-        let ref mut inner = self.inner;
+        let inner = &mut self.inner;
         let (index, data) = row;
 
         let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
@@ -142,7 +139,7 @@ impl super::Worker for Scoped {
 
     // Magic sauce, these _may_ run in parallel.
     fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
-        let ref mut inner = self.inner;
+        let inner = &mut self.inner;
         rayon::in_place_scope(|scope| {
             let metadatas = [
                 inner.component_metadata(0),
@@ -151,7 +148,6 @@ impl super::Worker for Scoped {
                 inner.component_metadata(3),
             ];
 
-            let inner = &mut *inner;
             let [res0, res1, res2, res3] = &mut inner.results;
 
             // Lazily get the blocks. Note: if we've already collected results from a component
@@ -183,8 +179,6 @@ impl super::Worker for Scoped {
                    )
                });
            }
-
-        // Then the mutex is released, allowing all tasks to run.
        });
 
        Ok(())
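
The rayon.rs edits are mostly stylistic: "let ref mut inner = self.inner;" becomes the conventional "let inner = &mut self.inner;" (both bind a mutable reference; clippy's toplevel_ref_arg lint prefers the latter), the later "let inner = &mut *inner;" reborrow is dropped as unnecessary, and the two mutex-related comments are removed. A tiny sketch, only to show that the two bindings are equivalent:

    fn main() {
        let mut data = vec![1, 2, 3];

        // Pre-change spelling: `ref mut` in the pattern takes a mutable borrow.
        {
            let ref mut view = data;
            view.push(4);
        }

        // Post-change spelling: the same &mut borrow, written on the right-hand side.
        {
            let view = &mut data;
            view.push(5);
        }

        assert_eq!(data, [1, 2, 3, 4, 5]);
    }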
