diff --git a/Cargo.toml b/Cargo.toml index 06e0349a..b3e65803 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,9 +32,9 @@ rubato = "0.14" smallvec = "1.11" symphonia = { version = "0.5", default-features = false } vecmath = "1.0" +assert_no_alloc = { git = "https://github.com/Windfisch/rust-assert-no-alloc/", default-features = false, features = ["backtrace"] } [dev-dependencies] -alloc_counter = "0.0.4" env_logger = "0.10" iai = "0.1.1" rand = "0.8" diff --git a/examples/audio_buffer_source_events.rs b/examples/audio_buffer_source_events.rs index 18d69e2f..d330236d 100644 --- a/examples/audio_buffer_source_events.rs +++ b/examples/audio_buffer_source_events.rs @@ -41,6 +41,7 @@ fn main() { let now = context.current_time(); src.start_at(now); src.stop_at(now + 1.); + drop(src); std::thread::sleep(std::time::Duration::from_secs(4)); } diff --git a/src/buffer.rs b/src/buffer.rs index 71824186..cd97c4ae 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -66,8 +66,8 @@ pub struct AudioBufferOptions { /// #[derive(Clone, Debug)] pub struct AudioBuffer { - pub(crate) channels: Vec, - pub(crate) sample_rate: f32, + channels: Vec, + sample_rate: f32, } impl AudioBuffer { @@ -91,6 +91,14 @@ impl AudioBuffer { } } + /// Creates an invalid, but non-allocating AudioBuffer to be used as placeholder + pub(crate) fn tombstone() -> Self { + Self { + channels: Default::default(), + sample_rate: Default::default(), + } + } + /// Convert raw samples to an AudioBuffer /// /// The outer Vec determine the channels. The inner Vecs should have the same length. diff --git a/src/lib.rs b/src/lib.rs index 58c5a939..981de94c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,9 @@ #![warn(clippy::clone_on_ref_ptr)] #![deny(trivial_numeric_casts)] +#[global_allocator] +static A: assert_no_alloc::AllocDisabler = assert_no_alloc::AllocDisabler; + use std::error::Error; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; diff --git a/src/node/audio_buffer_source.rs b/src/node/audio_buffer_source.rs index 4fdeb39b..1b012467 100644 --- a/src/node/audio_buffer_source.rs +++ b/src/node/audio_buffer_source.rs @@ -210,6 +210,7 @@ impl AudioBufferSourceNode { loop_state: loop_state.clone(), render_state: AudioBufferRendererState::default(), ended_triggered: false, + buffer_deallocator: Some(llq::Node::new(Box::new(AudioBuffer::tombstone()))), }; let mut node = Self { @@ -371,6 +372,7 @@ struct AudioBufferSourceRenderer { loop_state: LoopState, render_state: AudioBufferRendererState, ended_triggered: bool, + buffer_deallocator: Option>>, } impl AudioBufferSourceRenderer { @@ -387,6 +389,26 @@ impl AudioBufferSourceRenderer { ControlMessage::LoopEnd(loop_end) => self.loop_state.end = *loop_end, } } + + fn on_end(&mut self, scope: &RenderScope) { + if self.ended_triggered { + return; // only run once + } + self.ended_triggered = true; + + // notify the control thread + scope.send_ended_event(); + + // deallocate the AudioBuffer asynchronously - not in the render thread + if let Some(buffer) = self.buffer.take() { + // the holder contains a tombstone AudioBuffer that can be dropped without deallocation + let mut holder = self.buffer_deallocator.take().unwrap(); + // replace the contents of the holder with the actual buffer + *holder.downcast_mut().unwrap() = buffer; + // ship the buffer to the deallocator thread + scope.deallocate_async(holder); + } + } } impl AudioProcessor for AudioBufferSourceRenderer { @@ -453,13 +475,7 @@ impl AudioProcessor for AudioBufferSourceRenderer { || self.render_state.buffer_time_elapsed >= self.duration { output.make_silent(); // also converts to mono - - // @note: we need this check because this is called a until the program - // ends, such as if the node was never removed from the graph - if !self.ended_triggered { - scope.send_ended_event(); - self.ended_triggered = true; - } + self.on_end(scope); return false; } @@ -471,19 +487,13 @@ impl AudioProcessor for AudioBufferSourceRenderer { if !is_looping { if computed_playback_rate > 0. && buffer_time >= buffer_duration { output.make_silent(); // also converts to mono - if !self.ended_triggered { - scope.send_ended_event(); - self.ended_triggered = true; - } + self.on_end(scope); return false; } if computed_playback_rate < 0. && buffer_time < 0. { output.make_silent(); // also converts to mono - if !self.ended_triggered { - scope.send_ended_event(); - self.ended_triggered = true; - } + self.on_end(scope); return false; } } @@ -764,11 +774,7 @@ impl AudioProcessor for AudioBufferSourceRenderer { std::mem::swap(current_buffer, buffer); } else { // Creating the tombstone buffer does not cause allocations. - let tombstone_buffer = AudioBuffer { - channels: Default::default(), - sample_rate: Default::default(), - }; - self.buffer = Some(std::mem::replace(buffer, tombstone_buffer)); + self.buffer = Some(std::mem::replace(buffer, AudioBuffer::tombstone())); } return; }; diff --git a/src/param.rs b/src/param.rs index ec794ceb..8eaaf1ca 100644 --- a/src/param.rs +++ b/src/param.rs @@ -128,7 +128,7 @@ struct AudioParamEventTimeline { impl AudioParamEventTimeline { fn new() -> Self { Self { - inner: Vec::new(), + inner: Vec::with_capacity(5), dirty: false, } } diff --git a/src/render/garbage_collector.rs b/src/render/garbage_collector.rs new file mode 100644 index 00000000..5371d1be --- /dev/null +++ b/src/render/garbage_collector.rs @@ -0,0 +1,60 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use std::any::Any; +use std::time::Duration; + +type AnyChannel = llq::Producer>; + +#[derive(Clone, Default)] +pub(crate) struct GarbageCollector { + gc: Option>>, +} + +impl GarbageCollector { + pub fn deallocate_async(&self, value: llq::Node>) { + if let Some(gc) = self.gc.as_ref() { + gc.borrow_mut().push(value); + } + } + + pub fn spawn_garbage_collector_thread(&mut self) { + if self.gc.is_none() { + let (gc_producer, gc_consumer) = llq::Queue::new().split(); + spawn_garbage_collector_thread(gc_consumer); + self.gc = Some(Rc::new(RefCell::new(gc_producer))); + } + } +} + +// Controls the polling frequency of the garbage collector thread. +const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100); + +// Poison pill that terminates the garbage collector thread. +#[derive(Debug)] +pub(crate) struct TerminateGarbageCollectorThread; + +// Spawns a sidecar thread of the `RenderThread` for dropping resources. +fn spawn_garbage_collector_thread(consumer: llq::Consumer>) { + let _join_handle = std::thread::spawn(move || run_garbage_collector_thread(consumer)); +} + +fn run_garbage_collector_thread(mut consumer: llq::Consumer>) { + log::debug!("Entering garbage collector thread"); + loop { + if let Some(node) = consumer.pop() { + if node + .as_ref() + .downcast_ref::() + .is_some() + { + log::info!("Terminating garbage collector thread"); + break; + } + // Implicitly drop the received node. + } else { + std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT); + } + } + log::info!("Exiting garbage collector thread"); +} diff --git a/src/render/graph.rs b/src/render/graph.rs index 34b07111..fdfde75f 100644 --- a/src/render/graph.rs +++ b/src/render/graph.rs @@ -10,6 +10,9 @@ use super::{Alloc, AudioParamValues, AudioProcessor, AudioRenderQuantum, NodeCol use crate::node::ChannelConfig; use crate::render::RenderScope; +const INITIAL_GRAPH_SIZE: usize = 16; +const INITIAL_CHANNEL_DATA_COUNT: usize = INITIAL_GRAPH_SIZE * 4; + /// Connection between two audio nodes struct OutgoingEdge { /// index of the current Nodes output port @@ -27,9 +30,9 @@ pub struct Node { /// Renderer: converts inputs to outputs processor: Box, /// Reusable input buffers - inputs: Vec, + inputs: SmallVec<[AudioRenderQuantum; 2]>, /// Reusable output buffers, consumed by subsequent Nodes in this graph - outputs: Vec, + outputs: SmallVec<[AudioRenderQuantum; 2]>, /// Channel configuration: determines up/down-mixing of inputs channel_config: ChannelConfig, /// Outgoing edges: tuple of outcoming node reference, our output index and their input index @@ -97,14 +100,14 @@ pub(crate) struct Graph { impl Graph { pub fn new(reclaim_id_channel: llq::Producer) -> Self { Graph { - nodes: NodeCollection::new(), - alloc: Alloc::with_capacity(64), + nodes: NodeCollection::with_capacity(INITIAL_GRAPH_SIZE), + alloc: Alloc::with_capacity(INITIAL_CHANNEL_DATA_COUNT), reclaim_id_channel, - ordered: vec![], - marked: vec![], - marked_temp: vec![], - in_cycle: vec![], - cycle_breakers: vec![], + ordered: Vec::with_capacity(INITIAL_GRAPH_SIZE), + marked: Vec::with_capacity(INITIAL_GRAPH_SIZE), + marked_temp: Vec::with_capacity(INITIAL_GRAPH_SIZE), + in_cycle: Vec::with_capacity(INITIAL_GRAPH_SIZE), + cycle_breakers: Vec::with_capacity(INITIAL_GRAPH_SIZE), } } @@ -127,8 +130,8 @@ impl Graph { // set input and output buffers to single channel of silence, will be upmixed when // necessary - let inputs = vec![AudioRenderQuantum::from(self.alloc.silence()); number_of_inputs]; - let outputs = vec![AudioRenderQuantum::from(self.alloc.silence()); number_of_outputs]; + let inputs = smallvec![AudioRenderQuantum::from(self.alloc.silence()); number_of_inputs]; + let outputs = smallvec![AudioRenderQuantum::from(self.alloc.silence()); number_of_outputs]; self.nodes.insert( index, @@ -487,6 +490,7 @@ impl Graph { #[cfg(test)] mod tests { use super::*; + use crate::render::GarbageCollector; #[derive(Debug, Clone)] struct TestNode { @@ -682,6 +686,7 @@ mod tests { sample_rate: 48000., node_id: std::cell::Cell::new(AudioNodeId(0)), event_sender: None, + garbage_collector: GarbageCollector::default(), }; graph.render(&scope); @@ -733,6 +738,7 @@ mod tests { sample_rate: 48000., node_id: std::cell::Cell::new(AudioNodeId(0)), event_sender: None, + garbage_collector: GarbageCollector::default(), }; graph.render(&scope); diff --git a/src/render/mod.rs b/src/render/mod.rs index b05642f7..e0ccdb58 100644 --- a/src/render/mod.rs +++ b/src/render/mod.rs @@ -10,9 +10,12 @@ pub(crate) use thread::*; // public mods mod processor; pub use processor::*; + mod quantum; +pub use quantum::*; mod node_collection; pub(crate) use node_collection::NodeCollection; -pub use quantum::*; +mod garbage_collector; +pub(crate) use garbage_collector::{GarbageCollector, TerminateGarbageCollectorThread}; diff --git a/src/render/node_collection.rs b/src/render/node_collection.rs index 69204abb..6c735a08 100644 --- a/src/render/node_collection.rs +++ b/src/render/node_collection.rs @@ -9,11 +9,11 @@ pub(crate) struct NodeCollection { } impl NodeCollection { - pub fn new() -> Self { + pub fn with_capacity(capacity: usize) -> Self { let mut instance = Self { - nodes: Vec::with_capacity(64), + nodes: Vec::with_capacity(capacity), }; - instance.ensure_capacity(64); + instance.ensure_capacity(capacity); instance } diff --git a/src/render/processor.rs b/src/render/processor.rs index 8fee65bb..cd9e93a1 100644 --- a/src/render/processor.rs +++ b/src/render/processor.rs @@ -3,7 +3,7 @@ use crate::context::{AudioNodeId, AudioParamId}; use crate::events::{ErrorEvent, EventDispatch}; use crate::{Event, RENDER_QUANTUM_SIZE}; -use super::{graph::Node, AudioRenderQuantum, NodeCollection}; +use super::{graph::Node, AudioRenderQuantum, GarbageCollector, NodeCollection}; use crossbeam_channel::Sender; use std::cell::Cell; @@ -24,6 +24,7 @@ pub struct RenderScope { pub(crate) node_id: Cell, pub(crate) event_sender: Option>, + pub(crate) garbage_collector: GarbageCollector, } impl RenderScope { @@ -61,6 +62,10 @@ impl RenderScope { let _ = sender.try_send(EventDispatch::processor_error(self.node_id.get(), event)); } } + + pub(crate) fn deallocate_async(&self, value: llq::Node>) { + self.garbage_collector.deallocate_async(value) + } } /// Interface for audio processing code that runs on the audio rendering thread. diff --git a/src/render/quantum.rs b/src/render/quantum.rs index 6f7a2479..e1ac7256 100644 --- a/src/render/quantum.rs +++ b/src/render/quantum.rs @@ -681,7 +681,7 @@ mod tests { let alloc = Alloc::with_capacity(2); assert_eq!(alloc.pool_size(), 2); - alloc_counter::deny_alloc(|| { + assert_no_alloc::assert_no_alloc(|| { { // take a buffer out of the pool let a = alloc.allocate(); @@ -712,7 +712,7 @@ mod tests { let a = alloc.allocate(); let b = alloc.allocate(); - let c = alloc_counter::allow_alloc(|| { + let c = assert_no_alloc::permit_alloc(|| { // we can allocate beyond the pool size let c = alloc.allocate(); assert_eq!(alloc.pool_size(), 0); @@ -728,7 +728,7 @@ mod tests { }; // dropping c will cause a re-allocation: the pool capacity is extended - alloc_counter::allow_alloc(move || { + assert_no_alloc::permit_alloc(move || { std::mem::drop(c); }); diff --git a/src/render/thread.rs b/src/render/thread.rs index 606fb492..537b0989 100644 --- a/src/render/thread.rs +++ b/src/render/thread.rs @@ -1,21 +1,19 @@ //! Communicates with the control thread and ships audio samples to the hardware -use std::any::Any; use std::cell::Cell; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use crossbeam_channel::{Receiver, Sender}; use dasp_sample::FromSample; -use super::AudioRenderQuantum; +use super::{AudioRenderQuantum, GarbageCollector, RenderScope}; use crate::buffer::{AudioBuffer, AudioBufferOptions}; use crate::context::AudioNodeId; use crate::events::EventDispatch; use crate::message::ControlMessage; use crate::node::ChannelInterpretation; -use crate::render::RenderScope; use crate::{AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE}; use super::graph::Graph; @@ -32,7 +30,7 @@ pub(crate) struct RenderThread { buffer_offset: Option<(usize, AudioRenderQuantum)>, load_value_sender: Option>, event_sender: Option>, - garbage_collector: Option>>, + garbage_collector: GarbageCollector, } // SAFETY: @@ -62,7 +60,7 @@ impl RenderThread { buffer_offset: None, load_value_sender: None, event_sender: None, - garbage_collector: None, + garbage_collector: GarbageCollector::default(), } } @@ -76,11 +74,7 @@ impl RenderThread { } pub(crate) fn spawn_garbage_collector_thread(&mut self) { - if self.garbage_collector.is_none() { - let (gc_producer, gc_consumer) = llq::Queue::new().split(); - spawn_garbage_collector_thread(gc_consumer); - self.garbage_collector = Some(gc_producer); - } + self.garbage_collector.spawn_garbage_collector_thread() } fn handle_control_messages(&mut self) { @@ -139,14 +133,16 @@ impl RenderThread { return; // no further handling of ctrl msgs } Startup { graph } => { + // Obtaining the current thread id invokes an allocation (on OSX) so let's take + // this hit on audio graph startup, so subsequent calls (needed for crossbeam) + // don't need to. + assert_no_alloc::permit_alloc(|| std::thread::current().id()); debug_assert!(self.graph.is_none()); self.graph = Some(graph); } NodeMessage { id, mut msg } => { self.graph.as_mut().unwrap().route_message(id, msg.as_mut()); - if let Some(gc) = self.garbage_collector.as_mut() { - gc.push(msg) - } + self.garbage_collector.deallocate_async(msg); } } } @@ -183,6 +179,7 @@ impl RenderThread { sample_rate: self.sample_rate, event_sender: self.event_sender.clone(), node_id: Cell::new(AudioNodeId(0)), // placeholder value + garbage_collector: self.garbage_collector.clone(), }; // Render audio graph @@ -213,12 +210,17 @@ impl RenderThread { let render_start = Instant::now(); // Perform actual rendering - - // For x64 and aarch, process with denormal floats disabled (for performance, #194) - #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))] - no_denormals::no_denormals(|| self.render_inner(output_buffer)); - #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64")))] - self.render_inner(output_buffer); + assert_no_alloc::assert_no_alloc(|| { + // For x64 and aarch, process with denormal floats disabled (for performance, #194) + #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))] + no_denormals::no_denormals(|| self.render_inner(output_buffer)); + #[cfg(not(any( + target_arch = "x86", + target_arch = "x86_64", + target_arch = "aarch64" + )))] + self.render_inner(output_buffer); + }); // calculate load value and ship to control thread if let Some(load_value_sender) = &self.load_value_sender { @@ -292,6 +294,7 @@ impl RenderThread { sample_rate: self.sample_rate, event_sender: self.event_sender.clone(), node_id: Cell::new(AudioNodeId(0)), // placeholder value + garbage_collector: self.garbage_collector.clone(), }; // render audio graph, clone it in case we need to mutate/store the value later @@ -329,41 +332,10 @@ impl RenderThread { impl Drop for RenderThread { fn drop(&mut self) { - if let Some(gc) = self.garbage_collector.as_mut() { - gc.push(llq::Node::new(Box::new(TerminateGarbageCollectorThread))) - } + self.garbage_collector + .deallocate_async(llq::Node::new(Box::new( + super::TerminateGarbageCollectorThread, + ))); log::info!("Audio render thread has been dropped"); } } - -// Controls the polling frequency of the garbage collector thread. -const GARBAGE_COLLECTOR_THREAD_TIMEOUT: Duration = Duration::from_millis(100); - -// Poison pill that terminates the garbage collector thread. -#[derive(Debug)] -struct TerminateGarbageCollectorThread; - -// Spawns a sidecar thread of the `RenderThread` for dropping resources. -fn spawn_garbage_collector_thread(consumer: llq::Consumer>) { - let _join_handle = std::thread::spawn(move || run_garbage_collector_thread(consumer)); -} - -fn run_garbage_collector_thread(mut consumer: llq::Consumer>) { - log::info!("Entering garbage collector thread"); - loop { - if let Some(node) = consumer.pop() { - if node - .as_ref() - .downcast_ref::() - .is_some() - { - log::info!("Terminating garbage collector thread"); - break; - } - // Implicitly drop the received node. - } else { - std::thread::sleep(GARBAGE_COLLECTOR_THREAD_TIMEOUT); - } - } - log::info!("Exiting garbage collector thread"); -}