Skip to content

Commit bcbd722

Browse files
committed
Support WebAssembly and asm.js
This adds support for both WebAssembly and asm.js by abstracting the worker thread into an ImmediateWorker and a ThreadWorker which uses the ImmediateWorker. The ThreadWorker is exposed as the worker to be used on every platform other than WebAssembly and asm.js. There the ImmediateWorker itself is exposed instead, because threading isn't supported on those platforms just yet. Resolves #75
1 parent 35c080d commit bcbd722

File tree

6 files changed

+161
-86
lines changed

6 files changed

+161
-86
lines changed

src/decoder.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use std::io::Read;
1010
use std::mem;
1111
use std::ops::Range;
1212
use std::sync::Arc;
13-
use std::sync::mpsc::{self, Sender};
14-
use worker_thread::{RowData, spawn_worker_thread, WorkerMsg};
13+
use worker::{RowData, PlatformWorker, Worker};
1514

1615
pub const MAX_COMPONENTS: usize = 4;
1716

@@ -133,7 +132,7 @@ impl<R: Read> Decoder<R> {
133132

134133
let mut previous_marker = Marker::SOI;
135134
let mut pending_marker = None;
136-
let mut worker_chan = None;
135+
let mut worker = None;
137136
let mut scans_processed = 0;
138137
let mut planes = vec![Vec::new(); self.frame.as_ref().map_or(0, |frame| frame.components.len())];
139138

@@ -193,8 +192,8 @@ impl<R: Read> Decoder<R> {
193192
if self.frame.is_none() {
194193
return Err(Error::Format("scan encountered before frame".to_owned()));
195194
}
196-
if worker_chan.is_none() {
197-
worker_chan = Some(spawn_worker_thread()?);
195+
if worker.is_none() {
196+
worker = Some(PlatformWorker::new()?);
198197
}
199198

200199
let frame = self.frame.clone().unwrap();
@@ -216,7 +215,7 @@ impl<R: Read> Decoder<R> {
216215
}
217216

218217
let is_final_scan = scan.component_indices.iter().all(|&i| self.coefficients_finished[i] == !0);
219-
let (marker, data) = self.decode_scan(&frame, &scan, worker_chan.as_ref().unwrap(), is_final_scan)?;
218+
let (marker, data) = self.decode_scan(&frame, &scan, worker.as_mut().unwrap(), is_final_scan)?;
220219

221220
if let Some(data) = data {
222221
for (i, plane) in data.into_iter().enumerate().filter(|&(_, ref plane)| !plane.is_empty()) {
@@ -357,7 +356,7 @@ impl<R: Read> Decoder<R> {
357356
fn decode_scan(&mut self,
358357
frame: &FrameInfo,
359358
scan: &ScanInfo,
360-
worker_chan: &Sender<WorkerMsg>,
359+
worker: &mut PlatformWorker,
361360
produce_data: bool)
362361
-> Result<(Option<Marker>, Option<Vec<Vec<u8>>>)> {
363362
assert!(scan.component_indices.len() <= MAX_COMPONENTS);
@@ -394,7 +393,7 @@ impl<R: Read> Decoder<R> {
394393
quantization_table: self.quantization_tables[component.quantization_table_index].clone().unwrap(),
395394
};
396395

397-
worker_chan.send(WorkerMsg::Start(row_data))?;
396+
worker.start(row_data)?;
398397
}
399398
}
400399

@@ -516,7 +515,7 @@ impl<R: Read> Decoder<R> {
516515
mem::replace(&mut mcu_row_coefficients[i], vec![0i16; coefficients_per_mcu_row])
517516
};
518517

519-
worker_chan.send(WorkerMsg::AppendRow((i, row_coefficients)))?;
518+
worker.append_row((i, row_coefficients))?;
520519
}
521520
}
522521
}
@@ -528,10 +527,7 @@ impl<R: Read> Decoder<R> {
528527
let mut data = vec![Vec::new(); frame.components.len()];
529528

530529
for (i, &component_index) in scan.component_indices.iter().enumerate() {
531-
let (tx, rx) = mpsc::channel();
532-
worker_chan.send(WorkerMsg::GetResult((i, tx)))?;
533-
534-
data[component_index] = rx.recv()?;
530+
data[component_index] = worker.get_result(i)?;
535531
}
536532

537533
Ok((marker, Some(data)))

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,4 @@ mod idct;
4242
mod marker;
4343
mod parser;
4444
mod upsampler;
45-
mod worker_thread;
45+
mod worker;

src/worker/immediate.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use decoder::MAX_COMPONENTS;
2+
use error::Result;
3+
use idct::dequantize_and_idct_block;
4+
use std::mem;
5+
use std::sync::Arc;
6+
use parser::Component;
7+
use super::{RowData, Worker};
8+
9+
pub struct ImmediateWorker {
10+
offsets: [usize; MAX_COMPONENTS],
11+
results: Vec<Vec<u8>>,
12+
components: Vec<Option<Component>>,
13+
quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
14+
}
15+
16+
impl ImmediateWorker {
17+
pub fn new_immediate() -> ImmediateWorker {
18+
ImmediateWorker {
19+
offsets: [0; MAX_COMPONENTS],
20+
results: vec![Vec::new(); MAX_COMPONENTS],
21+
components: vec![None; MAX_COMPONENTS],
22+
quantization_tables: vec![None; MAX_COMPONENTS],
23+
}
24+
}
25+
pub fn start_immediate(&mut self, data: RowData) {
26+
assert!(self.results[data.index].is_empty());
27+
28+
self.offsets[data.index] = 0;
29+
self.results[data.index].resize(data.component.block_size.width as usize * data.component.block_size.height as usize * 64, 0u8);
30+
self.components[data.index] = Some(data.component);
31+
self.quantization_tables[data.index] = Some(data.quantization_table);
32+
}
33+
pub fn append_row_immediate(&mut self, (index, data): (usize, Vec<i16>)) {
34+
// Convert coefficients from a MCU row to samples.
35+
36+
let component = self.components[index].as_ref().unwrap();
37+
let quantization_table = self.quantization_tables[index].as_ref().unwrap();
38+
let block_count = component.block_size.width as usize * component.vertical_sampling_factor as usize;
39+
let line_stride = component.block_size.width as usize * 8;
40+
41+
assert_eq!(data.len(), block_count * 64);
42+
43+
for i in 0..block_count {
44+
let x = (i % component.block_size.width as usize) * 8;
45+
let y = (i / component.block_size.width as usize) * 8;
46+
dequantize_and_idct_block(&data[i * 64..(i + 1) * 64],
47+
quantization_table,
48+
line_stride,
49+
&mut self.results[index][self.offsets[index] + y * line_stride + x..]);
50+
}
51+
52+
self.offsets[index] += data.len();
53+
}
54+
pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> {
55+
mem::replace(&mut self.results[index], Vec::new())
56+
}
57+
}
58+
59+
impl Worker for ImmediateWorker {
60+
fn new() -> Result<Self> {
61+
Ok(ImmediateWorker::new_immediate())
62+
}
63+
fn start(&mut self, data: RowData) -> Result<()> {
64+
self.start_immediate(data);
65+
Ok(())
66+
}
67+
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
68+
self.append_row_immediate(row);
69+
Ok(())
70+
}
71+
fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
72+
Ok(self.get_result_immediate(index))
73+
}
74+
}

src/worker/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
mod threaded;
2+
mod immediate;
3+
4+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
5+
pub use self::threaded::ThreadedWorker as PlatformWorker;
6+
#[cfg(any(target_arch = "asmjs", target_arch = "wasm32"))]
7+
pub use self::immediate::ImmediateWorker as PlatformWorker;
8+
9+
use error::Result;
10+
use parser::Component;
11+
use std::sync::Arc;
12+
13+
pub struct RowData {
14+
pub index: usize,
15+
pub component: Component,
16+
pub quantization_table: Arc<[u16; 64]>,
17+
}
18+
19+
pub trait Worker: Sized {
20+
fn new() -> Result<Self>;
21+
fn start(&mut self, row_data: RowData) -> Result<()>;
22+
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
23+
fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
24+
}

src/worker/threaded.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use error::Result;
2+
use std::sync::mpsc::{self, Sender};
3+
use std::thread;
4+
use super::{RowData, Worker};
5+
use super::immediate::ImmediateWorker;
6+
7+
enum WorkerMsg {
8+
Start(RowData),
9+
AppendRow((usize, Vec<i16>)),
10+
GetResult((usize, Sender<Vec<u8>>)),
11+
}
12+
13+
pub struct ThreadedWorker {
14+
sender: Sender<WorkerMsg>,
15+
}
16+
17+
impl Worker for ThreadedWorker {
18+
fn new() -> Result<Self> {
19+
let thread_builder = thread::Builder::new().name("worker thread".to_owned());
20+
let (tx, rx) = mpsc::channel();
21+
22+
thread_builder.spawn(move || {
23+
let mut worker = ImmediateWorker::new_immediate();
24+
25+
while let Ok(message) = rx.recv() {
26+
match message {
27+
WorkerMsg::Start(data) => {
28+
worker.start_immediate(data);
29+
},
30+
WorkerMsg::AppendRow(row) => {
31+
worker.append_row_immediate(row);
32+
},
33+
WorkerMsg::GetResult((index, chan)) => {
34+
let _ = chan.send(worker.get_result_immediate(index));
35+
},
36+
}
37+
}
38+
})?;
39+
40+
Ok(ThreadedWorker { sender: tx })
41+
}
42+
fn start(&mut self, row_data: RowData) -> Result<()> {
43+
Ok(self.sender.send(WorkerMsg::Start(row_data))?)
44+
}
45+
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
46+
Ok(self.sender.send(WorkerMsg::AppendRow(row))?)
47+
}
48+
fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
49+
let (tx, rx) = mpsc::channel();
50+
self.sender.send(WorkerMsg::GetResult((index, tx)))?;
51+
Ok(rx.recv()?)
52+
}
53+
}

src/worker_thread.rs

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)