From 4b19874c8850cbf857220de200bcf2be8421d499 Mon Sep 17 00:00:00 2001 From: Otto Date: Tue, 3 Oct 2023 12:30:18 +0200 Subject: [PATCH 1/6] Expose garbage collector thread via RenderScope to all AudioProcessors --- src/render/processor.rs | 7 +++++++ src/render/thread.rs | 14 +++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/render/processor.rs b/src/render/processor.rs index 64d1d321..b92431db 100644 --- a/src/render/processor.rs +++ b/src/render/processor.rs @@ -8,6 +8,7 @@ use super::{graph::Node, AudioRenderQuantum}; use crossbeam_channel::Sender; use rustc_hash::FxHashMap; use std::cell::{Cell, RefCell}; +use std::rc::Rc; use std::any::Any; use std::ops::Deref; @@ -25,6 +26,7 @@ pub struct RenderScope { pub(crate) node_id: Cell, pub(crate) event_sender: Option>, + pub(crate) garbage_collector: Rc>>>, } impl RenderScope { @@ -62,6 +64,11 @@ impl RenderScope { let _ = sender.try_send(EventDispatch::processor_error(self.node_id.get(), event)); } } + + #[allow(dead_code)] + pub(crate) fn deallocate_async(&self, value: llq::Node>) { + self.garbage_collector.borrow_mut().push(value); + } } /// Interface for audio processing code that runs on the audio rendering thread. diff --git a/src/render/thread.rs b/src/render/thread.rs index cd97efc2..a3fe363f 100644 --- a/src/render/thread.rs +++ b/src/render/thread.rs @@ -1,7 +1,8 @@ //! Communicates with the control thread and ships audio samples to the hardware use std::any::Any; -use std::cell::Cell; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -32,7 +33,7 @@ pub(crate) struct RenderThread { buffer_offset: Option<(usize, AudioRenderQuantum)>, load_value_sender: Option>, event_sender: Option>, - garbage_collector: llq::Producer>, + garbage_collector: Rc>>>, } // SAFETY: @@ -66,7 +67,7 @@ impl RenderThread { buffer_offset: None, load_value_sender, event_sender, - garbage_collector: gc_producer, + garbage_collector: Rc::new(RefCell::new(gc_producer)), } } @@ -129,7 +130,7 @@ impl RenderThread { } NodeMessage { id, mut msg } => { self.graph.as_mut().unwrap().route_message(id, msg.as_mut()); - self.garbage_collector.push(msg); + self.garbage_collector.borrow_mut().push(msg); } } } @@ -166,6 +167,7 @@ impl RenderThread { sample_rate: self.sample_rate, event_sender: self.event_sender.clone(), node_id: Cell::new(AudioNodeId(0)), // placeholder value + garbage_collector: Rc::clone(&self.garbage_collector), }; // render audio graph @@ -264,6 +266,7 @@ impl RenderThread { sample_rate: self.sample_rate, event_sender: self.event_sender.clone(), node_id: Cell::new(AudioNodeId(0)), // placeholder value + garbage_collector: Rc::clone(&self.garbage_collector), }; // render audio graph, clone it in case we need to mutate/store the value later @@ -302,6 +305,7 @@ impl RenderThread { impl Drop for RenderThread { fn drop(&mut self) { self.garbage_collector + .borrow_mut() .push(llq::Node::new(Box::new(TerminateGarbageCollectorThread))); log::info!("Audio render thread has been dropped"); } @@ -320,7 +324,7 @@ fn spawn_garbage_collector_thread(consumer: llq::Consumer>) } fn run_garbage_collector_thread(mut consumer: llq::Consumer>) { - log::info!("Entering garbage collector thread"); + log::debug!("Entering garbage collector thread"); loop { if let Some(node) = consumer.pop() { if node From 23b4d6a6b5449ca789d08b58ff0ba2794a61fb43 Mon Sep 17 00:00:00 2001 From: Otto Date: Thu, 5 Oct 2023 13:52:41 +0200 Subject: [PATCH 2/6] Proof of concept (untested) to prevent deallocation in node cleanup --- src/buffer.rs | 12 +++++++-- src/node/audio_buffer_source.rs | 46 +++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/src/buffer.rs b/src/buffer.rs index 71824186..cd97c4ae 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -66,8 +66,8 @@ pub struct AudioBufferOptions { /// #[derive(Clone, Debug)] pub struct AudioBuffer { - pub(crate) channels: Vec, - pub(crate) sample_rate: f32, + channels: Vec, + sample_rate: f32, } impl AudioBuffer { @@ -91,6 +91,14 @@ impl AudioBuffer { } } + /// Creates an invalid, but non-allocating AudioBuffer to be used as placeholder + pub(crate) fn tombstone() -> Self { + Self { + channels: Default::default(), + sample_rate: Default::default(), + } + } + /// Convert raw samples to an AudioBuffer /// /// The outer Vec determine the channels. The inner Vecs should have the same length. diff --git a/src/node/audio_buffer_source.rs b/src/node/audio_buffer_source.rs index 2fad9a3c..c8a0a9d9 100644 --- a/src/node/audio_buffer_source.rs +++ b/src/node/audio_buffer_source.rs @@ -215,6 +215,7 @@ impl AudioBufferSourceNode { loop_state: loop_state.clone(), render_state: AudioBufferRendererState::default(), ended_triggered: false, + buffer_deallocator: Some(llq::Node::new(Box::new(AudioBuffer::tombstone()))), }; let inner_state = InnerState { @@ -379,6 +380,7 @@ struct AudioBufferSourceRenderer { loop_state: LoopState, render_state: AudioBufferRendererState, ended_triggered: bool, + buffer_deallocator: Option>>, } impl AudioBufferSourceRenderer { @@ -395,6 +397,26 @@ impl AudioBufferSourceRenderer { ControlMessage::LoopEnd(loop_end) => self.loop_state.end = *loop_end, } } + + fn on_end(&mut self, scope: &RenderScope) { + if self.ended_triggered { + return; // only run once + } + self.ended_triggered = true; + + // notify the control thread + scope.send_ended_event(); + + // deallocate the AudioBuffer asynchronously - not in the render thread + if let Some(buffer) = self.buffer.take() { + // the holder contains a tombstone AudioBuffer that can be dropped without deallocation + let mut holder = self.buffer_deallocator.take().unwrap(); + // replace the contents of the holder with the actual buffer + *holder.downcast_mut().unwrap() = buffer; + // ship the buffer to the deallocator thread + scope.deallocate_async(holder); + } + } } impl AudioProcessor for AudioBufferSourceRenderer { @@ -461,13 +483,7 @@ impl AudioProcessor for AudioBufferSourceRenderer { || self.render_state.buffer_time_elapsed >= self.duration { output.make_silent(); // also converts to mono - - // @note: we need this check because this is called a until the program - // ends, such as if the node was never removed from the graph - if !self.ended_triggered { - scope.send_ended_event(); - self.ended_triggered = true; - } + self.on_end(scope); return false; } @@ -479,19 +495,13 @@ impl AudioProcessor for AudioBufferSourceRenderer { if !is_looping { if computed_playback_rate > 0. && buffer_time >= buffer_duration { output.make_silent(); // also converts to mono - if !self.ended_triggered { - scope.send_ended_event(); - self.ended_triggered = true; - } + self.on_end(scope); return false; } if computed_playback_rate < 0. && buffer_time < 0. { output.make_silent(); // also converts to mono - if !self.ended_triggered { - scope.send_ended_event(); - self.ended_triggered = true; - } + self.on_end(scope); return false; } } @@ -772,11 +782,7 @@ impl AudioProcessor for AudioBufferSourceRenderer { std::mem::swap(current_buffer, buffer); } else { // Creating the tombstone buffer does not cause allocations. - let tombstone_buffer = AudioBuffer { - channels: Default::default(), - sample_rate: Default::default(), - }; - self.buffer = Some(std::mem::replace(buffer, tombstone_buffer)); + self.buffer = Some(std::mem::replace(buffer, AudioBuffer::tombstone())); } return; }; From 614abc112be38db01c482c6721cd04541baa1d2d Mon Sep 17 00:00:00 2001 From: Otto Date: Thu, 5 Oct 2023 20:48:38 +0200 Subject: [PATCH 3/6] Experimenting with allocator-free rendering - not meant to be merged The goal is to run `cargo run --example audio_buffer_source_events` without failing for now --- Cargo.toml | 2 +- examples/audio_buffer_source_events.rs | 1 + src/lib.rs | 3 +++ src/param.rs | 2 +- src/render/graph.rs | 28 ++++++++++++++++---------- src/render/quantum.rs | 6 +++--- src/render/thread.rs | 8 +++++++- 7 files changed, 33 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0401de1..4e2566ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,9 +32,9 @@ rustc-hash = "1.1" smallvec = "1.11" symphonia = { version = "0.5", default-features = false } vecmath = "1.0" +assert_no_alloc = { git = "https://github.com/Windfisch/rust-assert-no-alloc/", default-features = false, features = ["backtrace"] } [dev-dependencies] -alloc_counter = "0.0.4" env_logger = "0.10" iai = "0.1.1" rand = "0.8" diff --git a/examples/audio_buffer_source_events.rs b/examples/audio_buffer_source_events.rs index 18d69e2f..d330236d 100644 --- a/examples/audio_buffer_source_events.rs +++ b/examples/audio_buffer_source_events.rs @@ -41,6 +41,7 @@ fn main() { let now = context.current_time(); src.start_at(now); src.stop_at(now + 1.); + drop(src); std::thread::sleep(std::time::Duration::from_secs(4)); } diff --git a/src/lib.rs b/src/lib.rs index 65b71418..198c0a90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,9 @@ #![warn(clippy::clone_on_ref_ptr)] #![deny(trivial_numeric_casts)] +#[global_allocator] +static A: assert_no_alloc::AllocDisabler = assert_no_alloc::AllocDisabler; + use std::error::Error; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; diff --git a/src/param.rs b/src/param.rs index 50f1ffba..564c327e 100644 --- a/src/param.rs +++ b/src/param.rs @@ -128,7 +128,7 @@ struct AudioParamEventTimeline { impl AudioParamEventTimeline { fn new() -> Self { Self { - inner: Vec::new(), + inner: Vec::with_capacity(5), dirty: false, } } diff --git a/src/render/graph.rs b/src/render/graph.rs index 5a24edfb..2be859de 100644 --- a/src/render/graph.rs +++ b/src/render/graph.rs @@ -11,6 +11,9 @@ use super::{Alloc, AudioParamValues, AudioProcessor, AudioRenderQuantum}; use crate::node::ChannelConfig; use crate::render::RenderScope; +const INITIAL_GRAPH_SIZE: usize = 16; +const INITIAL_CHANNEL_DATA_COUNT: usize = INITIAL_GRAPH_SIZE * 4; + /// Connection between two audio nodes struct OutgoingEdge { /// index of the current Nodes output port @@ -26,9 +29,9 @@ pub struct Node { /// Renderer: converts inputs to outputs processor: Box, /// Reusable input buffers - inputs: Vec, + inputs: SmallVec<[AudioRenderQuantum; 2]>, /// Reusable output buffers, consumed by subsequent Nodes in this graph - outputs: Vec, + outputs: SmallVec<[AudioRenderQuantum; 2]>, /// Channel configuration: determines up/down-mixing of inputs channel_config: ChannelConfig, /// Outgoing edges: tuple of outcoming node reference, our output index and their input index @@ -94,14 +97,17 @@ pub(crate) struct Graph { impl Graph { pub fn new() -> Self { + let mut nodes = FxHashMap::default(); + nodes.reserve(16); + Graph { - nodes: FxHashMap::default(), - ordered: vec![], - marked: vec![], - marked_temp: vec![], - in_cycle: vec![], - cycle_breakers: vec![], - alloc: Alloc::with_capacity(64), + nodes, + ordered: Vec::with_capacity(INITIAL_GRAPH_SIZE), + marked: Vec::with_capacity(INITIAL_GRAPH_SIZE), + marked_temp: Vec::with_capacity(INITIAL_GRAPH_SIZE), + in_cycle: Vec::with_capacity(INITIAL_GRAPH_SIZE), + cycle_breakers: Vec::with_capacity(INITIAL_GRAPH_SIZE), + alloc: Alloc::with_capacity(INITIAL_CHANNEL_DATA_COUNT), } } @@ -123,8 +129,8 @@ impl Graph { // set input and output buffers to single channel of silence, will be upmixed when // necessary - let inputs = vec![AudioRenderQuantum::from(self.alloc.silence()); number_of_inputs]; - let outputs = vec![AudioRenderQuantum::from(self.alloc.silence()); number_of_outputs]; + let inputs = smallvec![AudioRenderQuantum::from(self.alloc.silence()); number_of_inputs]; + let outputs = smallvec![AudioRenderQuantum::from(self.alloc.silence()); number_of_outputs]; self.nodes.insert( index, diff --git a/src/render/quantum.rs b/src/render/quantum.rs index e39b718d..322bb3cd 100644 --- a/src/render/quantum.rs +++ b/src/render/quantum.rs @@ -681,7 +681,7 @@ mod tests { let alloc = Alloc::with_capacity(2); assert_eq!(alloc.pool_size(), 2); - alloc_counter::deny_alloc(|| { + assert_no_alloc::assert_no_alloc(|| { { // take a buffer out of the pool let a = alloc.allocate(); @@ -712,7 +712,7 @@ mod tests { let a = alloc.allocate(); let b = alloc.allocate(); - let c = alloc_counter::allow_alloc(|| { + let c = assert_no_alloc::permit_alloc(|| { // we can allocate beyond the pool size let c = alloc.allocate(); assert_eq!(alloc.pool_size(), 0); @@ -728,7 +728,7 @@ mod tests { }; // dropping c will cause a re-allocation: the pool capacity is extended - alloc_counter::allow_alloc(move || { + assert_no_alloc::permit_alloc(move || { std::mem::drop(c); }); diff --git a/src/render/thread.rs b/src/render/thread.rs index a3fe363f..d2892c03 100644 --- a/src/render/thread.rs +++ b/src/render/thread.rs @@ -125,6 +125,10 @@ impl RenderThread { return; // no further handling of ctrl msgs } Startup { graph } => { + // Obtaining the current thread id invokes an allocation (on OSX) so let's take + // this hit on audio graph startup, so subsequent calls (needed for crossbeam) + // don't need to. + assert_no_alloc::permit_alloc(|| std::thread::current().id()); debug_assert!(self.graph.is_none()); self.graph = Some(graph); } @@ -192,7 +196,9 @@ impl RenderThread { let render_start = Instant::now(); // perform actual rendering - self.render_inner(output_buffer); + assert_no_alloc::assert_no_alloc(|| { + self.render_inner(output_buffer); + }); // calculate load value and ship to control thread if let Some(load_value_sender) = &self.load_value_sender { From c6146a9d4c9688f438ac4b837cf9977a8e2cf434 Mon Sep 17 00:00:00 2001 From: Otto Date: Fri, 27 Oct 2023 13:42:53 +0200 Subject: [PATCH 4/6] Introduce NodeCollection::with_capacity --- src/render/graph.rs | 2 +- src/render/node_collection.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/render/graph.rs b/src/render/graph.rs index b09216f6..2ccab9ce 100644 --- a/src/render/graph.rs +++ b/src/render/graph.rs @@ -100,7 +100,7 @@ pub(crate) struct Graph { impl Graph { pub fn new(reclaim_id_channel: llq::Producer) -> Self { Graph { - nodes: NodeCollection::new(), + nodes: NodeCollection::with_capacity(INITIAL_GRAPH_SIZE), alloc: Alloc::with_capacity(INITIAL_CHANNEL_DATA_COUNT), reclaim_id_channel, ordered: Vec::with_capacity(INITIAL_GRAPH_SIZE), diff --git a/src/render/node_collection.rs b/src/render/node_collection.rs index b5fa4df0..e164d622 100644 --- a/src/render/node_collection.rs +++ b/src/render/node_collection.rs @@ -7,11 +7,11 @@ pub struct NodeCollection { } impl NodeCollection { - pub fn new() -> Self { + pub fn with_capacity(capacity: usize) -> Self { let mut instance = Self { - nodes: Vec::with_capacity(64), + nodes: Vec::with_capacity(capacity), }; - instance.ensure_capacity(64); + instance.ensure_capacity(capacity); instance } From b71f3cc872a61780d2f356885be5917aee458398 Mon Sep 17 00:00:00 2001 From: Otto Date: Fri, 27 Oct 2023 13:43:44 +0200 Subject: [PATCH 5/6] cargo fmt --- src/render/thread.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/render/thread.rs b/src/render/thread.rs index 4501fa77..55f2b188 100644 --- a/src/render/thread.rs +++ b/src/render/thread.rs @@ -223,7 +223,11 @@ impl RenderThread { // For x64 and aarch, process with denormal floats disabled (for performance, #194) #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))] no_denormals::no_denormals(|| self.render_inner(output_buffer)); - #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))] + #[cfg(not(any( + target_arch = "x86", + target_arch = "x86_64", + target_arch = "aarch64" + )))] self.render_inner(output_buffer); }); @@ -338,7 +342,8 @@ impl RenderThread { impl Drop for RenderThread { fn drop(&mut self) { if let Some(gc) = self.garbage_collector.as_mut() { - gc.borrow_mut().push(llq::Node::new(Box::new(TerminateGarbageCollectorThread))) + gc.borrow_mut() + .push(llq::Node::new(Box::new(TerminateGarbageCollectorThread))) } log::info!("Audio render thread has been dropped"); } From 4a1bb98afbddb8182da1c200bf2ab6d19e28402c Mon Sep 17 00:00:00 2001 From: Otto Date: Fri, 27 Oct 2023 14:01:18 +0200 Subject: [PATCH 6/6] Move GarbageCollector to a separate mod --- src/render/garbage_collector.rs | 60 +++++++++++++++++++++++++++++++ src/render/graph.rs | 5 +-- src/render/mod.rs | 5 ++- src/render/processor.rs | 12 +++---- src/render/thread.rs | 63 ++++++--------------------------- 5 files changed, 82 insertions(+), 63 deletions(-) create mode 100644 src/render/garbage_collector.rs diff --git a/src/render/garbage_collector.rs b/src/render/garbage_collector.rs new file mode 100644 index 00000000..5371d1be --- /dev/null +++ b/src/render/garbage_collector.rs @@ -0,0 +1,60 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use std::any::Any; +use std::time::Duration; + +type AnyChannel = llq::Producer>; + +#[derive(Clone, Default)] +pub(crate) struct GarbageCollector { + gc: Option>>, +} + +impl GarbageCollector { + pub fn deallocate_async(&self, value: llq::Node>) { + if let Some(gc) = self.gc.as_ref() { + gc.borrow_mut().push(value); + } + } + + pub fn spawn_garbage_collector_thread(&mut self) { + if self.gc.is_none() { + let (gc_producer, gc_consumer) = llq::Queue::new().split(); + spawn_garbage_collector_thread(gc_consumer); + self.gc = Some(Rc::new(RefCell::new(gc_producer))); + } + } +} + +// Controls the polling frequency of the garbage collector thread. +const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100); + +// Poison pill that terminates the garbage collector thread. +#[derive(Debug)] +pub(crate) struct TerminateGarbageCollectorThread; + +// Spawns a sidecar thread of the `RenderThread` for dropping resources. +fn spawn_garbage_collector_thread(consumer: llq::Consumer>) { + let _join_handle = std::thread::spawn(move || run_garbage_collector_thread(consumer)); +} + +fn run_garbage_collector_thread(mut consumer: llq::Consumer>) { + log::debug!("Entering garbage collector thread"); + loop { + if let Some(node) = consumer.pop() { + if node + .as_ref() + .downcast_ref::() + .is_some() + { + log::info!("Terminating garbage collector thread"); + break; + } + // Implicitly drop the received node. + } else { + std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT); + } + } + log::info!("Exiting garbage collector thread"); +} diff --git a/src/render/graph.rs b/src/render/graph.rs index 2ccab9ce..a3956401 100644 --- a/src/render/graph.rs +++ b/src/render/graph.rs @@ -501,6 +501,7 @@ impl Graph { #[cfg(test)] mod tests { use super::*; + use crate::render::GarbageCollector; #[derive(Debug, Clone)] struct TestNode { @@ -696,7 +697,7 @@ mod tests { sample_rate: 48000., node_id: std::cell::Cell::new(AudioNodeId(0)), event_sender: None, - garbage_collector: None, + garbage_collector: GarbageCollector::default(), }; graph.render(&scope); @@ -748,7 +749,7 @@ mod tests { sample_rate: 48000., node_id: std::cell::Cell::new(AudioNodeId(0)), event_sender: None, - garbage_collector: None, + garbage_collector: GarbageCollector::default(), }; graph.render(&scope); diff --git a/src/render/mod.rs b/src/render/mod.rs index b05642f7..e0ccdb58 100644 --- a/src/render/mod.rs +++ b/src/render/mod.rs @@ -10,9 +10,12 @@ pub(crate) use thread::*; // public mods mod processor; pub use processor::*; + mod quantum; +pub use quantum::*; mod node_collection; pub(crate) use node_collection::NodeCollection; -pub use quantum::*; +mod garbage_collector; +pub(crate) use garbage_collector::{GarbageCollector, TerminateGarbageCollectorThread}; diff --git a/src/render/processor.rs b/src/render/processor.rs index c75dd406..6d5e4c30 100644 --- a/src/render/processor.rs +++ b/src/render/processor.rs @@ -3,11 +3,10 @@ use crate::context::{AudioNodeId, AudioParamId}; use crate::events::{ErrorEvent, EventDispatch}; use crate::{Event, RENDER_QUANTUM_SIZE}; -use super::{graph::Node, AudioRenderQuantum, NodeCollection}; +use super::{graph::Node, AudioRenderQuantum, GarbageCollector, NodeCollection}; use crossbeam_channel::Sender; -use std::cell::{Cell, RefCell}; -use std::rc::Rc; +use std::cell::Cell; use std::any::Any; use std::ops::Deref; @@ -25,7 +24,7 @@ pub struct RenderScope { pub(crate) node_id: Cell, pub(crate) event_sender: Option>, - pub(crate) garbage_collector: Option>>>>, + pub(crate) garbage_collector: GarbageCollector, } impl RenderScope { @@ -64,11 +63,8 @@ impl RenderScope { } } - #[allow(dead_code)] pub(crate) fn deallocate_async(&self, value: llq::Node>) { - if let Some(gc) = self.garbage_collector.as_ref() { - gc.borrow_mut().push(value); - } + self.garbage_collector.deallocate_async(value) } } diff --git a/src/render/thread.rs b/src/render/thread.rs index 55f2b188..537b0989 100644 --- a/src/render/thread.rs +++ b/src/render/thread.rs @@ -1,22 +1,19 @@ //! Communicates with the control thread and ships audio samples to the hardware -use std::any::Any; -use std::cell::{Cell, RefCell}; -use std::rc::Rc; +use std::cell::Cell; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use crossbeam_channel::{Receiver, Sender}; use dasp_sample::FromSample; -use super::AudioRenderQuantum; +use super::{AudioRenderQuantum, GarbageCollector, RenderScope}; use crate::buffer::{AudioBuffer, AudioBufferOptions}; use crate::context::AudioNodeId; use crate::events::EventDispatch; use crate::message::ControlMessage; use crate::node::ChannelInterpretation; -use crate::render::RenderScope; use crate::{AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE}; use super::graph::Graph; @@ -33,7 +30,7 @@ pub(crate) struct RenderThread { buffer_offset: Option<(usize, AudioRenderQuantum)>, load_value_sender: Option>, event_sender: Option>, - garbage_collector: Option>>>>, + garbage_collector: GarbageCollector, } // SAFETY: @@ -63,7 +60,7 @@ impl RenderThread { buffer_offset: None, load_value_sender: None, event_sender: None, - garbage_collector: None, + garbage_collector: GarbageCollector::default(), } } @@ -77,11 +74,7 @@ impl RenderThread { } pub(crate) fn spawn_garbage_collector_thread(&mut self) { - if self.garbage_collector.is_none() { - let (gc_producer, gc_consumer) = llq::Queue::new().split(); - spawn_garbage_collector_thread(gc_consumer); - self.garbage_collector = Some(Rc::new(RefCell::new(gc_producer))); - } + self.garbage_collector.spawn_garbage_collector_thread() } fn handle_control_messages(&mut self) { @@ -149,9 +142,7 @@ impl RenderThread { } NodeMessage { id, mut msg } => { self.graph.as_mut().unwrap().route_message(id, msg.as_mut()); - if let Some(gc) = self.garbage_collector.as_ref() { - gc.borrow_mut().push(msg) - } + self.garbage_collector.deallocate_async(msg); } } } @@ -341,42 +332,10 @@ impl RenderThread { impl Drop for RenderThread { fn drop(&mut self) { - if let Some(gc) = self.garbage_collector.as_mut() { - gc.borrow_mut() - .push(llq::Node::new(Box::new(TerminateGarbageCollectorThread))) - } + self.garbage_collector + .deallocate_async(llq::Node::new(Box::new( + super::TerminateGarbageCollectorThread, + ))); log::info!("Audio render thread has been dropped"); } } - -// Controls the polling frequency of the garbage collector thread. -const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100); - -// Poison pill that terminates the garbage collector thread. -#[derive(Debug)] -struct TerminateGarbageCollectorThread; - -// Spawns a sidecar thread of the `RenderThread` for dropping resources. -fn spawn_garbage_collector_thread(consumer: llq::Consumer>) { - let _join_handle = std::thread::spawn(move || run_garbage_collector_thread(consumer)); -} - -fn run_garbage_collector_thread(mut consumer: llq::Consumer>) { - log::debug!("Entering garbage collector thread"); - loop { - if let Some(node) = consumer.pop() { - if node - .as_ref() - .downcast_ref::() - .is_some() - { - log::info!("Terminating garbage collector thread"); - break; - } - // Implicitly drop the received node. - } else { - std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT); - } - } - log::info!("Exiting garbage collector thread"); -}