From 9411e5e7641f1ee687677afa5a5b7fc390eefe0e Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 4 Jul 2025 11:47:41 +0200 Subject: [PATCH] extract `object_pool` module from PagePool --- Cargo.lock | 1 + crates/core/src/worker_metrics/mod.rs | 8 +- crates/data-structures/Cargo.toml | 1 + crates/data-structures/src/lib.rs | 1 + crates/data-structures/src/object_pool.rs | 310 ++++++++++++++++++++++ crates/table/src/page_pool.rs | 215 +++------------ 6 files changed, 347 insertions(+), 189 deletions(-) create mode 100644 crates/data-structures/src/object_pool.rs diff --git a/Cargo.lock b/Cargo.lock index 2063360a8f4..6c3e66c3aa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5572,6 +5572,7 @@ name = "spacetimedb-data-structures" version = "1.2.0" dependencies = [ "ahash 0.8.12", + "crossbeam-queue", "hashbrown 0.15.3", "nohash-hasher", "serde", diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 0f50b2a07a1..f7beedcdcfb 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -326,10 +326,10 @@ pub fn spawn_page_pool_stats(node_id: String, page_pool: PagePool) { loop { resident_bytes.set(page_pool.heap_usage() as i64); - dropped_pages.set(page_pool.dropped_pages_count() as i64); - new_pages.set(page_pool.new_pages_allocated_count() as i64); - reused_pages.set(page_pool.pages_reused_count() as i64); - returned_pages.set(page_pool.pages_reused_count() as i64); + dropped_pages.set(page_pool.dropped_count() as i64); + new_pages.set(page_pool.new_allocated_count() as i64); + reused_pages.set(page_pool.reused_count() as i64); + returned_pages.set(page_pool.returned_count() as i64); sleep(Duration::from_secs(10)).await; } diff --git a/crates/data-structures/Cargo.toml b/crates/data-structures/Cargo.toml index c9a82701db2..d299ab19ab3 100644 --- a/crates/data-structures/Cargo.toml +++ b/crates/data-structures/Cargo.toml @@ -12,6 +12,7 @@ serde = ["dep:serde", "hashbrown/serde"] 
[dependencies] spacetimedb-memory-usage = { workspace = true, optional = true, default-features = false } ahash.workspace = true +crossbeam-queue.workspace = true hashbrown.workspace = true nohash-hasher.workspace = true serde = { workspace = true, optional = true } diff --git a/crates/data-structures/src/lib.rs b/crates/data-structures/src/lib.rs index 62ba955dfc5..6a947b319b3 100644 --- a/crates/data-structures/src/lib.rs +++ b/crates/data-structures/src/lib.rs @@ -4,4 +4,5 @@ pub mod error_stream; pub mod map; pub mod nstr; +pub mod object_pool; pub mod slim_slice; diff --git a/crates/data-structures/src/object_pool.rs b/crates/data-structures/src/object_pool.rs new file mode 100644 index 00000000000..c05f864d30d --- /dev/null +++ b/crates/data-structures/src/object_pool.rs @@ -0,0 +1,310 @@ +use core::any::type_name; +use core::fmt; +use core::sync::atomic::{AtomicUsize, Ordering}; +use crossbeam_queue::ArrayQueue; +use std::sync::Arc; + +#[cfg(not(feature = "memory-usage"))] +/// An object that can be put into a [`Pool`]. +pub trait PooledObject {} + +#[cfg(feature = "memory-usage")] +/// An object that can be put into a [`Pool`]. +/// +/// The trait exposes hooks that the pool needs +/// so that it can e.g., implement `MemoryUsage`. +pub trait PooledObject: spacetimedb_memory_usage::MemoryUsage { + /// The storage for the number of bytes in the pool. + /// + /// When each object in the pool takes up the same size, this can be `()`. + /// Otherwise, it will typically be [`AtomicUsize`]. + type ResidentBytesStorage: Default; + + /// Returns the number of bytes resident in the pool. + /// + /// The `storage` is provided as well as the `num_objects` in the pool. + /// Typically, exactly one of these will be used. + fn resident_object_bytes(storage: &Self::ResidentBytesStorage, num_objects: usize) -> usize; + + /// Called by the pool to add `bytes` to `storage`, if necessary. 
+ fn add_to_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize); + + /// Called by the pool to subtract `bytes` from `storage`, if necessary. + fn sub_from_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize); +} + +/// A pool of some objects of type `T`. +pub struct Pool<T: PooledObject> { + inner: Arc<Inner<T>>, +} + +impl<T: PooledObject> fmt::Debug for Pool<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let dropped = self.dropped_count(); + let new = self.new_allocated_count(); + let reused = self.reused_count(); + let returned = self.returned_count(); + + #[cfg(feature = "memory-usage")] + let bytes = T::resident_object_bytes(&self.inner.resident_object_bytes, self.inner.objects.len()); + + let mut builder = f.debug_struct(&format!("Pool<{}>", type_name::<T>())); + + #[cfg(feature = "memory-usage")] + let builder = builder.field("resident_object_bytes", &bytes); + + builder + .field("dropped_count", &dropped) + .field("new_allocated_count", &new) + .field("reused_count", &reused) + .field("returned_count", &returned) + .finish() + } +} + +impl<T: PooledObject> Clone for Pool<T> { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { inner } + } +} + +#[cfg(feature = "memory-usage")] +impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Pool<T> { + fn heap_usage(&self) -> usize { + let Self { inner } = self; + inner.heap_usage() + } +} + +impl<T: PooledObject> Pool<T> { + /// Returns a new pool with a maximum capacity of `cap`. + /// This capacity is fixed over the lifetime of the pool. + pub fn new(cap: usize) -> Self { + let inner = Arc::new(Inner::new(cap)); + Self { inner } + } + + /// Puts back an object into the pool. + pub fn put(&self, object: T) { + self.inner.put(object); + } + + /// Puts back multiple objects into the pool. + pub fn put_many(&self, objects: impl Iterator<Item = T>) { + for obj in objects { + self.put(obj); + } + } + + /// Takes an object from the pool or creates a new one. 
+ pub fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T { + self.inner.take(clear, new) + } + + /// Returns the number of objects dropped by the pool because the pool was at capacity. + pub fn dropped_count(&self) -> usize { + self.inner.dropped_count.load(Ordering::Relaxed) + } + + /// Returns the number of fresh objects allocated through the pool. + pub fn new_allocated_count(&self) -> usize { + self.inner.new_allocated_count.load(Ordering::Relaxed) + } + + /// Returns the number of objects reused from the pool. + pub fn reused_count(&self) -> usize { + self.inner.reused_count.load(Ordering::Relaxed) + } + + /// Returns the number of objects returned to the pool. + pub fn returned_count(&self) -> usize { + self.inner.returned_count.load(Ordering::Relaxed) + } +} + +/// The inner actual object pool containing all the logic. +struct Inner<T: PooledObject> { + objects: ArrayQueue<T>, + dropped_count: AtomicUsize, + new_allocated_count: AtomicUsize, + reused_count: AtomicUsize, + returned_count: AtomicUsize, + + #[cfg(feature = "memory-usage")] + resident_object_bytes: T::ResidentBytesStorage, +} + +#[cfg(feature = "memory-usage")] +impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Inner<T> { + fn heap_usage(&self) -> usize { + let Self { + objects, + dropped_count, + new_allocated_count, + reused_count, + returned_count, + resident_object_bytes, + } = self; + dropped_count.heap_usage() + + new_allocated_count.heap_usage() + + reused_count.heap_usage() + + returned_count.heap_usage() + + // This is the amount the queue itself takes up on the heap. + objects.capacity() * size_of::<(AtomicUsize, T)>() + + // This is the amount the objects take up on the heap, excluding the static size. + T::resident_object_bytes(resident_object_bytes, objects.len()) + } +} + +#[inline] +fn inc(atomic: &AtomicUsize) { + atomic.fetch_add(1, Ordering::Relaxed); +} + +impl<T: PooledObject> Inner<T> { + /// Creates a new pool capable of holding `cap` objects. 
+ fn new(cap: usize) -> Self { + let objects = ArrayQueue::new(cap); + Self { + objects, + dropped_count: <_>::default(), + new_allocated_count: <_>::default(), + reused_count: <_>::default(), + returned_count: <_>::default(), + + #[cfg(feature = "memory-usage")] + resident_object_bytes: <_>::default(), + } + } + + /// Puts back an object into the pool. + fn put(&self, object: T) { + #[cfg(feature = "memory-usage")] + let bytes = object.heap_usage(); + // Add it to the pool if there's room, or just drop it. + if self.objects.push(object).is_ok() { + #[cfg(feature = "memory-usage")] + T::add_to_resident_object_bytes(&self.resident_object_bytes, bytes); + + inc(&self.returned_count); + } else { + inc(&self.dropped_count); + } + } + + /// Takes an object from the pool or creates a new one. + /// + /// The closure `clear` provides the opportunity to clear the object before use. + /// The closure `new` is called to create a new object when the pool is empty. + fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T { + self.objects + .pop() + .map(|mut object| { + #[cfg(feature = "memory-usage")] + T::sub_from_resident_object_bytes(&self.resident_object_bytes, object.heap_usage()); + + inc(&self.reused_count); + clear(&mut object); + object + }) + .unwrap_or_else(|| { + inc(&self.new_allocated_count); + new() + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use core::{iter, ptr::addr_eq}; + + // The type of pools used for testing. + // We want to include a `Box` so that we can do pointer comparisons. 
+ type P = Pool<Box<u64>>; + + #[cfg(not(feature = "memory-usage"))] + impl PooledObject for Box<u64> {} + + #[cfg(feature = "memory-usage")] + impl PooledObject for Box<u64> { + type ResidentBytesStorage = (); + fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {} + fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {} + fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize { + num_objects * size_of::<u64>() + } + } + + fn new() -> P { + P::new(100) + } + + fn assert_metrics(pool: &P, dropped: usize, new: usize, reused: usize, returned: usize) { + assert_eq!(pool.dropped_count(), dropped); + assert_eq!(pool.new_allocated_count(), new); + assert_eq!(pool.reused_count(), reused); + assert_eq!(pool.returned_count(), returned); + } + + fn take(pool: &P) -> Box<u64> { + pool.take(|_| {}, || Box::new(0)) + } + + #[test] + fn pool_returns_same_obj() { + let pool = new(); + assert_metrics(&pool, 0, 0, 0, 0); + + // Create an object and put it back. + let obj1 = take(&pool); + assert_metrics(&pool, 0, 1, 0, 0); + let obj1_ptr = &*obj1 as *const _; + pool.put(obj1); + assert_metrics(&pool, 0, 1, 0, 1); + + // Extract an object again. + let obj2 = take(&pool); + assert_metrics(&pool, 0, 1, 1, 1); + let obj2_ptr = &*obj2 as *const _; + // It should be the same as the previous one. + assert!(addr_eq(obj1_ptr, obj2_ptr)); + pool.put(obj2); + assert_metrics(&pool, 0, 1, 1, 2); + + // Extract an object again. + let obj3 = take(&pool); + assert_metrics(&pool, 0, 1, 2, 2); + let obj3_ptr = &*obj3 as *const _; + // It should be the same as the previous one. + assert!(addr_eq(obj1_ptr, obj3_ptr)); + + // Manually create an object and put it in. + let obj4 = Box::new(0); + let obj4_ptr = &*obj4 as *const _; + pool.put(obj4); + pool.put(obj3); + assert_metrics(&pool, 0, 1, 2, 4); + // When we take out an object, it should be the same as `obj4` and not `obj1`. 
+ let obj5 = take(&pool); + assert_metrics(&pool, 0, 1, 3, 4); + let obj5_ptr = &*obj5 as *const _; + // Same as obj4. + assert!(!addr_eq(obj5_ptr, obj1_ptr)); + assert!(addr_eq(obj5_ptr, obj4_ptr)); + } + + #[test] + fn pool_drops_past_max_size() { + const N: usize = 3; + let pool = P::new(N); + + let pages = iter::repeat_with(|| take(&pool)).take(N + 1).collect::<Vec<_>>(); + assert_metrics(&pool, 0, N + 1, 0, 0); + + pool.put_many(pages.into_iter()); + assert_metrics(&pool, 1, N + 1, 0, N); + assert_eq!(pool.inner.objects.len(), N); + } +} diff --git a/crates/table/src/page_pool.rs b/crates/table/src/page_pool.rs index 4de3538d361..1eecf95e80f 100644 --- a/crates/table/src/page_pool.rs +++ b/crates/table/src/page_pool.rs @@ -2,26 +2,34 @@ use super::{ indexes::max_rows_in_page, page::{Page, PageHeader}, }; -use core::sync::atomic::{AtomicUsize, Ordering}; -use crossbeam_queue::ArrayQueue; +use derive_more::Deref; +use spacetimedb_data_structures::object_pool::{Pool, PooledObject}; use spacetimedb_sats::bsatn::{self, DecodeError}; use spacetimedb_sats::de::{ DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess, }; use spacetimedb_sats::layout::Size; use spacetimedb_sats::memory_usage::MemoryUsage; -use std::sync::Arc; + +impl PooledObject for Box<Page> { + type ResidentBytesStorage = (); + fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize { + // Each page takes up a fixed amount. + num_objects * size_of::<Page>() + } + fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {} + fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {} +} /// A page pool of currently unused pages available for use in [`Pages`](super::pages::Pages). 
-#[derive(Clone)] +#[derive(Clone, Deref)] pub struct PagePool { - inner: Arc<PagePoolInner>, + pool: Pool<Box<Page>>, } impl MemoryUsage for PagePool { fn heap_usage(&self) -> usize { - let Self { inner } = self; - inner.heap_usage() + self.pool.heap_usage() } } @@ -46,60 +54,31 @@ impl PagePool { const DEFAULT_MAX_SIZE: usize = 128 * PAGE_SIZE; // 128 pages let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE; - let inner = Arc::new(PagePoolInner::new(queue_size)); - Self { inner } - } - - /// Puts back a [`Page`] into the pool. - pub fn put(&self, page: Box<Page>) { - self.inner.put(page); - } - - /// Puts back a [`Page`] into the pool. - pub fn put_many(&self, pages: impl Iterator<Item = Box<Page>>) { - for page in pages { - self.put(page); - } + let pool = Pool::new(queue_size); + Self { pool } } /// Takes a [`Page`] from the pool or creates a new one. /// /// The returned page supports fixed rows of size `fixed_row_size`. pub fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> { - self.inner.take_with_fixed_row_size(fixed_row_size) + self.take_with_max_row_count(max_rows_in_page(fixed_row_size)) } /// Takes a [`Page`] from the pool or creates a new one. /// /// The returned page supports a maximum of `max_rows_in_page` rows. fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> { - self.inner.take_with_max_row_count(max_rows_in_page) + self.pool.take( + |page| page.reset_for(max_rows_in_page), + || Page::new_with_max_row_count(max_rows_in_page), + ) } /// Deserialize a page from `buf` but reuse the allocations in the pool. pub fn take_deserialize_from(&self, buf: &[u8]) -> Result<Box<Page>, DecodeError> { self.deserialize(bsatn::Deserializer::new(&mut &*buf)) } - - /// Returns the number of pages dropped by the pool because the pool was at capacity. - pub fn dropped_pages_count(&self) -> usize { - self.inner.dropped_pages_count.load(Ordering::Relaxed) - } - - /// Returns the number of fresh pages allocated through the pool. 
- pub fn new_pages_allocated_count(&self) -> usize { - self.inner.new_pages_allocated_count.load(Ordering::Relaxed) - } - - /// Returns the number of pages reused from the pool. - pub fn pages_reused_count(&self) -> usize { - self.inner.pages_reused_count.load(Ordering::Relaxed) - } - - /// Returns the number of pages returned to the pool. - pub fn pages_returned_count(&self) -> usize { - self.inner.pages_returned_count.load(Ordering::Relaxed) - } } impl<'de> DeserializeSeed<'de> for &PagePool { @@ -142,166 +121,32 @@ impl<'de> ProductVisitor<'de> for &PagePool { } } -/// The inner actual page pool containing all the logic. -struct PagePoolInner { - pages: ArrayQueue>, - dropped_pages_count: AtomicUsize, - new_pages_allocated_count: AtomicUsize, - pages_reused_count: AtomicUsize, - pages_returned_count: AtomicUsize, -} - -impl MemoryUsage for PagePoolInner { - fn heap_usage(&self) -> usize { - let Self { - pages, - dropped_pages_count, - new_pages_allocated_count, - pages_reused_count, - pages_returned_count, - } = self; - dropped_pages_count.heap_usage() + - new_pages_allocated_count.heap_usage() + - pages_reused_count.heap_usage() + - pages_returned_count.heap_usage() + - // This is the amount the queue itself takes up on the heap. - pages.capacity() * size_of::<(AtomicUsize, Box)>() + - // Each page takes up a fixed amount. - pages.len() * size_of::() - } -} - -#[inline] -fn inc(atomic: &AtomicUsize) { - atomic.fetch_add(1, Ordering::Relaxed); -} - -impl PagePoolInner { - /// Creates a new page pool capable of holding `cap` pages. - fn new(cap: usize) -> Self { - let pages = ArrayQueue::new(cap); - Self { - pages, - dropped_pages_count: <_>::default(), - new_pages_allocated_count: <_>::default(), - pages_reused_count: <_>::default(), - pages_returned_count: <_>::default(), - } - } - - /// Puts back a [`Page`] into the pool. - fn put(&self, page: Box) { - // Add it to the pool if there's room, or just drop it. 
- if self.pages.push(page).is_ok() { - inc(&self.pages_returned_count); - } else { - inc(&self.dropped_pages_count); - } - } - - /// Takes a [`Page`] from the pool or creates a new one. - /// - /// The returned page supports a maximum of `max_rows_in_page` rows. - fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> { - self.pages - .pop() - .map(|mut page| { - inc(&self.pages_reused_count); - page.reset_for(max_rows_in_page); - page - }) - .unwrap_or_else(|| { - inc(&self.new_pages_allocated_count); - Page::new_with_max_row_count(max_rows_in_page) - }) - } - - /// Takes a [`Page`] from the pool or creates a new one. - /// - /// The returned page supports fixed rows of size `fixed_row_size`. - fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> { - self.take_with_max_row_count(max_rows_in_page(fixed_row_size)) - } -} - #[cfg(test)] mod tests { use super::*; - use core::{iter, ptr::addr_eq}; + use core::ptr::addr_eq; fn present_rows_ptr(page: &Page) -> *const () { page.page_header_for_test().present_rows_storage_ptr_for_test() } - fn assert_metrics(pool: &PagePool, dropped: usize, new: usize, reused: usize, returned: usize) { - assert_eq!(pool.dropped_pages_count(), dropped); - assert_eq!(pool.new_pages_allocated_count(), new); - assert_eq!(pool.pages_reused_count(), reused); - assert_eq!(pool.pages_returned_count(), returned); - } - #[test] - fn page_pool_returns_same_page() { + fn page_pool_bitset_reuse() { let pool = PagePool::new_for_test(); - assert_metrics(&pool, 0, 0, 0, 0); - // Create a page and put it back. let page1 = pool.take_with_max_row_count(10); - assert_metrics(&pool, 0, 1, 0, 0); - let page1_ptr = &*page1 as *const _; let page1_pr_ptr = present_rows_ptr(&page1); pool.put(page1); - assert_metrics(&pool, 0, 1, 0, 1); - // Extract a page again. + // Extract another page again, but use a different max row count (64). + // The bitset should be the same, as `10.div_ceil(64) == 64.div_ceil(64)`. 
let page2 = pool.take_with_max_row_count(64); - assert_metrics(&pool, 0, 1, 1, 1); - let page2_ptr = &*page2 as *const _; - let page2_pr_ptr = present_rows_ptr(&page2); - // It should be the same as the previous one. - assert!(addr_eq(page1_ptr, page2_ptr)); - // And the bitset should also be the same, as `10.div_ceil(64) == 64`. - assert!(addr_eq(page1_pr_ptr, page2_pr_ptr)); + assert!(addr_eq(page1_pr_ptr, present_rows_ptr(&page2))); pool.put(page2); - assert_metrics(&pool, 0, 1, 1, 2); - // Extract a page again, but this time, go beyond the first block. + // Extract a page again, but this time, go beyond the first bitset block. let page3 = pool.take_with_max_row_count(64 + 1); - assert_metrics(&pool, 0, 1, 2, 2); - let page3_ptr = &*page3 as *const _; - let page3_pr_ptr = present_rows_ptr(&page3); - // It should be the same as the previous one. - assert!(addr_eq(page1_ptr, page3_ptr)); - // But the bitset should not be the same, as `65.div_ceil(64) == 2`. - assert!(!addr_eq(page1_pr_ptr, page3_pr_ptr)); - - // Manually create a page and put it in. - let page4 = Page::new_with_max_row_count(10); - let page4_ptr = &*page4 as *const _; - pool.put(page4); - pool.put(page3); - assert_metrics(&pool, 0, 1, 2, 4); - // When we take out a page, it should be the same as `page4` and not `page1`. - let page5 = pool.take_with_max_row_count(10); - assert_metrics(&pool, 0, 1, 3, 4); - let page5_ptr = &*page5 as *const _; - // Same as page4. - assert!(!addr_eq(page5_ptr, page1_ptr)); - assert!(addr_eq(page5_ptr, page4_ptr)); - } - - #[test] - fn page_pool_drops_past_max_size() { - const N: usize = 3; - let pool = PagePool::new(Some(size_of::() * N)); - - let pages = iter::repeat_with(|| pool.take_with_max_row_count(42)) - .take(N + 1) - .collect::>(); - assert_metrics(&pool, 0, N + 1, 0, 0); - - pool.put_many(pages.into_iter()); - assert_metrics(&pool, 1, N + 1, 0, N); - assert_eq!(pool.inner.pages.len(), N); + // The bitset should not be the same, as `65.div_ceil(64) == 2`. 
+ assert!(!addr_eq(page1_pr_ptr, present_rows_ptr(&page3))); } }