From fd80c1d846e1580903fe7d09822c81bec799c5c4 Mon Sep 17 00:00:00 2001 From: Andreas Molzer Date: Sat, 14 May 2022 17:32:50 +0200 Subject: [PATCH 1/2] Run aarch64 tests with --release These are being emulated, with instruction count adding significantly to the runtime. As it stands this is >20 minutes for a full run which is the bottleneck of our CI times. It should not be necessary to run these with debug flags. The main reason to have the at all, the arch/aarch64 inline SIMD code, is not affected by flags in any case. --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f255fca9..4bdc3cd2 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -61,12 +61,12 @@ jobs: - uses: actions/checkout@v2 - name: build run: > - cargo build --verbose --no-default-features --target aarch64-unknown-linux-musl --features "$FEATURES" + cargo build --verbose --no-default-features --release --target aarch64-unknown-linux-musl --features "$FEATURES" env: CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_LINKER: aarch64-linux-gnu-gcc - name: test run: > - cargo test --tests --benches --no-default-features --target aarch64-unknown-linux-musl --features "$FEATURES" + cargo test --tests --benches --no-default-features --release --target aarch64-unknown-linux-musl --features "$FEATURES" env: FEATURES: ${{ matrix.features }} CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_LINKER: aarch64-linux-gnu-gcc From c8323b7f28f9d16423208e3d60a92c71f05df28c Mon Sep 17 00:00:00 2001 From: Andreas Molzer Date: Sat, 14 May 2022 18:00:25 +0200 Subject: [PATCH 2/2] Change parallelization strategy in rayon Intends to address the issue of effectively serialized sort, where all tasks end up being executed on the main thread instead of being distributed into other workers. We had neglected that most work is scheduled in sync (apppend_row such as in decoder.rs:903 instead of apppend_rows). This meant most were executed with an immediate strategy. The change pushes all items into a bounded task queue that is emptied and actively worked on when it reaches a capacity maximum, as well as when any component result is requested. This is in contrast to std::multithreading where items are worked on while decoding is in progress but task queueing itself has more overhead. decode a 512x512 JPEG time: [1.7317 ms 1.7352 ms 1.7388 ms] change: [-22.895% -22.646% -22.351%] (p = 0.00 < 0.05) Performance has improved. Found 6 outliers among 100 measurements (6.00%) 1 (1.00%) low mild 4 (4.00%) high mild 1 (1.00%) high severe decode a 512x512 progressive JPEG time: [4.7252 ms 4.7364 ms 4.7491 ms] change: [-15.641% -15.349% -15.052%] (p = 0.00 < 0.05) Performance has improved. Found 3 outliers among 100 measurements (3.00%) 1 (1.00%) high mild 2 (2.00%) high severe decode a 512x512 grayscale JPEG time: [873.48 us 877.71 us 882.83 us] change: [-11.470% -10.764% -10.041%] (p = 0.00 < 0.05) Performance has improved. Found 13 outliers among 100 measurements (13.00%) 2 (2.00%) low mild 9 (9.00%) high mild 2 (2.00%) high severe extract metadata from an image time: [1.1033 us 1.1066 us 1.1099 us] change: [-11.608% -9.8026% -8.3965%] (p = 0.00 < 0.05) Performance has improved. Found 4 outliers among 100 measurements (4.00%) 2 (2.00%) low severe 1 (1.00%) low mild 1 (1.00%) high mild Benchmarking decode a 3072x2048 RGB Lossless JPEG: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 36.6s, or reduce sample count to 10. decode a 3072x2048 RGB Lossless JPEG time: [363.07 ms 363.66 ms 364.27 ms] change: [+0.0997% +0.3692% +0.6323%] (p = 0.01 < 0.05) Change within noise threshold. Found 6 outliers among 100 measurements (6.00%) 5 (5.00%) high mild 1 (1.00%) high severe Running unittests (target/release/deps/large_image-0e61f2c2f07410bd) Gnuplot not found, using plotters backend decode a 2268x1512 JPEG time: [28.755 ms 28.879 ms 29.021 ms] change: [-5.7714% -4.9308% -4.0969%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 3 (3.00%) high mild 4 (4.00%) high severe --- src/worker/rayon.rs | 146 ++++++++++++++++++++++++++++++-------------- 1 file changed, 100 insertions(+), 46 deletions(-) diff --git a/src/worker/rayon.rs b/src/worker/rayon.rs index d48a6d89..460872c6 100644 --- a/src/worker/rayon.rs +++ b/src/worker/rayon.rs @@ -1,6 +1,6 @@ use core::convert::TryInto; -use rayon::iter::{IndexedParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, ParallelDrainRange, ParallelIterator}; use rayon::slice::ParallelSliceMut; use crate::decoder::choose_color_convert_func; @@ -33,13 +33,27 @@ struct ComponentMetadata { dct_scale: usize, } +struct WorkItem { + quantization_table: Arc<[u16; 64]>, + metadata: ComponentMetadata, + data: Vec, +} + +#[derive(Default)] pub struct Scoped { inner: ImmediateWorker, + // Unfinished, synchronous work for each component. Must simulate finishing the synchronously + // as they write to `results` which is tracked externally. + task_queues: [Vec; MAX_COMPONENTS], + sync: usize, + parallel: usize, } pub fn with_rayon(f: impl FnOnce(&mut dyn Worker) -> T) -> T { - let inner = ImmediateWorker::default(); - f(&mut Scoped { inner }) + let mut scope = Scoped::default(); + let result = f(&mut scope); + // dbg!(scope.sync, scope.parallel); + result } impl ImmediateWorker { @@ -118,6 +132,60 @@ impl ImmediateWorker { } } +impl Scoped { + fn task_queue_len(&self) -> usize { + self.task_queues + .iter() + .map(|q| q.len()) + .sum() + } + + fn drain_task_queues(&mut self) { + let inner = &mut self.inner; + let [res0, res1, res2, res3] = &mut inner.results; + + // Lazily get the blocks. Note: if we've already collected results from a component + // then the result vector has already been deallocated/taken. But no more tasks should + // be created for it. + let mut result_blocks = [ + res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []), + res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []), + res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []), + res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []), + ]; + + let result_blocks = &mut result_blocks; + let offsets = &mut inner.offsets; + + let mut work_items = vec![]; + + for index in 0..MAX_COMPONENTS { + let tasks = self.task_queues[index] + .drain(..) + .map(|item| { + let WorkItem { quantization_table, metadata, data } = item; + offsets[index] += metadata.bytes_used(); + let (result_block, tail) = + core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used()); + result_blocks[index] = tail; + (quantization_table, metadata, data, result_block) + }); + work_items.extend(tasks); + } + + work_items.par_drain(..).for_each( + |(quantization_table, metadata, data, result_block)| { + ImmediateWorker::append_row_locked( + quantization_table, + metadata, + data, + result_block, + ) + }, + ); + } +} + impl super::Worker for Scoped { fn start(&mut self, row_data: RowData) -> Result<()> { self.inner.start_immediate(row_data); @@ -125,66 +193,52 @@ impl super::Worker for Scoped { } fn append_row(&mut self, row: (usize, Vec)) -> Result<()> { + self.sync += 1; let inner = &mut self.inner; let (index, data) = row; let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone(); let metadata = inner.component_metadata(index).unwrap(); - let result_block = &mut inner.results[index][inner.offsets[index]..]; - inner.offsets[index] += metadata.bytes_used(); - ImmediateWorker::append_row_locked(quantization_table, metadata, data, result_block); + self.task_queues[index].push(WorkItem { quantization_table, metadata, data }); Ok(()) } fn get_result(&mut self, index: usize) -> Result> { + self.drain_task_queues(); let result = self.inner.get_result_immediate(index); Ok(result) } // Magic sauce, these _may_ run in parallel. fn append_rows(&mut self, iter: &mut dyn Iterator)>) -> Result<()> { - let inner = &mut self.inner; - rayon::in_place_scope(|scope| { - let metadatas = [ - inner.component_metadata(0), - inner.component_metadata(1), - inner.component_metadata(2), - inner.component_metadata(3), - ]; - - let [res0, res1, res2, res3] = &mut inner.results; - - // Lazily get the blocks. Note: if we've already collected results from a component - // then the result vector has already been deallocated/taken. But no more tasks should - // be created for it. - let mut result_blocks = [ - res0.get_mut(inner.offsets[0]..).unwrap_or(&mut []), - res1.get_mut(inner.offsets[1]..).unwrap_or(&mut []), - res2.get_mut(inner.offsets[2]..).unwrap_or(&mut []), - res3.get_mut(inner.offsets[3]..).unwrap_or(&mut []), - ]; - - // First we schedule everything, making sure their index is right etc. - for (index, data) in iter { - let metadata = metadatas[index].unwrap(); - let quantization_table = inner.quantization_tables[index].as_ref().unwrap().clone(); - - inner.offsets[index] += metadata.bytes_used(); - let (result_block, tail) = - core::mem::take(&mut result_blocks[index]).split_at_mut(metadata.bytes_used()); - result_blocks[index] = tail; - - scope.spawn(move |_| { - ImmediateWorker::append_row_locked( - quantization_table, - metadata, - data, - result_block, - ) - }); + let metadatas = [ + self.inner.component_metadata(0), + self.inner.component_metadata(1), + self.inner.component_metadata(2), + self.inner.component_metadata(3), + ]; + + while { + if self.task_queue_len() > 4096 { + self.drain_task_queues(); } - }); + + let quantization_tables = &mut self.inner.quantization_tables; + let task_queue = &mut self.task_queues; + + let work_items = iter.take(1024).fold(0, move |ctr, (index, data)| { + let metadata = metadatas[index].unwrap(); + let quantization_table = quantization_tables[index].as_ref().unwrap().clone(); + + task_queue[index].push(WorkItem {quantization_table, metadata, data }); + ctr + 1 + }); + + self.parallel += work_items; + + work_items > 0 + } {} Ok(()) }