Skip to content

Commit 6a8cd7e

Browse files
authored
Extract object_pool module from PagePool (#2920)
1 parent 1166d68 commit 6a8cd7e

File tree

6 files changed

+347
-189
lines changed

6 files changed

+347
-189
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/worker_metrics/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,10 +326,10 @@ pub fn spawn_page_pool_stats(node_id: String, page_pool: PagePool) {
326326

327327
loop {
328328
resident_bytes.set(page_pool.heap_usage() as i64);
329-
dropped_pages.set(page_pool.dropped_pages_count() as i64);
330-
new_pages.set(page_pool.new_pages_allocated_count() as i64);
331-
reused_pages.set(page_pool.pages_reused_count() as i64);
332-
returned_pages.set(page_pool.pages_reused_count() as i64);
329+
dropped_pages.set(page_pool.dropped_count() as i64);
330+
new_pages.set(page_pool.new_allocated_count() as i64);
331+
reused_pages.set(page_pool.reused_count() as i64);
332+
returned_pages.set(page_pool.reused_count() as i64);
333333

334334
sleep(Duration::from_secs(10)).await;
335335
}

crates/data-structures/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ serde = ["dep:serde", "hashbrown/serde"]
1212
[dependencies]
1313
spacetimedb-memory-usage = { workspace = true, optional = true, default-features = false }
1414
ahash.workspace = true
15+
crossbeam-queue.workspace = true
1516
hashbrown.workspace = true
1617
nohash-hasher.workspace = true
1718
serde = { workspace = true, optional = true }

crates/data-structures/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44
pub mod error_stream;
55
pub mod map;
66
pub mod nstr;
7+
pub mod object_pool;
78
pub mod slim_slice;
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
use core::any::type_name;
2+
use core::fmt;
3+
use core::sync::atomic::{AtomicUsize, Ordering};
4+
use crossbeam_queue::ArrayQueue;
5+
use std::sync::Arc;
6+
7+
#[cfg(not(feature = "memory-usage"))]
8+
/// An object that can be put into a [`Pool<T>`].
9+
pub trait PooledObject {}
10+
11+
#[cfg(feature = "memory-usage")]
12+
/// An object that can be put into a [`Pool<T>`].
13+
///
14+
/// The trait exposes hooks that the pool needs
15+
/// so that it can e.g., implement `MemoryUsage`.
16+
pub trait PooledObject: spacetimedb_memory_usage::MemoryUsage {
17+
/// The storage for the number of bytes in the pool.
18+
///
19+
/// When each object in the pool takes up the same size, this can be `()`.
20+
/// Otherwise, it will typically be [`AtomicUsize`].
21+
type ResidentBytesStorage: Default;
22+
23+
/// Returns the number of bytes resident in the pool.
24+
///
25+
/// The `storage` is provided as well as the `num_objects` in the pool.
26+
/// Typically, exactly one of these will be used.
27+
fn resident_object_bytes(storage: &Self::ResidentBytesStorage, num_objects: usize) -> usize;
28+
29+
/// Called by the pool to add `bytes` to `storage`, if necessary.
30+
fn add_to_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);
31+
32+
/// Called by the pool to subtract `bytes` from `storage`, if necessary.
33+
fn sub_from_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize);
34+
}
35+
36+
/// A pool of some objects of type `T`.
37+
pub struct Pool<T: PooledObject> {
38+
inner: Arc<Inner<T>>,
39+
}
40+
41+
impl<T: PooledObject> fmt::Debug for Pool<T> {
42+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43+
let dropped = self.dropped_count();
44+
let new = self.new_allocated_count();
45+
let reused = self.reused_count();
46+
let returned = self.returned_count();
47+
48+
#[cfg(feature = "memory-usage")]
49+
let bytes = T::resident_object_bytes(&self.inner.resident_object_bytes, self.inner.objects.len());
50+
51+
let mut builder = f.debug_struct(&format!("Pool<{}>", type_name::<T>()));
52+
53+
#[cfg(feature = "memory-usage")]
54+
let builder = builder.field("resident_object_bytes", &bytes);
55+
56+
builder
57+
.field("dropped_count", &dropped)
58+
.field("new_allocated_count", &new)
59+
.field("reused_count", &reused)
60+
.field("returned_count", &returned)
61+
.finish()
62+
}
63+
}
64+
65+
impl<T: PooledObject> Clone for Pool<T> {
66+
fn clone(&self) -> Self {
67+
let inner = self.inner.clone();
68+
Self { inner }
69+
}
70+
}
71+
72+
#[cfg(feature = "memory-usage")]
73+
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Pool<T> {
74+
fn heap_usage(&self) -> usize {
75+
let Self { inner } = self;
76+
inner.heap_usage()
77+
}
78+
}
79+
80+
impl<T: PooledObject> Pool<T> {
81+
/// Returns a new pool with a maximum capacity of `cap`.
82+
/// This capacity is fixed over the lifetime of the pool.
83+
pub fn new(cap: usize) -> Self {
84+
let inner = Arc::new(Inner::new(cap));
85+
Self { inner }
86+
}
87+
88+
/// Puts back an object into the pool.
89+
pub fn put(&self, object: T) {
90+
self.inner.put(object);
91+
}
92+
93+
/// Puts back an object into the pool.
94+
pub fn put_many(&self, objects: impl Iterator<Item = T>) {
95+
for obj in objects {
96+
self.put(obj);
97+
}
98+
}
99+
100+
/// Takes an object from the pool or creates a new one.
101+
pub fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
102+
self.inner.take(clear, new)
103+
}
104+
105+
/// Returns the number of pages dropped by the pool because the pool was at capacity.
106+
pub fn dropped_count(&self) -> usize {
107+
self.inner.dropped_count.load(Ordering::Relaxed)
108+
}
109+
110+
/// Returns the number of fresh objects allocated through the pool.
111+
pub fn new_allocated_count(&self) -> usize {
112+
self.inner.new_allocated_count.load(Ordering::Relaxed)
113+
}
114+
115+
/// Returns the number of objects reused from the pool.
116+
pub fn reused_count(&self) -> usize {
117+
self.inner.reused_count.load(Ordering::Relaxed)
118+
}
119+
120+
/// Returns the number of objects returned to the pool.
121+
pub fn returned_count(&self) -> usize {
122+
self.inner.returned_count.load(Ordering::Relaxed)
123+
}
124+
}
125+
126+
/// The inner actual page pool containing all the logic.
127+
struct Inner<T: PooledObject> {
128+
objects: ArrayQueue<T>,
129+
dropped_count: AtomicUsize,
130+
new_allocated_count: AtomicUsize,
131+
reused_count: AtomicUsize,
132+
returned_count: AtomicUsize,
133+
134+
#[cfg(feature = "memory-usage")]
135+
resident_object_bytes: T::ResidentBytesStorage,
136+
}
137+
138+
#[cfg(feature = "memory-usage")]
139+
impl<T: PooledObject> spacetimedb_memory_usage::MemoryUsage for Inner<T> {
140+
fn heap_usage(&self) -> usize {
141+
let Self {
142+
objects,
143+
dropped_count,
144+
new_allocated_count,
145+
reused_count,
146+
returned_count,
147+
resident_object_bytes,
148+
} = self;
149+
dropped_count.heap_usage() +
150+
new_allocated_count.heap_usage() +
151+
reused_count.heap_usage() +
152+
returned_count.heap_usage() +
153+
// This is the amount the queue itself takes up on the heap.
154+
objects.capacity() * size_of::<(AtomicUsize, T)>() +
155+
// This is the amount the objects take up on the heap, excluding the static size.
156+
T::resident_object_bytes(resident_object_bytes, objects.len())
157+
}
158+
}
159+
160+
#[inline]
161+
fn inc(atomic: &AtomicUsize) {
162+
atomic.fetch_add(1, Ordering::Relaxed);
163+
}
164+
165+
impl<T: PooledObject> Inner<T> {
166+
/// Creates a new pool capable of holding `cap` objects.
167+
fn new(cap: usize) -> Self {
168+
let objects = ArrayQueue::new(cap);
169+
Self {
170+
objects,
171+
dropped_count: <_>::default(),
172+
new_allocated_count: <_>::default(),
173+
reused_count: <_>::default(),
174+
returned_count: <_>::default(),
175+
176+
#[cfg(feature = "memory-usage")]
177+
resident_object_bytes: <_>::default(),
178+
}
179+
}
180+
181+
/// Puts back an object into the pool.
182+
fn put(&self, object: T) {
183+
#[cfg(feature = "memory-usage")]
184+
let bytes = object.heap_usage();
185+
// Add it to the pool if there's room, or just drop it.
186+
if self.objects.push(object).is_ok() {
187+
#[cfg(feature = "memory-usage")]
188+
T::add_to_resident_object_bytes(&self.resident_object_bytes, bytes);
189+
190+
inc(&self.returned_count);
191+
} else {
192+
inc(&self.dropped_count);
193+
}
194+
}
195+
196+
/// Takes an object from the pool or creates a new one.
197+
///
198+
/// The closure `clear` provides the opportunity to clear the object before use.
199+
/// The closure `new` is called to create a new object when the pool is empty.
200+
fn take(&self, clear: impl FnOnce(&mut T), new: impl FnOnce() -> T) -> T {
201+
self.objects
202+
.pop()
203+
.map(|mut object| {
204+
#[cfg(feature = "memory-usage")]
205+
T::sub_from_resident_object_bytes(&self.resident_object_bytes, object.heap_usage());
206+
207+
inc(&self.reused_count);
208+
clear(&mut object);
209+
object
210+
})
211+
.unwrap_or_else(|| {
212+
inc(&self.new_allocated_count);
213+
new()
214+
})
215+
}
216+
}
217+
218+
#[cfg(test)]
219+
mod tests {
220+
use super::*;
221+
use core::{iter, ptr::addr_eq};
222+
223+
// The type of pools used for testing.
224+
// We want to include a `Box` so that we can do pointer comparisons.
225+
type P = Pool<Box<i32>>;
226+
227+
#[cfg(not(feature = "memory-usage"))]
228+
impl PooledObject for Box<i32> {}
229+
230+
#[cfg(feature = "memory-usage")]
231+
impl PooledObject for Box<i32> {
232+
type ResidentBytesStorage = ();
233+
fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
234+
fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
235+
fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
236+
num_objects * size_of::<i32>()
237+
}
238+
}
239+
240+
fn new() -> P {
241+
P::new(100)
242+
}
243+
244+
fn assert_metrics(pool: &P, dropped: usize, new: usize, reused: usize, returned: usize) {
245+
assert_eq!(pool.dropped_count(), dropped);
246+
assert_eq!(pool.new_allocated_count(), new);
247+
assert_eq!(pool.reused_count(), reused);
248+
assert_eq!(pool.returned_count(), returned);
249+
}
250+
251+
fn take(pool: &P) -> Box<i32> {
252+
pool.take(|_| {}, || Box::new(0))
253+
}
254+
255+
#[test]
256+
fn pool_returns_same_obj() {
257+
let pool = new();
258+
assert_metrics(&pool, 0, 0, 0, 0);
259+
260+
// Create an object and put it back.
261+
let obj1 = take(&pool);
262+
assert_metrics(&pool, 0, 1, 0, 0);
263+
let obj1_ptr = &*obj1 as *const _;
264+
pool.put(obj1);
265+
assert_metrics(&pool, 0, 1, 0, 1);
266+
267+
// Extract an object again.
268+
let obj2 = take(&pool);
269+
assert_metrics(&pool, 0, 1, 1, 1);
270+
let obj2_ptr = &*obj2 as *const _;
271+
// It should be the same as the previous one.
272+
assert!(addr_eq(obj1_ptr, obj2_ptr));
273+
pool.put(obj2);
274+
assert_metrics(&pool, 0, 1, 1, 2);
275+
276+
// Extract an object again.
277+
let obj3 = take(&pool);
278+
assert_metrics(&pool, 0, 1, 2, 2);
279+
let obj3_ptr = &*obj3 as *const _;
280+
// It should be the same as the previous one.
281+
assert!(addr_eq(obj1_ptr, obj3_ptr));
282+
283+
// Manually create an object and put it in.
284+
let obj4 = Box::new(0);
285+
let obj4_ptr = &*obj4 as *const _;
286+
pool.put(obj4);
287+
pool.put(obj3);
288+
assert_metrics(&pool, 0, 1, 2, 4);
289+
// When we take out an object, it should be the same as `obj4` and not `obj1`.
290+
let obj5 = take(&pool);
291+
assert_metrics(&pool, 0, 1, 3, 4);
292+
let obj5_ptr = &*obj5 as *const _;
293+
// Same as obj4.
294+
assert!(!addr_eq(obj5_ptr, obj1_ptr));
295+
assert!(addr_eq(obj5_ptr, obj4_ptr));
296+
}
297+
298+
#[test]
299+
fn pool_drops_past_max_size() {
300+
const N: usize = 3;
301+
let pool = P::new(N);
302+
303+
let pages = iter::repeat_with(|| take(&pool)).take(N + 1).collect::<Vec<_>>();
304+
assert_metrics(&pool, 0, N + 1, 0, 0);
305+
306+
pool.put_many(pages.into_iter());
307+
assert_metrics(&pool, 1, N + 1, 0, N);
308+
assert_eq!(pool.inner.objects.len(), N);
309+
}
310+
}

0 commit comments

Comments
 (0)