diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index f255fca9..4bdc3cd2 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -61,12 +61,12 @@ jobs:
       - uses: actions/checkout@v2
       - name: build
         run: >
-          cargo build --verbose --no-default-features --target aarch64-unknown-linux-musl --features "$FEATURES"
+          cargo build --verbose --no-default-features --release --target aarch64-unknown-linux-musl --features "$FEATURES"
         env:
           CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_LINKER: aarch64-linux-gnu-gcc
       - name: test
         run: >
-          cargo test --tests --benches --no-default-features --target aarch64-unknown-linux-musl --features "$FEATURES"
+          cargo test --tests --benches --no-default-features --release --target aarch64-unknown-linux-musl --features "$FEATURES"
         env:
           FEATURES: ${{ matrix.features }}
           CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_LINKER: aarch64-linux-gnu-gcc
diff --git a/src/worker/rayon.rs b/src/worker/rayon.rs
index d48a6d89..460872c6 100644
--- a/src/worker/rayon.rs
+++ b/src/worker/rayon.rs
@@ -1,6 +1,6 @@
 use core::convert::TryInto;
 
-use rayon::iter::{IndexedParallelIterator, ParallelIterator};
+use rayon::iter::{IndexedParallelIterator, ParallelDrainRange, ParallelIterator};
 use rayon::slice::ParallelSliceMut;
 
 use crate::decoder::choose_color_convert_func;
@@ -33,13 +33,27 @@ struct ComponentMetadata {
     dct_scale: usize,
 }
 
+struct WorkItem {
+    quantization_table: Arc<[u16; 64]>,
+    metadata: ComponentMetadata,
+    data: Vec<i16>,
+}
+
+#[derive(Default)]
 pub struct Scoped {
     inner: ImmediateWorker,
+    // Unfinished, synchronous work for each component. Must simulate finishing it synchronously,
+    // as the queued work writes to `results`, which is tracked externally.
+    task_queues: [Vec<WorkItem>; MAX_COMPONENTS],
+    sync: usize,
+    parallel: usize,
 }
 
 pub fn with_rayon<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
-    let inner = ImmediateWorker::default();
-    f(&mut Scoped { inner })
+    let mut scope = Scoped::default();
+    let result = f(&mut scope);
+    // dbg!(scope.sync, scope.parallel);
+    result
 }
 
 impl ImmediateWorker {
@@ -118,6 +132,60 @@
     }
 }
 
+impl Scoped {
+    fn task_queue_len(&self) -> usize {
+        self.task_queues
+            .iter()
+            .map(|q| q.len())
+            .sum()
+    }
+
+    fn drain_task_queues(&mut self) {
+        let inner = &mut self.inner;
+        let [res0, res1, res2, res3] = &mut inner.results;
+
+        // Lazily get the blocks. Note: if we've already collected results from a component
+        // then the result vector has already been deallocated/taken. But no more tasks should
+        // be created for it.
+        let mut result_blocks = [
+            res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []),
+            res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []),
+            res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []),
+            res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []),
+        ];
+
+        let result_blocks = &mut result_blocks;
+        let offsets = &mut inner.offsets;
+
+        let mut work_items = vec![];
+
+        for index in 0..MAX_COMPONENTS {
+            let tasks = self.task_queues[index]
+                .drain(..)
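+                // Pair each queued WorkItem with its destination slice in the component's result buffer.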
+                .map(|item| {
+                    let WorkItem { quantization_table, metadata, data } = item;
+                    offsets[index] += metadata.bytes_used();
+                    let (result_block, tail) =
+                        core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used());
+                    result_blocks[index] = tail;
+                    (quantization_table, metadata, data, result_block)
+                });
+            work_items.extend(tasks);
+        }
+
+        work_items.par_drain(..).for_each(
+            |(quantization_table, metadata, data, result_block)| {
+                ImmediateWorker::append_row_locked(
+                    quantization_table,
+                    metadata,
+                    data,
+                    result_block,
+                )
+            },
+        );
+    }
+}
+
 impl super::Worker for Scoped {
     fn start(&mut self, row_data: RowData) -> Result<()> {
         self.inner.start_immediate(row_data);
@@ -125,66 +193,52 @@
     }
 
     fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
+        self.sync += 1;
         let inner = &mut self.inner;
         let (index, data) = row;
 
         let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
         let metadata = inner.component_metadata(index).unwrap();
-        let result_block = &mut inner.results[index][inner.offsets[index]..];
-        inner.offsets[index] += metadata.bytes_used();
 
-        ImmediateWorker::append_row_locked(quantization_table, metadata, data, result_block);
+        self.task_queues[index].push(WorkItem { quantization_table, metadata, data });
         Ok(())
     }
 
     fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
+        self.drain_task_queues();
         let result = self.inner.get_result_immediate(index);
         Ok(result)
     }
 
     // Magic sauce, these _may_ run in parallel.
     fn append_rows(&mut self, iter: &mut dyn Iterator<Item = (usize, Vec<i16>)>) -> Result<()> {
-        let inner = &mut self.inner;
-        rayon::in_place_scope(|scope| {
-            let metadatas = [
-                inner.component_metadata(0),
-                inner.component_metadata(1),
-                inner.component_metadata(2),
-                inner.component_metadata(3),
-            ];
-
-            let [res0, res1, res2, res3] = &mut inner.results;
-
-            // Lazily get the blocks. Note: if we've already collected results from a component
-            // then the result vector has already been deallocated/taken. But no more tasks should
-            // be created for it.
-            let mut result_blocks = [
-                res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []),
-                res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []),
-                res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []),
-                res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []),
-            ];
-
-            // First we schedule everything, making sure their index is right etc.
-            for (index, data) in iter {
-                let metadata = metadatas[index].unwrap();
-                let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone();
-
-                inner.offsets[index] += metadata.bytes_used();
-                let (result_block, tail) =
-                    core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used());
-                result_blocks[index] = tail;
-
-                scope.spawn(move |_| {
-                    ImmediateWorker::append_row_locked(
-                        quantization_table,
-                        metadata,
-                        data,
-                        result_block,
-                    )
-                });
+        let metadatas = [
+            self.inner.component_metadata(0),
+            self.inner.component_metadata(1),
+            self.inner.component_metadata(2),
+            self.inner.component_metadata(3),
+        ];
+
+        while {
+            if self.task_queue_len() > 4096 {
+                self.drain_task_queues();
             }
-        });
+
+            let quantization_tables = &mut self.inner.quantization_tables;
+            let task_queue = &mut self.task_queues;
+
+            let work_items = iter.by_ref().take(1024).fold(0, move |ctr, (index, data)| {
+                let metadata = metadatas[index].unwrap();
+                let quantization_table = quantization_tables[index].as_ref().unwrap().clone();
+
+                task_queue[index].push(WorkItem { quantization_table, metadata, data });
+                ctr + 1
+            });
+
+            self.parallel += work_items;
+
+            work_items > 0
+        } {}
 
         Ok(())
     }
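The core of the new append_rows is a queue-and-drain pattern: rows are pulled from the iterator in bounded batches, pushed into per-component queues, and flushed through rayon once enough work has accumulated. Below is a minimal, self-contained sketch of that pattern under simplifying assumptions; Item, process, drain_in_batches, and the main driver are illustrative placeholders rather than code from the patch, and only the 1024/4096 batch sizes mirror the constants above.

use rayon::iter::{ParallelDrainRange, ParallelIterator};

// Illustrative stand-in for a queued unit of work (a decoded row in the patch).
struct Item(u32);

// Placeholder for the real per-block work (dequantize, IDCT, write into the result slice).
fn process(item: Item) -> u32 {
    item.0 % 7
}

fn drain_in_batches(rows: &mut dyn Iterator<Item = Item>) -> u32 {
    let mut queue: Vec<Item> = Vec::new();
    let mut total = 0u32;

    // Do-while loop: pull a bounded batch of rows into the queue, and flush the
    // queue through rayon whenever it grows past a threshold.
    while {
        if queue.len() > 4096 {
            total += queue.par_drain(..).map(process).sum::<u32>();
        }

        let pulled = rows.by_ref().take(1024).fold(0, |ctr, item| {
            queue.push(item);
            ctr + 1
        });

        pulled > 0
    } {}

    // The iterator is exhausted; flush whatever is still queued.
    total += queue.par_drain(..).map(process).sum::<u32>();
    total
}

fn main() {
    let mut rows = (0..10_000u32).map(Item);
    println!("{}", drain_in_batches(&mut rows));
}

The while { body; condition } {} form used in both the patch and the sketch is the closest Rust gets to a do-while loop: the block runs at least once and repeats while its trailing expression is true, so the final short batch is still queued before the loop exits.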