Skip to content

Commit 2d9fe0b

Browse files
committed
More threads for resizing and blur
1 parent 6b62982 commit 2d9fe0b

File tree

4 files changed

+44
-31
lines changed

4 files changed

+44
-31
lines changed

src/bin/gifski.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ fn bin_main() -> BinResult<()> {
204204
if speed != 1.0 {
205205
return Err("Speed is for videos. It doesn't make sense for images. Use fps only".into());
206206
}
207-
Box::new(png::Lodecoder::new(frames, rate, if fast { 6 } else { 3 }))
207+
Box::new(png::Lodecoder::new(frames, rate, if fast { 6 } else { 3 }.try_into().unwrap()))
208208
};
209209

210210
let mut pb;

src/bin/png.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::num::NonZeroU8;
12
use crate::source::Fps;
23
use crate::source::Source;
34
use crate::BinResult;
@@ -7,11 +8,11 @@ use std::path::PathBuf;
78
pub struct Lodecoder {
89
frames: Vec<PathBuf>,
910
fps: f32,
10-
thread_pool_size: u8,
11+
thread_pool_size: NonZeroU8,
1112
}
1213

1314
impl Lodecoder {
14-
pub fn new(frames: Vec<PathBuf>, params: Fps, thread_pool_size: u8) -> Self {
15+
pub fn new(frames: Vec<PathBuf>, params: Fps, thread_pool_size: NonZeroU8) -> Self {
1516
Self { frames, fps: params.fps, thread_pool_size }
1617
}
1718
}

src/lib.rs

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ pub use minipool::new as private_minipool;
4444

4545
use crossbeam_channel::{Receiver, Sender};
4646
use std::io::prelude::*;
47+
use std::num::NonZeroU8;
4748
use std::thread;
4849

4950
struct InputFrameUnresized {
5051
/// The pixels to resize and encode
5152
frame: ImgVec<RGBA8>,
5253
/// Time in seconds when to display the frame. First frame should start at 0.
5354
presentation_timestamp: f64,
55+
frame_index: usize,
5456
}
5557

5658
struct InputFrame {
@@ -84,6 +86,7 @@ pub struct Settings {
8486
#[non_exhaustive]
8587
struct SettingsExt {
8688
pub s: Settings,
89+
pub max_threads: NonZeroU8,
8790
pub extra_effort: bool,
8891
pub motion_quality: u8,
8992
pub giflossy_quality: u8,
@@ -124,13 +127,13 @@ impl Default for Settings {
124127
/// Note that writing will finish only when the collector is dropped.
125128
/// Collect frames on another thread, or call `drop(collector)` before calling `writer.write()`!
126129
pub struct Collector {
127-
queue: OrdQueue<CatResult<InputFrameUnresized>>,
130+
queue: Sender<CatResult<InputFrameUnresized>>,
128131
}
129132

130133
/// Perform GIF writing
131134
pub struct Writer {
132135
/// Input frame decoder results
133-
queue_iter: Option<OrdQueueIter<CatResult<InputFrameUnresized>>>,
136+
queue_iter: Option<Receiver<CatResult<InputFrameUnresized>>>,
134137
settings: SettingsExt,
135138
}
136139

@@ -195,7 +198,7 @@ pub fn new(settings: Settings) -> CatResult<(Collector, Writer)> {
195198
if settings.width.unwrap_or(0) > 1<<16 || settings.height.unwrap_or(0) > 1<<16 {
196199
return Err(Error::WrongSize("image size too large".into()));
197200
}
198-
let (queue, queue_iter) = ordqueue::new(6); // should be sufficient for denoiser lookahead
201+
let (queue, queue_iter) = crossbeam_channel::bounded(6); // should be sufficient for denoiser lookahead
199202

200203
Ok((
201204
Collector {
@@ -204,6 +207,7 @@ pub fn new(settings: Settings) -> CatResult<(Collector, Writer)> {
204207
Writer {
205208
queue_iter: Some(queue_iter),
206209
settings: SettingsExt {
210+
max_threads: thread::available_parallelism().map(|t| t.get().min(255) as u8).unwrap_or(8).try_into().unwrap(),
207211
motion_quality: settings.quality,
208212
giflossy_quality: settings.quality,
209213
extra_effort: false,
@@ -223,10 +227,12 @@ impl Collector {
223227
/// If the first frame doesn't start at pts=0, the delay will be used for the last frame.
224228
pub fn add_frame_rgba(&self, frame_index: usize, frame: ImgVec<RGBA8>, presentation_timestamp: f64) -> CatResult<()> {
225229
debug_assert!(frame_index == 0 || presentation_timestamp > 0.);
226-
self.queue.push(frame_index, Ok(InputFrameUnresized {
230+
self.queue.send(Ok(InputFrameUnresized {
231+
frame_index,
227232
frame,
228233
presentation_timestamp,
229-
}))
234+
}))?;
235+
Ok(())
230236
}
231237

232238
/// Read and decode a PNG file from disk.
@@ -242,10 +248,12 @@ impl Collector {
242248
.map_err(|err| Error::PNG(format!("Can't load {}: {err}", path.display())))?;
243249

244250
let frame = Img::new(image.buffer, image.width, image.height);
245-
self.queue.push(frame_index, Ok(InputFrameUnresized {
251+
self.queue.send(Ok(InputFrameUnresized {
246252
frame,
247-
presentation_timestamp}
248-
))
253+
presentation_timestamp,
254+
frame_index,
255+
}))?;
256+
Ok(())
249257
}
250258
}
251259

@@ -383,7 +391,7 @@ impl Writer {
383391
///
384392
/// `background` is the previous frame.
385393
fn quantize(image: ImgVec<RGBA8>, importance_map: &[u8], first_frame: bool, needs_transparency: bool, prev_frame_keeps: bool, settings: &SettingsExt) -> CatResult<(Attributes, QuantizationResult, Image<'static>)> {
386-
let SettingsExt {s: settings, extra_effort, motion_quality: _, giflossy_quality: _} = settings;
394+
let SettingsExt {s: settings, extra_effort, motion_quality: _, giflossy_quality: _, max_threads: _ } = settings;
387395
let mut liq = Attributes::new();
388396
if settings.fast {
389397
liq.set_speed(10)?;
@@ -481,13 +489,13 @@ impl Writer {
481489

482490
let settings_ext = self.settings;
483491
let settings = self.settings.s;
484-
let (resize_queue, resize_queue_recv) = crossbeam_channel::bounded(2);
492+
let (diff_queue, diff_queue_recv) = ordqueue::new(2);
485493
let resize_thread = thread::Builder::new().name("resize".into()).spawn(move || {
486-
Self::make_resize(decode_queue_recv, resize_queue, &settings_ext.s)
494+
Self::make_resize(decode_queue_recv, diff_queue, &settings_ext)
487495
})?;
488496
let (quant_queue, quant_queue_recv) = crossbeam_channel::bounded(2);
489497
let diff_thread = thread::Builder::new().name("diff".into()).spawn(move || {
490-
Self::make_diffs(resize_queue_recv, quant_queue, &settings_ext)
498+
Self::make_diffs(diff_queue_recv, quant_queue, &settings_ext)
491499
})?;
492500
let (remap_queue, remap_queue_recv) = ordqueue::new(2);
493501
let quant_thread = thread::Builder::new().name("quant".into()).spawn(move || {
@@ -505,21 +513,25 @@ impl Writer {
505513
combine_res(combine_res(combine_res(res0, res1), combine_res(res2, res3)), res4)
506514
}
507515

508-
fn make_resize(inputs: OrdQueueIter<CatResult<InputFrameUnresized>>, diff_queue: Sender<InputFrame>, settings: &Settings) -> CatResult<()> {
509-
for frame in inputs {
510-
let frame = frame?;
511-
let resized = resized_binary_alpha(frame.frame, settings.width, settings.height)?;
512-
diff_queue.send(InputFrame {
513-
frame_blurred: smart_blur(resized.as_ref()),
516+
fn make_resize(inputs: Receiver<CatResult<InputFrameUnresized>>, diff_queue: OrdQueue<InputFrame>, settings: &SettingsExt) -> CatResult<()> {
517+
minipool::new(settings.max_threads.min(if settings.s.fast { 6 } else { 4 }.try_into().unwrap()), "resize", move |to_remap| {
518+
for frame in inputs {
519+
to_remap.send(frame?)?;
520+
}
521+
Ok(())
522+
}, move |frame| {
523+
let resized = resized_binary_alpha(frame.frame, settings.s.width, settings.s.height)?;
524+
let frame_blurred = smart_blur(resized.as_ref());
525+
diff_queue.push(frame.frame_index, InputFrame {
514526
frame: resized,
527+
frame_blurred,
515528
presentation_timestamp: frame.presentation_timestamp,
516529
})?;
517-
}
518-
Ok(())
519-
530+
Ok(())
531+
})
520532
}
521533

522-
fn make_diffs(inputs: Receiver<InputFrame>, quant_queue: Sender<DiffMessage>, settings: &SettingsExt) -> CatResult<()> {
534+
fn make_diffs(inputs: OrdQueueIter<InputFrame>, quant_queue: Sender<DiffMessage>, settings: &SettingsExt) -> CatResult<()> {
523535
let mut inputs = inputs.into_iter();
524536
let first_frame = inputs.next().ok_or(Error::NoFrames)?;
525537

@@ -591,7 +603,7 @@ impl Writer {
591603
}
592604

593605
fn quantize_frames(inputs: Receiver<DiffMessage>, remap_queue: OrdQueue<RemapMessage>, settings: &SettingsExt) -> CatResult<()> {
594-
minipool::new(3, "quant", move |to_remap| {
606+
minipool::new(settings.max_threads.min(3.try_into().unwrap()), "quant", move |to_remap| {
595607
let mut inputs = inputs.into_iter().peekable();
596608

597609
let DiffMessage {image: first_frame, ..} = inputs.peek().ok_or(Error::NoFrames)?;

src/minipool.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use std::num::NonZeroU8;
12
use std::panic::catch_unwind;
23
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
34
use crossbeam_channel::Sender;
45
use crate::Error;
56

6-
pub fn new<P, C, M, R>(num_threads: u8, name: &str, producer: P, mut consumer: C) -> Result<R, Error> where
7+
pub fn new<P, C, M, R>(num_threads: NonZeroU8, name: &str, producer: P, mut consumer: C) -> Result<R, Error> where
78
M: Send,
89
C: Clone + Send + FnMut(M) -> Result<(), Error> + std::panic::UnwindSafe,
910
P: FnOnce(Sender<M>) -> Result<R, Error>,
@@ -34,12 +35,11 @@ pub fn new<P, C, M, R>(num_threads: u8, name: &str, producer: P, mut consumer: C
3435
Error::ThreadSend
3536
})
3637
};
37-
debug_assert!(num_threads > 0);
38-
let mut handles = Vec::with_capacity(num_threads.into());
39-
for n in 0..num_threads-1 {
38+
let mut handles = Vec::with_capacity(num_threads.get().into());
39+
for n in 0..num_threads.get()-1 {
4040
handles.push(spawn(n, thread.clone())?);
4141
}
42-
handles.push(spawn(num_threads-1, thread)?);
42+
handles.push(spawn(num_threads.get()-1, thread)?);
4343

4444
let res = producer(s).map_err(|e| {
4545
failed.store(true, SeqCst);

0 commit comments

Comments
 (0)