From b45b193bf24fbffd045c76eb91c07a2669069c3e Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 27 Jun 2025 14:11:06 +0200 Subject: [PATCH 1/2] execute_plan: don't build temporary Vec of rows --- crates/client-api-messages/src/websocket.rs | 232 +++++++++++------- crates/core/src/subscription/mod.rs | 13 +- .../module_subscription_manager.rs | 2 + 3 files changed, 153 insertions(+), 94 deletions(-) diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index b2a92e71bb0..d63e7dccf32 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use bytestring::ByteString; use core::{ fmt::Debug, + mem, ops::{Deref, Range}, }; use enum_as_inner::EnumAsInner; @@ -40,8 +41,19 @@ use std::{ pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb"; pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb"; +/// A list of rows being built. +pub trait RowListBuilder: Default { + type FinishedList; + + /// Push a row to the list in a serialized format. + fn push(&mut self, row: impl ToBsatn + Serialize); + + /// Finish the in flight list, throwing away the capability to mutate. + fn finish(self) -> Self::FinishedList; +} + pub trait RowListLen { - /// Returns the length of the list. + /// Returns the length, in number of rows, not bytes, of the row list. fn len(&self) -> usize; /// Returns whether the list is empty or not. fn is_empty(&self) -> bool { @@ -86,8 +98,19 @@ pub trait WebsocketFormat: Sized { + Clone + Default; + /// The builder for [`Self::List`]. + type ListBuilder: RowListBuilder; + /// Encodes the `elems` to a list in the format and also returns the length of the list. - fn encode_list(elems: impl Iterator) -> (Self::List, u64); + fn encode_list(elems: impl Iterator) -> (Self::List, u64) { + let mut num_rows = 0; + let mut list = Self::ListBuilder::default(); + for elem in elems { + num_rows += 1; + list.push(elem); + } + (list.finish(), num_rows) + } /// The type used to encode query updates. /// This type exists so that some formats, e.g., BSATN, can compress an update. @@ -758,15 +781,7 @@ impl WebsocketFormat for JsonFormat { type Single = ByteString; type List = Vec; - - fn encode_list(elems: impl Iterator) -> (Self::List, u64) { - let mut count = 0; - let list = elems - .map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into()) - .inspect(|_| count += 1) - .collect(); - (list, count) - } + type ListBuilder = Self::List; type QueryUpdate = QueryUpdate; @@ -775,6 +790,17 @@ impl WebsocketFormat for JsonFormat { } } +impl RowListBuilder for Vec { + type FinishedList = Self; + fn push(&mut self, row: impl ToBsatn + Serialize) { + let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into(); + self.push(value); + } + fn finish(self) -> Self::FinishedList { + self + } +} + #[derive(Clone, Copy, Default, Debug, SpacetimeType)] #[sats(crate = spacetimedb_lib)] pub struct BsatnFormat; @@ -783,33 +809,7 @@ impl WebsocketFormat for BsatnFormat { type Single = Box<[u8]>; type List = BsatnRowList; - - fn encode_list(mut elems: impl Iterator) -> (Self::List, u64) { - // For an empty list, the size of a row is unknown, so use `RowOffsets`. - let Some(first) = elems.next() else { - return (BsatnRowList::row_offsets(), 0); - }; - // We have at least one row. Determine the static size from that, if available. - let (mut list, mut scratch) = match first.static_bsatn_size() { - Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)), - None => (BsatnRowListBuilder::row_offsets(), Vec::new()), - }; - // Add the first element and then the rest. - // We assume that the schema of rows yielded by `elems` stays the same, - // so once the size is fixed, it will stay that way. - let mut count = 0; - let mut push = |elem: R| { - elem.to_bsatn_extend(&mut scratch).unwrap(); - list.push(&scratch); - scratch.clear(); - count += 1; - }; - push(first); - for elem in elems { - push(elem); - } - (list.finish(), count) - } + type ListBuilder = BsatnRowListBuilder; type QueryUpdate = CompressableQueryUpdate; @@ -896,20 +896,14 @@ type RowSize = u16; type RowOffset = u64; /// A packed list of BSATN-encoded rows. -#[derive(SpacetimeType, Debug, Clone)] +#[derive(SpacetimeType, Debug, Clone, Default)] #[sats(crate = spacetimedb_lib)] -pub struct BsatnRowList> { +pub struct BsatnRowList { /// A size hint about `rows_data` /// intended to facilitate parallel decode purposes on large initial updates. - size_hint: RowSizeHint, + size_hint: RowSizeHint, /// The flattened byte array for a list of rows. - rows_data: B, -} - -impl Default for BsatnRowList { - fn default() -> Self { - Self::row_offsets() - } + rows_data: Bytes, } /// NOTE(centril, 1.0): We might want to add a `None` variant to this @@ -917,17 +911,23 @@ impl Default for BsatnRowList { /// The use-case for this is clients who are bandwidth limited and where every byte counts. #[derive(SpacetimeType, Debug, Clone)] #[sats(crate = spacetimedb_lib)] -pub enum RowSizeHint { +pub enum RowSizeHint { /// Each row in `rows_data` is of the same fixed size as specified here. FixedSize(RowSize), /// The offsets into `rows_data` defining the boundaries of each row. /// Only stores the offset to the start of each row. /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. /// The behavior of this is identical to that of `PackedStr`. - RowOffsets(I), + RowOffsets(Arc<[RowOffset]>), +} + +impl Default for RowSizeHint { + fn default() -> Self { + Self::RowOffsets([].into()) + } } -impl> RowSizeHint { +impl RowSizeHint { fn index_to_range(&self, index: usize, data_end: usize) -> Option> { match self { Self::FixedSize(size) => { @@ -952,37 +952,17 @@ impl> RowSizeHint { } } -impl BsatnRowList { - pub fn fixed(row_size: RowSize) -> Self { - Self { - size_hint: RowSizeHint::FixedSize(row_size), - rows_data: <_>::default(), - } - } - - /// Returns a new empty list using indices - pub fn row_offsets() -> Self - where - I: From<[RowOffset; 0]>, - { - Self { - size_hint: RowSizeHint::RowOffsets([].into()), - rows_data: <_>::default(), - } - } -} - -impl, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList { - /// Returns the length of the row list. +impl RowListLen for BsatnRowList { fn len(&self) -> usize { match &self.size_hint { + // `size != 0` is always the case for `FixedSize`. RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize, RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(), } } } -impl, I> ByteListLen for BsatnRowList { +impl ByteListLen for BsatnRowList { /// Returns the uncompressed size of the list in bytes fn num_bytes(&self) -> usize { self.rows_data.as_ref().len() @@ -1022,26 +1002,100 @@ impl Iterator for BsatnRowListIter<'_> { } /// A [`BsatnRowList`] that can be added to. -pub type BsatnRowListBuilder = BsatnRowList, Vec>; - -impl BsatnRowListBuilder { - /// Adds `row`, BSATN-encoded to this list. - #[inline] - pub fn push(&mut self, row: &[u8]) { - if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint { - offsets.push(self.rows_data.len() as u64); - } - self.rows_data.extend_from_slice(row); +#[derive(Default)] +pub struct BsatnRowListBuilder { + /// A size hint about `rows_data` + /// intended to facilitate parallel decode purposes on large initial updates. + size_hint: RowSizeHintBuilder, + /// The flattened byte array for a list of rows. + rows_data: Vec, +} + +/// A [`RowSizeHint`] under construction. +pub enum RowSizeHintBuilder { + /// We haven't seen any rows yet. + Empty, + /// Each row in `rows_data` is of the same fixed size as specified here + /// but we don't know whether the size fits in `RowSize` + /// and we don't know whether future rows will also have this size. + FixedSizeDyn(usize), + /// Each row in `rows_data` is of the same fixed size as specified here + /// and we know that this will be the case for future rows as well. + FixedSizeStatic(RowSize), + /// The offsets into `rows_data` defining the boundaries of each row. + /// Only stores the offset to the start of each row. + /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. + /// The behavior of this is identical to that of `PackedStr`. + RowOffsets(Vec), +} + +impl Default for RowSizeHintBuilder { + fn default() -> Self { + Self::Empty } +} - /// Finish the in flight list, throwing away the capability to mutate. - pub fn finish(self) -> BsatnRowList { +impl RowListBuilder for BsatnRowListBuilder { + type FinishedList = BsatnRowList; + + fn push(&mut self, row: impl ToBsatn + Serialize) { + use RowSizeHintBuilder::*; + + // Record the length before. It will be the starting offset of `row`. + let len_before = self.rows_data.len(); + // BSATN-encode the row directly to the buffer. + row.to_bsatn_extend(&mut self.rows_data).unwrap(); + + let encoded_len = || self.rows_data.len() - len_before; + let push_row_offset = |mut offsets: Vec<_>| { + offsets.push(len_before as u64); + RowOffsets(offsets) + }; + + let hint = mem::replace(&mut self.size_hint, Empty); + self.size_hint = match hint { + // Static size that is unchanging. + h @ FixedSizeStatic(_) => h, + // Dynamic size that is unchanging. + h @ FixedSizeDyn(size) if size == encoded_len() => h, + // Size mismatch for the dynamic fixed size. + // Now we must construct `RowOffsets` for all rows thus far. + // We know that `size != 0` here, as this was excluded when we had `Empty`. + FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)), + // Once there's a size for each row, we'll just add to it. + RowOffsets(offsets) => push_row_offset(offsets), + // First time a row is seen. Use `encoded_len()` as the hint. + // If we have a static layout, we'll always have a fixed size. + // Otherwise, let's start out with a potentially fixed size. + // In either case, if `encoded_len() == 0`, we have to store offsets, + // as we cannot recover the number of elements otherwise. + Empty => match row.static_bsatn_size() { + Some(0) => push_row_offset(Vec::new()), + Some(size) => FixedSizeStatic(size), + None => match encoded_len() { + 0 => push_row_offset(Vec::new()), + size => FixedSizeDyn(size), + }, + }, + }; + } + + fn finish(self) -> Self::FinishedList { let Self { size_hint, rows_data } = self; - let rows_data = rows_data.into(); let size_hint = match size_hint { - RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs), - RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), + RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()), + RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs), + RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) { + Ok(fs) => RowSizeHint::FixedSize(fs), + Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()), + }, + RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), }; + let rows_data = rows_data.into(); BsatnRowList { size_hint, rows_data } } } + +fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec { + (0..num_rows).map(|i| i * size).map(|o| o as u64).collect() +} diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 55093bf0e03..41e017dfcc0 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -5,7 +5,8 @@ use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ - ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat, + ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate, + WebsocketFormat, }; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; @@ -99,20 +100,22 @@ where Tx: Datastore + DeltaStore, F: WebsocketFormat, { - let mut rows = vec![]; + let mut count = 0; + let mut list = F::ListBuilder::default(); let mut metrics = ExecutionMetrics::default(); for fragment in plan_fragments { fragment.execute(tx, &mut metrics, &mut |row| { - rows.push(row); + count += 1; + list.push(row); Ok(()) })?; } - let (list, n) = F::encode_list(rows.into_iter()); + let list = list.finish(); metrics.bytes_scanned += list.num_bytes(); metrics.bytes_sent_to_clients += list.num_bytes(); - Ok((list, n, metrics)) + Ok((list, count, metrics)) } /// When collecting a table update are we inserting or deleting rows? diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 4120f294b81..54e982c97bf 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -1194,6 +1194,8 @@ impl SubscriptionManager { ) -> SingleQueryUpdate { let (update, num_rows, num_bytes) = memory .get_or_insert_with(|| { + // TODO(centril): consider pushing the encoding of each row into + // `eval_delta` instead, to avoid building the temporary `Vec`s in `UpdatesRelValue`. let encoded = updates.encode::(); // The first time we insert into this map, we call encode. // This is when we serialize the rows to BSATN/JSON. From 88cb22c61766285c2bc0b664c111e6f13498b4d1 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 4 Jul 2025 11:32:10 +0200 Subject: [PATCH 2/2] subscriptions: reuse buffers in `ServerMessage` via global pool --- crates/bench/benches/subscription.rs | 21 +++-- crates/client-api-messages/src/websocket.rs | 38 ++++++-- crates/client-api/src/routes/subscribe.rs | 6 +- crates/core/src/client.rs | 1 + crates/core/src/client/client_connection.rs | 4 + crates/core/src/client/consume_each_list.rs | 80 ++++++++++++++++ crates/core/src/client/messages.rs | 7 ++ crates/core/src/host/host_controller.rs | 18 +++- crates/core/src/host/module_host.rs | 13 ++- .../core/src/subscription/execution_unit.rs | 5 +- crates/core/src/subscription/mod.rs | 21 +++-- .../subscription/module_subscription_actor.rs | 40 ++++++-- .../module_subscription_manager.rs | 16 +++- crates/core/src/subscription/query.rs | 5 +- .../src/subscription/row_list_builder_pool.rs | 91 +++++++++++++++++++ crates/core/src/subscription/subscription.rs | 5 +- crates/core/src/worker_metrics/mod.rs | 50 +++++++++- crates/execution/src/lib.rs | 14 +-- crates/sats/src/bsatn.rs | 55 ++++++++++- crates/standalone/src/lib.rs | 5 + crates/standalone/src/subcommands/start.rs | 1 + crates/table/src/static_layout.rs | 40 +++----- crates/table/src/table.rs | 7 +- crates/vm/src/relation.rs | 5 +- 24 files changed, 455 insertions(+), 93 deletions(-) create mode 100644 crates/core/src/client/consume_each_list.rs create mode 100644 crates/core/src/subscription/row_list_builder_pool.rs diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index 5fe56e1a9e5..c3e7b5ba549 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -1,4 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use spacetimedb::client::consume_each_list::ConsumeEachBuffer; use spacetimedb::error::DBError; use spacetimedb::execution_context::Workload; use spacetimedb::host::module_host::DatabaseTableUpdate; @@ -6,6 +7,7 @@ use spacetimedb::identity::AuthCtx; use spacetimedb::messages::websocket::BsatnFormat; use spacetimedb::sql::ast::SchemaViewer; use spacetimedb::subscription::query::compile_read_only_queryset; +use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use spacetimedb::subscription::subscription::ExecutionSet; use spacetimedb::subscription::tx::DeltaTx; use spacetimedb::subscription::{collect_table_update, TableUpdateType}; @@ -119,6 +121,8 @@ fn eval(c: &mut Criterion) { let ins_rhs = insert_op(rhs, "location", new_rhs_row); let update = [&ins_lhs, &ins_rhs]; + let bsatn_rlb_pool = black_box(BsatnRowListBuilderPool::new()); + // A benchmark runner for the new query engine let bench_query = |c: &mut Criterion, name, sql| { c.bench_function(name, |b| { @@ -134,13 +138,17 @@ fn eval(c: &mut Criterion) { let tx = DeltaTx::from(&tx); b.iter(|| { - drop(black_box(collect_table_update::<_, BsatnFormat>( + let updates = black_box(collect_table_update::<_, BsatnFormat>( &plans, table_id, table_name.clone(), &tx, TableUpdateType::Subscribe, - ))) + &bsatn_rlb_pool, + )); + if let Ok((updates, _)) = updates { + updates.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); + } }) }); }; @@ -152,12 +160,9 @@ fn eval(c: &mut Criterion) { let query: ExecutionSet = query.into(); b.iter(|| { - drop(black_box(query.eval::( - &raw.db, - &tx, - None, - Compression::None, - ))) + let updates = + black_box(query.eval::(&raw.db, &tx, &bsatn_rlb_pool, None, Compression::None)); + updates.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); }) }); }; diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index d63e7dccf32..a9b6719a17c 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -15,7 +15,7 @@ //! rather than using an external mirror of this schema. use crate::energy::EnergyQuanta; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use core::{ fmt::Debug, @@ -41,8 +41,15 @@ use std::{ pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb"; pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb"; +/// A source of row list builders for a given [`WebsocketFormat`]. +pub trait RowListBuilderSource { + /// Returns a row list builder from the source `self`. + fn take_row_list_builder(&self) -> F::ListBuilder; +} + /// A list of rows being built. -pub trait RowListBuilder: Default { +pub trait RowListBuilder { + /// The type of a finished list returned by [`RowListBuilder::finish`]. type FinishedList; /// Push a row to the list in a serialized format. @@ -98,13 +105,17 @@ pub trait WebsocketFormat: Sized { + Clone + Default; - /// The builder for [`Self::List`]. + /// The builder for [`WebsocketFormat::List`]. type ListBuilder: RowListBuilder; /// Encodes the `elems` to a list in the format and also returns the length of the list. - fn encode_list(elems: impl Iterator) -> (Self::List, u64) { + /// + /// Needs to be provided with an empty [`WebsocketFormat::ListBuilder`]. + fn encode_list( + mut list: Self::ListBuilder, + elems: impl Iterator, + ) -> (Self::List, u64) { let mut num_rows = 0; - let mut list = Self::ListBuilder::default(); for elem in elems { num_rows += 1; list.push(elem); @@ -116,7 +127,7 @@ pub trait WebsocketFormat: Sized { /// This type exists so that some formats, e.g., BSATN, can compress an update. type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send; - /// Convert a `QueryUpdate` into `Self::QueryUpdate`. + /// Convert a `QueryUpdate` into [`WebsocketFormat::QueryUpdate`]. /// This allows some formats to e.g., compress the update. fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate; } @@ -976,6 +987,11 @@ impl BsatnRowList { let data_range = self.size_hint.index_to_range(index, data_end)?; Some(self.rows_data.slice(data_range)) } + + /// Consumes the list and returns the parts. + pub fn into_inner(self) -> (RowSizeHint, Bytes) { + (self.size_hint, self.rows_data) + } } /// An iterator over all the elements in a [`BsatnRowList`]. @@ -1008,7 +1024,7 @@ pub struct BsatnRowListBuilder { /// intended to facilitate parallel decode purposes on large initial updates. size_hint: RowSizeHintBuilder, /// The flattened byte array for a list of rows. - rows_data: Vec, + rows_data: BytesMut, } /// A [`RowSizeHint`] under construction. @@ -1096,6 +1112,14 @@ impl RowListBuilder for BsatnRowListBuilder { } } +impl BsatnRowListBuilder { + /// Returns a new builder using an empty [`BytesMut`] for the `rows_data` buffer. + pub fn new_from_bytes(rows_data: BytesMut) -> Self { + let size_hint = <_>::default(); + Self { size_hint, rows_data } + } +} + fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec { (0..num_rows).map(|i| i * size).map(|o| o as u64).collect() } diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 85bd8acb42d..d7d63f57be1 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -234,6 +234,8 @@ async fn ws_client_actor_inner( let mut closed = false; let mut rx_buf = Vec::new(); + let bsatn_rlb_pool = &client.module.subscriptions().bsatn_rlb_pool.clone(); + let mut msg_buffer = SerializeBuffer::new(client.config); loop { rx_buf.clear(); @@ -292,7 +294,7 @@ async fn ws_client_actor_inner( // Serialize the message, report metrics, // and keep a handle to the buffer. - let (msg_alloc, msg_data) = serialize(msg_buffer, msg, client.config); + let (msg_alloc, msg_data) = serialize(bsatn_rlb_pool, msg_buffer, msg, client.config); report_ws_sent_metrics(&addr, workload, num_rows, &msg_data); // Buffer the message without necessarily sending it. @@ -440,7 +442,7 @@ async fn ws_client_actor_inner( if let MessageHandleError::Execution(err) = e { log::error!("reducer execution error: {err:#}"); // Serialize the message and keep a handle to the buffer. - let (msg_alloc, msg_data) = serialize(msg_buffer, err, client.config); + let (msg_alloc, msg_data) = serialize(bsatn_rlb_pool, msg_buffer, err, client.config); let send = async { ws.send(datamsg_to_wsmsg(msg_data)).await }; let send = tokio::time::timeout(SEND_TIMEOUT, send); diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index 5970c6ee31c..f56d1a42061 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -3,6 +3,7 @@ use std::fmt; mod client_connection; mod client_connection_index; +pub mod consume_each_list; mod message_handlers; pub mod messages; diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 1742b81f647..795a64cef3b 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -11,6 +11,7 @@ use crate::error::DBError; use crate::host::module_host::ClientConnectedError; use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult}; use crate::messages::websocket::Subscribe; +use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool; use crate::util::asyncify; use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; @@ -586,6 +587,7 @@ impl ClientConnection { self.sender.clone(), message_id.to_owned(), timer, + JsonRowListBuilderFakePool, |msg: OneOffQueryResponseMessage| msg.into(), ) .await @@ -597,6 +599,7 @@ impl ClientConnection { message_id: &[u8], timer: Instant, ) -> Result<(), anyhow::Error> { + let bsatn_rlb_pool = self.module.replica_ctx().subscriptions.bsatn_rlb_pool.clone(); self.module .one_off_query::( self.id.identity, @@ -604,6 +607,7 @@ impl ClientConnection { self.sender.clone(), message_id.to_owned(), timer, + bsatn_rlb_pool, |msg: OneOffQueryResponseMessage| msg.into(), ) .await diff --git a/crates/core/src/client/consume_each_list.rs b/crates/core/src/client/consume_each_list.rs new file mode 100644 index 00000000000..984570d034c --- /dev/null +++ b/crates/core/src/client/consume_each_list.rs @@ -0,0 +1,80 @@ +use bytes::Bytes; +use spacetimedb_client_api_messages::websocket::{ + BsatnFormat, BsatnRowList, CompressableQueryUpdate, DatabaseUpdate, OneOffQueryResponse, QueryUpdate, + ServerMessage, TableUpdate, UpdateStatus, +}; + +/// Moves each buffer in `self` into a closure. +pub trait ConsumeEachBuffer { + /// Consumes `self`, moving each `Bytes` buffer in `self` into the closure `each`. + fn consume_each_list(self, each: &mut impl FnMut(Bytes)); +} + +impl ConsumeEachBuffer for ServerMessage { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + use ServerMessage::*; + match self { + InitialSubscription(x) => x.database_update.consume_each_list(each), + TransactionUpdate(x) => x.status.consume_each_list(each), + TransactionUpdateLight(x) => x.update.consume_each_list(each), + IdentityToken(_) | SubscriptionError(_) => {} + OneOffQueryResponse(x) => x.consume_each_list(each), + SubscribeApplied(x) => x.rows.table_rows.consume_each_list(each), + UnsubscribeApplied(x) => x.rows.table_rows.consume_each_list(each), + SubscribeMultiApplied(x) => x.update.consume_each_list(each), + UnsubscribeMultiApplied(x) => x.update.consume_each_list(each), + } + } +} + +impl ConsumeEachBuffer for OneOffQueryResponse { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + Vec::from(self.tables) + .into_iter() + .for_each(|x| x.rows.consume_each_list(each)); + } +} + +impl ConsumeEachBuffer for UpdateStatus { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + match self { + Self::Committed(x) => x.consume_each_list(each), + Self::Failed(_) | UpdateStatus::OutOfEnergy => {} + } + } +} + +impl ConsumeEachBuffer for DatabaseUpdate { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + self.tables.into_iter().for_each(|x| x.consume_each_list(each)); + } +} + +impl ConsumeEachBuffer for TableUpdate { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + self.updates.into_iter().for_each(|x| x.consume_each_list(each)); + } +} + +impl ConsumeEachBuffer for CompressableQueryUpdate { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + match self { + Self::Uncompressed(x) => x.consume_each_list(each), + Self::Brotli(bytes) | Self::Gzip(bytes) => each(bytes), + } + } +} + +impl ConsumeEachBuffer for QueryUpdate { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + self.deletes.consume_each_list(each); + self.inserts.consume_each_list(each); + } +} + +impl ConsumeEachBuffer for BsatnRowList { + fn consume_each_list(self, each: &mut impl FnMut(Bytes)) { + let (_, buffer) = self.into_inner(); + each(buffer); + } +} diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index fa801a17bc3..703835ce463 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -1,8 +1,10 @@ use super::{ClientConfig, DataMessage, Protocol}; +use crate::client::consume_each_list::ConsumeEachBuffer; use crate::execution_context::WorkloadType; use crate::host::module_host::{EventStatus, ModuleEvent}; use crate::host::ArgsTuple; use crate::messages::websocket as ws; +use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use bytes::{BufMut, Bytes, BytesMut}; use bytestring::ByteString; use derive_more::From; @@ -125,6 +127,7 @@ impl InUseSerializeBuffer { /// If `protocol` is [`Protocol::Binary`], /// the message will be conditionally compressed by this method according to `compression`. pub fn serialize( + bsatn_rlb_pool: &BsatnRowListBuilderPool, mut buffer: SerializeBuffer, msg: impl ToProtocol, config: ClientConfig, @@ -147,6 +150,10 @@ pub fn serialize( bsatn::to_writer(w.into_inner(), &msg).unwrap() }); + // At this point, we no longer have a use for `msg`, + // so try to reclaim its buffers. + msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); + // Conditionally compress the message. let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) { Compression::None => buffer.uncompressed(), diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index d520ac9ccad..f8062be3b20 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -14,6 +14,7 @@ use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager}; +use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::util::asyncify; use crate::util::jobs::{JobCore, JobCores}; use crate::worker_metrics::WORKER_METRICS; @@ -102,6 +103,8 @@ pub struct HostController { runtimes: Arc, /// The CPU cores that are reserved for ModuleHost operations to run on. db_cores: JobCores, + /// The pool of buffers used to build `BsatnRowList`s in subscriptions. + pub bsatn_rlb_pool: BsatnRowListBuilderPool, } struct HostRuntimes { @@ -187,6 +190,7 @@ impl HostController { runtimes: HostRuntimes::new(Some(&data_dir)), data_dir, page_pool: PagePool::new(default_config.page_pool_max_size), + bsatn_rlb_pool: BsatnRowListBuilderPool::new(), db_cores, } } @@ -308,6 +312,7 @@ impl HostController { // core - there's not a concern that we'll only end up using 1/2 // of the actual cores. self.db_cores.take(), + self.bsatn_rlb_pool.clone(), ) .await } @@ -616,6 +621,7 @@ async fn make_replica_ctx( database: Database, replica_id: u64, relational_db: Arc, + bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result { let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs()))); let send_worker_queue = spawn_send_worker(Some(database.database_identity)); @@ -628,6 +634,7 @@ async fn make_replica_ctx( subscriptions, send_worker_queue, database.owner_identity, + bsatn_rlb_pool, ); // If an error occurs when evaluating a subscription, @@ -716,11 +723,12 @@ async fn launch_module( replica_dir: ReplicaDir, runtimes: Arc, core: JobCore, + bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; let host_type = database.host_type; - let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db) + let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db, bsatn_rlb_pool) .await .map(Arc::new)?; let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone()); @@ -822,6 +830,7 @@ impl Host { runtimes, durability, page_pool, + bsatn_rlb_pool, .. } = host_controller; let on_panic = host_controller.unregister_fn(replica_id); @@ -890,6 +899,7 @@ impl Host { replica_dir, runtimes.clone(), host_controller.db_cores.take(), + bsatn_rlb_pool.clone(), ) .await?; @@ -950,6 +960,7 @@ impl Host { database: Database, program: Program, core: JobCore, + bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> anyhow::Result> { // Even in-memory databases acquire a lockfile. // Grab a tempdir to put that lockfile in. @@ -983,6 +994,7 @@ impl Host { phony_replica_dir, runtimes.clone(), core, + bsatn_rlb_pool, ) .await?; @@ -1109,7 +1121,9 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let runtimes = HostRuntimes::new(None); let page_pool = PagePool::new(None); let core = JobCore::default(); - let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; + let bsatn_rlb_pool = BsatnRowListBuilderPool::new(); + let module_info = + Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core, bsatn_rlb_pool).await?; // this should always succeed, but sometimes it doesn't let module_def = match Arc::try_unwrap(module_info) { Ok(info) => info.module_def, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c1907b2b0f6..96354d1fddc 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -28,7 +28,9 @@ use derive_more::From; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; -use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::{ + ByteListLen, Compression, OneOffTable, QueryUpdate, RowListBuilderSource, WebsocketFormat, +}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_execution::pipelined::PipelinedProject; @@ -134,9 +136,9 @@ impl UpdatesRelValue<'_> { !(self.deletes.is_empty() && self.inserts.is_empty()) } - pub fn encode(&self) -> (F::QueryUpdate, u64, usize) { - let (deletes, nr_del) = F::encode_list(self.deletes.iter()); - let (inserts, nr_ins) = F::encode_list(self.inserts.iter()); + pub fn encode(&self, rlb_pool: &impl RowListBuilderSource) -> (F::QueryUpdate, u64, usize) { + let (deletes, nr_del) = F::encode_list(rlb_pool.take_row_list_builder(), self.deletes.iter()); + let (inserts, nr_ins) = F::encode_list(rlb_pool.take_row_list_builder(), self.inserts.iter()); let num_rows = nr_del + nr_ins; let num_bytes = deletes.num_bytes() + inserts.num_bytes(); let qu = QueryUpdate { deletes, inserts }; @@ -1033,6 +1035,7 @@ impl ModuleHost { client: Arc, message_id: Vec, timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, // We take this because we only have a way to convert with the concrete types (Bsatn and Json) into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage + Send + 'static, ) -> Result<(), anyhow::Error> { @@ -1080,7 +1083,7 @@ impl ModuleHost { .collect::>(); // Execute the union and return the results - execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx)) + execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx), &rlb_pool) .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) .context("One-off queries are not allowed to modify the database") })(); diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index 188fa1bcb74..d79591545a1 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -9,7 +9,7 @@ use crate::messages::websocket::TableUpdate; use crate::util::slow::SlowQueryLogger; use crate::vm::{build_query, TxMode}; use spacetimedb_client_api_messages::websocket::{ - Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat, + Compression, QueryUpdate, RowListBuilderSource, RowListLen as _, SingleQueryUpdate, WebsocketFormat, }; use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; @@ -240,6 +240,7 @@ impl ExecutionUnit { &self, db: &RelationalDB, tx: &Tx, + rlb_pool: &impl RowListBuilderSource, sql: &str, slow_query_threshold: Option, compression: Compression, @@ -250,7 +251,7 @@ impl ExecutionUnit { let tx = &tx.into(); let mut inserts = build_query(db, tx, &self.eval_plan, &mut NoInMemUsed); let inserts = inserts.iter(); - let (inserts, num_rows) = F::encode_list(inserts); + let (inserts, num_rows) = F::encode_list(rlb_pool.take_row_list_builder(), inserts); (!inserts.is_empty()).then(|| { let deletes = F::List::default(); diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 41e017dfcc0..81e960b74c6 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -5,8 +5,8 @@ use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ - ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate, - WebsocketFormat, + ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, RowListBuilderSource, + SingleQueryUpdate, TableUpdate, WebsocketFormat, }; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; @@ -24,6 +24,7 @@ pub mod execution_unit; pub mod module_subscription_actor; pub mod module_subscription_manager; pub mod query; +pub mod row_list_builder_pool; #[allow(clippy::module_inception)] // it's right this isn't ideal :/ pub mod subscription; pub mod tx; @@ -95,13 +96,17 @@ impl MetricsRecorder for ExecutionCounters { } /// Execute a subscription query -pub fn execute_plan(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> +pub fn execute_plan( + plan_fragments: &[PipelinedProject], + tx: &Tx, + rlb_pool: &impl RowListBuilderSource, +) -> Result<(F::List, u64, ExecutionMetrics)> where Tx: Datastore + DeltaStore, F: WebsocketFormat, { let mut count = 0; - let mut list = F::ListBuilder::default(); + let mut list = rlb_pool.take_row_list_builder(); let mut metrics = ExecutionMetrics::default(); for fragment in plan_fragments { @@ -133,12 +138,13 @@ pub fn collect_table_update( table_name: Box, tx: &Tx, update_type: TableUpdateType, + rlb_pool: &impl RowListBuilderSource, ) -> Result<(TableUpdate, ExecutionMetrics)> where Tx: Datastore + DeltaStore, F: WebsocketFormat, { - execute_plan::(plan_fragments, tx).map(|(rows, num_rows, metrics)| { + execute_plan::(plan_fragments, tx, rlb_pool).map(|(rows, num_rows, metrics)| { let empty = F::List::default(); let qu = match update_type { TableUpdateType::Subscribe => QueryUpdate { @@ -166,6 +172,7 @@ pub fn execute_plans( plans: &[Arc], tx: &Tx, update_type: TableUpdateType, + rlb_pool: &(impl Sync + RowListBuilderSource), ) -> Result<(DatabaseUpdate, ExecutionMetrics), DBError> where Tx: Datastore + DeltaStore + Sync, @@ -186,7 +193,9 @@ where .clone() .optimize() .map(|plan| (sql, PipelinedProject::from(plan))) - .and_then(|(_, plan)| collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type)) + .and_then(|(_, plan)| { + collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type, rlb_pool) + }) .map_err(|err| DBError::WithSql { sql: sql.into(), error: Box::new(DBError::Other(err)), diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index fb44f9d9567..6b1b8065a09 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -20,6 +20,7 @@ use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent}; use crate::messages::websocket::Subscribe; use crate::subscription::execute_plans; use crate::subscription::query::is_subscribe_to_all_tables; +use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool}; use crate::util::prometheus_handle::IntGaugeExt; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; @@ -46,6 +47,7 @@ pub struct ModuleSubscriptions { broadcast_queue: BroadcastQueue, owner_identity: Identity, stats: Arc, + pub bsatn_rlb_pool: BsatnRowListBuilderPool, } #[derive(Debug, Clone)] @@ -164,6 +166,7 @@ impl ModuleSubscriptions { subscriptions: Subscriptions, broadcast_queue: BroadcastQueue, owner_identity: Identity, + bsatn_rlb_pool: BsatnRowListBuilderPool, ) -> Self { let db = &relational_db.database_identity(); let stats = Arc::new(SubscriptionGauges::new(db)); @@ -174,6 +177,7 @@ impl ModuleSubscriptions { broadcast_queue, owner_identity, stats, + bsatn_rlb_pool, } } @@ -194,6 +198,7 @@ impl ModuleSubscriptions { SubscriptionManager::for_test_without_metrics_arc_rwlock(), send_worker_queue, Identity::ZERO, + BsatnRowListBuilderPool::new(), ) } @@ -246,10 +251,24 @@ impl ModuleSubscriptions { let tx = DeltaTx::from(tx); Ok(match sender.config.protocol { - Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type) - .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)), - Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type) - .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)), + Protocol::Binary => collect_table_update( + &plans, + table_id, + table_name.into(), + &tx, + update_type, + &self.bsatn_rlb_pool, + ) + .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)), + Protocol::Text => collect_table_update( + &plans, + table_id, + table_name.into(), + &tx, + update_type, + &JsonRowListBuilderFakePool, + ) + .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)), }?) } @@ -276,11 +295,11 @@ impl ModuleSubscriptions { let tx = DeltaTx::from(tx); match sender.config.protocol { Protocol::Binary => { - let (update, metrics) = execute_plans(queries, &tx, update_type)?; + let (update, metrics) = execute_plans(queries, &tx, update_type, &self.bsatn_rlb_pool)?; Ok((FormatSwitch::Bsatn(update), metrics)) } Protocol::Text => { - let (update, metrics) = execute_plans(queries, &tx, update_type)?; + let (update, metrics) = execute_plans(queries, &tx, update_type, &JsonRowListBuilderFakePool)?; Ok((FormatSwitch::Json(update), metrics)) } } @@ -794,9 +813,9 @@ impl ModuleSubscriptions { let tx = DeltaTx::from(&*tx); let (database_update, metrics) = match sender.config.protocol { - Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe) + Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe, &self.bsatn_rlb_pool) .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?, - Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe) + Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe, &JsonRowListBuilderFakePool) .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?, }; @@ -904,7 +923,8 @@ impl ModuleSubscriptions { match &event.status { EventStatus::Committed(_) => { - update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller); + update_metrics = + subscriptions.eval_updates_sequential(&delta_read_tx, &self.bsatn_rlb_pool, event.clone(), caller); } EventStatus::Failed(_) => { if let Some(client) = caller { @@ -949,6 +969,7 @@ mod tests { use crate::sql::execute::run; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager}; use crate::subscription::query::compile_read_only_query; + use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::subscription::TableUpdateType; use hashbrown::HashMap; use itertools::Itertools; @@ -985,6 +1006,7 @@ mod tests { SubscriptionManager::for_test_without_metrics_arc_rwlock(), send_worker_queue, owner, + BsatnRowListBuilderPool::new(), ); let subscribe = Subscribe { diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 54e982c97bf..a952bede6a4 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -10,6 +10,7 @@ use crate::error::DBError; use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue}; use crate::messages::websocket::{self as ws, TableUpdate}; use crate::subscription::delta::eval_delta; +use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool}; use crate::worker_metrics::WORKER_METRICS; use core::mem; use hashbrown::hash_map::OccupiedError; @@ -17,8 +18,8 @@ use hashbrown::{HashMap, HashSet}; use parking_lot::RwLock; use prometheus::IntGauge; use spacetimedb_client_api_messages::websocket::{ - BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate, - WebsocketFormat, + BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, RowListBuilderSource, + SingleQueryUpdate, WebsocketFormat, }; use spacetimedb_data_structures::map::{Entry, IntMap}; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -1102,6 +1103,7 @@ impl SubscriptionManager { pub fn eval_updates_sequential( &self, tx: &DeltaTx, + bsatn_rlb_pool: &BsatnRowListBuilderPool, event: Arc, caller: Option>, ) -> ExecutionMetrics { @@ -1191,12 +1193,13 @@ impl SubscriptionManager { updates: &UpdatesRelValue<'_>, memory: &mut Option<(F::QueryUpdate, u64, usize)>, metrics: &mut ExecutionMetrics, + rlb_pool: &impl RowListBuilderSource, ) -> SingleQueryUpdate { let (update, num_rows, num_bytes) = memory .get_or_insert_with(|| { // TODO(centril): consider pushing the encoding of each row into // `eval_delta` instead, to avoid building the temporary `Vec`s in `UpdatesRelValue`. - let encoded = updates.encode::(); + let encoded = updates.encode::(rlb_pool); // The first time we insert into this map, we call encode. // This is when we serialize the rows to BSATN/JSON. // Hence this is where we increment `bytes_scanned`. @@ -1241,11 +1244,13 @@ impl SubscriptionManager { &delta_updates, &mut ops_bin_uncompressed, &mut acc.metrics, + bsatn_rlb_pool, )), Protocol::Text => Json(memo_encode::( &delta_updates, &mut ops_json, &mut acc.metrics, + &JsonRowListBuilderFakePool, )), }; ClientUpdate { @@ -1589,6 +1594,7 @@ mod tests { use crate::host::module_host::DatabaseTableUpdate; use crate::sql::ast::SchemaViewer; use crate::subscription::module_subscription_manager::ClientQueryId; + use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::{ client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName}, db::relational_db::{tests_utils::TestDB, RelationalDB}, @@ -2443,8 +2449,10 @@ mod tests { timer: None, }); + let bsatn_rlb_pool = BsatnRowListBuilderPool::new(); + db.with_read_only(Workload::Update, |tx| { - subscriptions.eval_updates_sequential(&(&*tx).into(), event, Some(Arc::new(client0))) + subscriptions.eval_updates_sequential(&(&*tx).into(), &bsatn_rlb_pool, event, Some(Arc::new(client0))) }); runtime.block_on(async move { diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 95b750a99df..96a2aa37e78 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -155,6 +155,7 @@ mod tests { use crate::sql::execute::collect_result; use crate::sql::execute::tests::run_for_testing; use crate::subscription::module_subscription_manager::QueriedTableIndexIds; + use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::subscription::subscription::{legacy_get_all, ExecutionSet}; use crate::subscription::tx::DeltaTx; use crate::vm::tests::create_table_with_rows; @@ -353,7 +354,9 @@ mod tests { total_tables: usize, rows: &[ProductValue], ) -> ResultTest<()> { - let result = s.eval::(db, tx, None, Compression::Brotli).tables; + let result = s + .eval::(db, tx, &BsatnRowListBuilderPool::new(), None, Compression::Brotli) + .tables; assert_eq!( result.len(), total_tables, diff --git a/crates/core/src/subscription/row_list_builder_pool.rs b/crates/core/src/subscription/row_list_builder_pool.rs new file mode 100644 index 00000000000..ef86a0495ed --- /dev/null +++ b/crates/core/src/subscription/row_list_builder_pool.rs @@ -0,0 +1,91 @@ +use bytes::{Bytes, BytesMut}; +use core::sync::atomic::{AtomicUsize, Ordering}; +use derive_more::Deref; +use spacetimedb_client_api_messages::websocket::{ + BsatnFormat, BsatnRowListBuilder, JsonFormat, RowListBuilderSource, WebsocketFormat, +}; +use spacetimedb_data_structures::object_pool::{Pool, PooledObject}; +use spacetimedb_memory_usage::MemoryUsage; + +/// The default buffer capacity, currently 4 KiB. +const DEFAULT_BUFFER_CAPACITY: usize = 4096; + +/// The pool can store at most 4 MiB worth of buffers. +/// NOTE(centril): This hasn't been measured yet, +/// but this should be a fairly good initial guestimate +/// as the server would need to handle half as many tables in total. +/// If there are two queries mentioning the same table, +/// that counts as two tables. +const DEFAULT_POOL_CAPACITY: usize = 1024; + +/// New-type for `BytesMut` to deal with the orphan check. +pub struct PooledBuffer(BytesMut); + +impl MemoryUsage for PooledBuffer { + fn heap_usage(&self) -> usize { + self.0.heap_usage() + } +} + +impl PooledObject for PooledBuffer { + type ResidentBytesStorage = AtomicUsize; + + fn resident_object_bytes(storage: &Self::ResidentBytesStorage, _: usize) -> usize { + storage.load(Ordering::Relaxed) + } + + fn add_to_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize) { + storage.fetch_add(bytes, Ordering::Relaxed); + } + + fn sub_from_resident_object_bytes(storage: &Self::ResidentBytesStorage, bytes: usize) { + storage.fetch_sub(bytes, Ordering::Relaxed); + } +} + +/// The pool for [`BsatnRowListBuilder`]s. +#[derive(Clone, Deref, Debug)] +pub struct BsatnRowListBuilderPool { + pool: Pool, +} + +impl BsatnRowListBuilderPool { + /// Returns a new pool with the default maximum capacity. + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + let pool = Pool::new(DEFAULT_POOL_CAPACITY); + Self { pool } + } + + /// Tries to reclaim the allocation of `buffer` into the pool + /// to be used when building a new list. + /// + /// In most calls, this method will do nothing, + /// as `buffer` will be shared between clients subscribing to the same query. + /// It's only on the last client that the refcount will be 1 + /// which will then cause `put` to add the allocation into the buffer. + pub fn try_put(&self, buffer: Bytes) { + if let Ok(bytes) = buffer.try_into_mut() { + self.put(PooledBuffer(bytes)); + } + } +} + +impl RowListBuilderSource for BsatnRowListBuilderPool { + fn take_row_list_builder(&self) -> BsatnRowListBuilder { + let PooledBuffer(buffer) = self.pool.take( + |buffer| buffer.0.clear(), + || PooledBuffer(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), + ); + BsatnRowListBuilder::new_from_bytes(buffer) + } +} + +/// The "pool" for the builder for the [`JsonFormat`]. +pub(crate) struct JsonRowListBuilderFakePool; + +impl RowListBuilderSource for JsonRowListBuilderFakePool { + fn take_row_list_builder(&self) -> ::ListBuilder { + Vec::new() + } +} diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index f98a7970eb3..e6724c58e6b 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -32,7 +32,7 @@ use crate::sql::ast::SchemaViewer; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::{Compression, RowListBuilderSource, WebsocketFormat}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::identity::AuthCtx; @@ -516,6 +516,7 @@ impl ExecutionSet { &self, db: &RelationalDB, tx: &Tx, + rlb_pool: &impl RowListBuilderSource, slow_query_threshold: Option, compression: Compression, ) -> ws::DatabaseUpdate { @@ -524,7 +525,7 @@ impl ExecutionSet { .exec_units // if you need eval to run single-threaded for debugging, change this to .iter() .iter() - .filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) + .filter_map(|unit| unit.eval(db, tx, rlb_pool, &unit.sql, slow_query_threshold, compression)) .collect(); ws::DatabaseUpdate { tables } } diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index f7beedcdcfb..432a2cf328d 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,5 +1,5 @@ -use crate::execution_context::WorkloadType; use crate::hash::Hash; +use crate::{execution_context::WorkloadType, subscription::row_list_builder_pool::BsatnRowListBuilderPool}; use once_cell::sync::Lazy; use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_lib::{ConnectionId, Identity}; @@ -81,6 +81,31 @@ metrics_group!( #[labels(node_id: str)] pub page_pool_pages_returned: IntGaugeVec, + #[name = bsatn_rlb_pool_resident_bytes] + #[help = "Total memory used by the `BsatnRowListBuilderPool`"] + #[labels(node_id: str)] + pub bsatn_rlb_pool_resident_bytes: IntGaugeVec, + + #[name = bsatn_rlb_pool_dropped] + #[help = "Total number of buffers dropped by the `BsatnRowListBuilderPool`"] + #[labels(node_id: str)] + pub bsatn_rlb_pool_dropped: IntGaugeVec, + + #[name = bsatn_rlb_pool_new_allocated] + #[help = "Total number of fresh buffers allocated by the `BsatnRowListBuilderPool`"] + #[labels(node_id: str)] + pub bsatn_rlb_pool_new_allocated: IntGaugeVec, + + #[name = bsatn_rlb_pool_reused] + #[help = "Total number of buffers reused by the `BsatnRowListBuilderPool`"] + #[labels(node_id: str)] + pub bsatn_rlb_pool_reused: IntGaugeVec, + + #[name = bsatn_rlb_pool_returned] + #[help = "Total number of buffers returned to the `BsatnRowListBuilderPool`"] + #[labels(node_id: str)] + pub bsatn_rlb_pool_returned: IntGaugeVec, + #[name = tokio_num_workers] #[help = "Number of core tokio workers"] #[labels(node_id: str)] @@ -337,6 +362,29 @@ pub fn spawn_page_pool_stats(node_id: String, page_pool: PagePool) { }); } +static SPAWN_BSATN_RLB_POOL_GUARD: Once = Once::new(); +pub fn spawn_bsatn_rlb_pool_stats(node_id: String, pool: BsatnRowListBuilderPool) { + SPAWN_BSATN_RLB_POOL_GUARD.call_once(|| { + spawn(async move { + let resident_bytes = WORKER_METRICS.bsatn_rlb_pool_resident_bytes.with_label_values(&node_id); + let dropped_pages = WORKER_METRICS.bsatn_rlb_pool_dropped.with_label_values(&node_id); + let new_pages = WORKER_METRICS.bsatn_rlb_pool_new_allocated.with_label_values(&node_id); + let reused_pages = WORKER_METRICS.bsatn_rlb_pool_reused.with_label_values(&node_id); + let returned_pages = WORKER_METRICS.bsatn_rlb_pool_returned.with_label_values(&node_id); + + loop { + resident_bytes.set(pool.heap_usage() as i64); + dropped_pages.set(pool.dropped_count() as i64); + new_pages.set(pool.new_allocated_count() as i64); + reused_pages.set(pool.reused_count() as i64); + returned_pages.set(pool.reused_count() as i64); + + sleep(Duration::from_secs(10)).await; + } + }); + }); +} + // How frequently to update the tokio stats. #[cfg(all(target_has_atomic = "64", tokio_unstable))] const TOKIO_STATS_INTERVAL: Duration = Duration::from_secs(10); diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 4d281d29fd0..42994b93500 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -5,14 +5,14 @@ use std::{ use anyhow::{anyhow, Result}; use iter::PlanIter; -use spacetimedb_lib::{ - bsatn::{EncodeError, ToBsatn}, - query::Delta, - sats::impl_serialize, - AlgebraicValue, ProductValue, -}; +use spacetimedb_lib::query::Delta; use spacetimedb_physical_plan::plan::{ProjectField, ProjectPlan, TupleField}; use spacetimedb_primitives::{IndexId, TableId}; +use spacetimedb_sats::{ + bsatn::{BufReservedFill, EncodeError, ToBsatn}, + buffer::BufWriter, + impl_serialize, AlgebraicValue, ProductValue, +}; use spacetimedb_table::{ blob_store::BlobStore, static_assert_size, @@ -152,7 +152,7 @@ impl ToBsatn for Row<'_> { } } - fn to_bsatn_extend(&self, buf: &mut Vec) -> std::result::Result<(), EncodeError> { + fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> std::result::Result<(), EncodeError> { match self { Self::Ptr(ptr) => ptr.to_bsatn_extend(buf), Self::Ref(val) => val.to_bsatn_extend(buf), diff --git a/crates/sats/src/bsatn.rs b/crates/sats/src/bsatn.rs index d1909db952f..aab203f8e75 100644 --- a/crates/sats/src/bsatn.rs +++ b/crates/sats/src/bsatn.rs @@ -1,7 +1,10 @@ +use core::mem::MaybeUninit; + use crate::buffer::{BufReader, BufWriter, CountWriter}; use crate::de::{BasicSmallVecVisitor, Deserialize, DeserializeSeed, Deserializer as _}; use crate::ser::Serialize; use crate::{ProductValue, Typespace, WithTypespace}; +use bytes::BytesMut; use ser::BsatnError; use smallvec::SmallVec; @@ -106,6 +109,52 @@ codec_funcs!(val: crate::AlgebraicValue); codec_funcs!(val: crate::ProductValue); codec_funcs!(val: crate::SumValue); +/// Provides a view over a buffer that an reserve an additional `len` bytes +/// and then provide those as an uninitialized buffer to write into. +pub trait BufReservedFill { + /// Reserves space for `len` in `self` and then runs `fill` to fill it, + /// adding `len` to the total length of `self`. + /// + /// # Safety + /// + /// `fill` must initialize every byte in the slice. + unsafe fn reserve_and_fill(&mut self, len: usize, fill: impl FnOnce(&mut [MaybeUninit])); +} + +impl BufReservedFill for Vec { + unsafe fn reserve_and_fill(&mut self, len: usize, fill: impl FnOnce(&mut [MaybeUninit])) { + // Get an uninitialized slice within `self` of `len` bytes. + let start = self.len(); + self.reserve(len); + let sink = &mut self.spare_capacity_mut()[..len]; + + // Run the filling logic. + fill(sink); + + // SAFETY: Caller promised that `sink` was fully initialized, + // which entails that we initialized `start .. start + len`, + // so now we have initialized up to `start + len`. + unsafe { self.set_len(start + len) } + } +} + +impl BufReservedFill for BytesMut { + unsafe fn reserve_and_fill(&mut self, len: usize, fill: impl FnOnce(&mut [MaybeUninit])) { + // Get an uninitialized slice within `self` of `len` bytes. + let start = self.len(); + self.reserve(len); + let sink = &mut self.spare_capacity_mut()[..len]; + + // Run the filling logic. + fill(sink); + + // SAFETY: Caller promised that `sink` was fully initialized, + // which entails that we initialized `start .. start + len`, + // so now we have initialized up to `start + len`. + unsafe { self.set_len(start + len) } + } +} + /// Types that can be encoded to BSATN. /// /// Implementations of this trait may be more efficient than directly calling [`to_vec`]. @@ -117,7 +166,7 @@ pub trait ToBsatn { /// BSATN-encode the row referred to by `self` into `buf`, /// pushing `self`'s bytes onto the end of `buf`, similar to [`Vec::extend`]. - fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError>; + fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> Result<(), BsatnError>; /// Returns the static size of the type of this object. /// @@ -129,7 +178,7 @@ impl ToBsatn for &T { fn to_bsatn_vec(&self) -> Result, BsatnError> { T::to_bsatn_vec(*self) } - fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError> { + fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> Result<(), BsatnError> { T::to_bsatn_extend(*self, buf) } fn static_bsatn_size(&self) -> Option { @@ -142,7 +191,7 @@ impl ToBsatn for ProductValue { to_vec(self) } - fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError> { + fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> Result<(), BsatnError> { to_writer(buf, self) } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 6021694c9a7..a8a800154c8 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -20,6 +20,7 @@ use spacetimedb::host::{ }; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, Node, Replica}; +use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb_client_api::auth::{self, LOCALHOST}; @@ -100,6 +101,10 @@ impl StandaloneEnv { pub fn page_pool(&self) -> &PagePool { &self.host_controller.page_pool } + + pub fn bsatn_rlb_pool(&self) -> &BsatnRowListBuilderPool { + &self.host_controller.bsatn_rlb_pool + } } struct StandaloneDurabilityProvider { diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index 54ae5a1cefa..b16bda39fb0 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -148,6 +148,7 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); worker_metrics::spawn_tokio_stats(listen_addr.clone()); worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); + worker_metrics::spawn_bsatn_rlb_pool_stats(listen_addr.clone(), ctx.bsatn_rlb_pool().clone()); let mut db_routes = DatabaseRoutes::default(); db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); diff --git a/crates/table/src/static_layout.rs b/crates/table/src/static_layout.rs index b414189d3a4..de633e203d2 100644 --- a/crates/table/src/static_layout.rs +++ b/crates/table/src/static_layout.rs @@ -29,6 +29,7 @@ use core::mem::MaybeUninit; use core::ptr; use smallvec::SmallVec; use spacetimedb_data_structures::slim_slice::SlimSmallSliceBox; +use spacetimedb_sats::bsatn::BufReservedFill; use spacetimedb_sats::layout::{ AlgebraicTypeLayout, HasLayout, PrimitiveType, ProductTypeElementLayout, ProductTypeLayoutView, RowTypeLayout, SumTypeLayout, SumTypeVariantLayout, @@ -86,22 +87,11 @@ impl StaticLayout { /// As a consequence of this, for every `field` in `self.fields`, /// `row[field.bflatn_offset .. field.bflatn_offset + length]` will be initialized. pub(crate) unsafe fn serialize_row_into_vec(&self, row: &Bytes) -> Vec { - // Create an uninitialized buffer `buf` of the correct length. - let bsatn_len = self.bsatn_length as usize; - let mut buf = Vec::with_capacity(bsatn_len); - let sink = buf.spare_capacity_mut(); + let mut buf = Vec::new(); - // (1) Write the row into the slice using a series of `memcpy`s. - // SAFETY: - // - Caller promised that `row` is valid for `self`. - // - `sink` was constructed with exactly the correct length above. - unsafe { - self.serialize_row_into(sink, row); - } + // SAFETY: Forward caller requirements. + unsafe { self.serialize_row_extend(&mut buf, row) }; - // SAFETY: In (1), we initialized `0..len` - // as `row` was valid for `self` per caller requirements. - unsafe { buf.set_len(bsatn_len) } buf } @@ -113,26 +103,18 @@ impl StaticLayout { /// for which `self` was computed. /// As a consequence of this, for every `field` in `self.fields`, /// `row[field.bflatn_offset .. field.bflatn_offset + length]` will be initialized. - pub(crate) unsafe fn serialize_row_extend(&self, buf: &mut Vec, row: &Bytes) { - // Get an uninitialized slice within `buf` of the correct length. - let start = buf.len(); + pub(crate) unsafe fn serialize_row_extend(&self, buf: &mut impl BufReservedFill, row: &Bytes) { let len = self.bsatn_length as usize; - buf.reserve(len); - let sink = &mut buf.spare_capacity_mut()[..len]; - - // (1) Write the row into the slice using a series of `memcpy`s. + // Writes the row into the slice using a series of `memcpy`s. // SAFETY: // - Caller promised that `row` is valid for `self`. // - `sink` was constructed with exactly the correct length above. - unsafe { + let filler = |sink: &mut _| unsafe { self.serialize_row_into(sink, row); - } - - // SAFETY: In (1), we initialized `start .. start + len` - // as `row` was valid for `self` per caller requirements - // and we had initialized up to `start` before, - // so now we have initialized up to `start + len`. - unsafe { buf.set_len(start + len) } + }; + // SAFETY: + // The closure `filler` will write exactly `len` bytes. + unsafe { buf.reserve_and_fill(len, filler) }; } #[allow(unused)] diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index eb419702319..897e32eded9 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -27,13 +27,14 @@ use core::{mem, ops::RangeBounds}; use derive_more::{Add, AddAssign, From, Sub, SubAssign}; use enum_as_inner::EnumAsInner; use smallvec::SmallVec; -use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned}; use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId}; use spacetimedb_sats::layout::{AlgebraicTypeLayout, PrimitiveType, RowTypeLayout, Size}; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{ algebraic_value::ser::ValueSerializer, - bsatn::{self, ser::BsatnError, ToBsatn}, + bsatn::{self, ser::BsatnError, BufReservedFill, DecodeError, ToBsatn}, + buffer::BufWriter, + de::DeserializeOwned, i256, product_value::InvalidFieldError, satn::Satn, @@ -1745,7 +1746,7 @@ impl ToBsatn for RowRef<'_> { /// /// This method will use a [`StaticLayout`] if one is available, /// and may therefore be faster than calling [`bsatn::to_writer`]. - fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError> { + fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> Result<(), BsatnError> { if let Some(static_layout) = self.static_layout() { // Use fast path, by first fetching the row data and then using the static layout. let row = self.get_row_data(); diff --git a/crates/vm/src/relation.rs b/crates/vm/src/relation.rs index 1c96413e986..4d34c0c1f86 100644 --- a/crates/vm/src/relation.rs +++ b/crates/vm/src/relation.rs @@ -1,7 +1,8 @@ use core::hash::{Hash, Hasher}; use spacetimedb_execution::Row; use spacetimedb_lib::db::auth::StAccess; -use spacetimedb_sats::bsatn::{ser::BsatnError, ToBsatn}; +use spacetimedb_sats::bsatn::{ser::BsatnError, BufReservedFill, ToBsatn}; +use spacetimedb_sats::buffer::BufWriter; use spacetimedb_sats::product_value::ProductValue; use spacetimedb_sats::{impl_serialize, AlgebraicValue}; use spacetimedb_schema::relation::{ColExpr, ColExprRef, Header}; @@ -171,7 +172,7 @@ impl ToBsatn for RelValue<'_> { RelValue::ProjRef(this) => (*this).to_bsatn_vec(), } } - fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError> { + fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> Result<(), BsatnError> { match self { RelValue::Row(this) => this.to_bsatn_extend(buf), RelValue::Projection(this) => this.to_bsatn_extend(buf),