From c944598b4889052c57d00fa3a45a424724c42a18 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 27 Jun 2025 14:11:06 +0200 Subject: [PATCH 1/4] execute_plan: don't build temporary Vec of rows --- crates/client-api-messages/src/websocket.rs | 232 +++++++++++------- crates/core/src/subscription/mod.rs | 13 +- .../module_subscription_manager.rs | 2 + 3 files changed, 153 insertions(+), 94 deletions(-) diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index b2a92e71bb0..d63e7dccf32 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use bytestring::ByteString; use core::{ fmt::Debug, + mem, ops::{Deref, Range}, }; use enum_as_inner::EnumAsInner; @@ -40,8 +41,19 @@ use std::{ pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb"; pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb"; +/// A list of rows being built. +pub trait RowListBuilder: Default { + type FinishedList; + + /// Push a row to the list in a serialized format. + fn push(&mut self, row: impl ToBsatn + Serialize); + + /// Finish the in flight list, throwing away the capability to mutate. + fn finish(self) -> Self::FinishedList; +} + pub trait RowListLen { - /// Returns the length of the list. + /// Returns the length, in number of rows, not bytes, of the row list. fn len(&self) -> usize; /// Returns whether the list is empty or not. fn is_empty(&self) -> bool { @@ -86,8 +98,19 @@ pub trait WebsocketFormat: Sized { + Clone + Default; + /// The builder for [`Self::List`]. + type ListBuilder: RowListBuilder; + /// Encodes the `elems` to a list in the format and also returns the length of the list. - fn encode_list(elems: impl Iterator) -> (Self::List, u64); + fn encode_list(elems: impl Iterator) -> (Self::List, u64) { + let mut num_rows = 0; + let mut list = Self::ListBuilder::default(); + for elem in elems { + num_rows += 1; + list.push(elem); + } + (list.finish(), num_rows) + } /// The type used to encode query updates. /// This type exists so that some formats, e.g., BSATN, can compress an update. @@ -758,15 +781,7 @@ impl WebsocketFormat for JsonFormat { type Single = ByteString; type List = Vec; - - fn encode_list(elems: impl Iterator) -> (Self::List, u64) { - let mut count = 0; - let list = elems - .map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into()) - .inspect(|_| count += 1) - .collect(); - (list, count) - } + type ListBuilder = Self::List; type QueryUpdate = QueryUpdate; @@ -775,6 +790,17 @@ impl WebsocketFormat for JsonFormat { } } +impl RowListBuilder for Vec { + type FinishedList = Self; + fn push(&mut self, row: impl ToBsatn + Serialize) { + let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into(); + self.push(value); + } + fn finish(self) -> Self::FinishedList { + self + } +} + #[derive(Clone, Copy, Default, Debug, SpacetimeType)] #[sats(crate = spacetimedb_lib)] pub struct BsatnFormat; @@ -783,33 +809,7 @@ impl WebsocketFormat for BsatnFormat { type Single = Box<[u8]>; type List = BsatnRowList; - - fn encode_list(mut elems: impl Iterator) -> (Self::List, u64) { - // For an empty list, the size of a row is unknown, so use `RowOffsets`. - let Some(first) = elems.next() else { - return (BsatnRowList::row_offsets(), 0); - }; - // We have at least one row. Determine the static size from that, if available. - let (mut list, mut scratch) = match first.static_bsatn_size() { - Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)), - None => (BsatnRowListBuilder::row_offsets(), Vec::new()), - }; - // Add the first element and then the rest. - // We assume that the schema of rows yielded by `elems` stays the same, - // so once the size is fixed, it will stay that way. - let mut count = 0; - let mut push = |elem: R| { - elem.to_bsatn_extend(&mut scratch).unwrap(); - list.push(&scratch); - scratch.clear(); - count += 1; - }; - push(first); - for elem in elems { - push(elem); - } - (list.finish(), count) - } + type ListBuilder = BsatnRowListBuilder; type QueryUpdate = CompressableQueryUpdate; @@ -896,20 +896,14 @@ type RowSize = u16; type RowOffset = u64; /// A packed list of BSATN-encoded rows. -#[derive(SpacetimeType, Debug, Clone)] +#[derive(SpacetimeType, Debug, Clone, Default)] #[sats(crate = spacetimedb_lib)] -pub struct BsatnRowList> { +pub struct BsatnRowList { /// A size hint about `rows_data` /// intended to facilitate parallel decode purposes on large initial updates. - size_hint: RowSizeHint, + size_hint: RowSizeHint, /// The flattened byte array for a list of rows. - rows_data: B, -} - -impl Default for BsatnRowList { - fn default() -> Self { - Self::row_offsets() - } + rows_data: Bytes, } /// NOTE(centril, 1.0): We might want to add a `None` variant to this @@ -917,17 +911,23 @@ impl Default for BsatnRowList { /// The use-case for this is clients who are bandwidth limited and where every byte counts. #[derive(SpacetimeType, Debug, Clone)] #[sats(crate = spacetimedb_lib)] -pub enum RowSizeHint { +pub enum RowSizeHint { /// Each row in `rows_data` is of the same fixed size as specified here. FixedSize(RowSize), /// The offsets into `rows_data` defining the boundaries of each row. /// Only stores the offset to the start of each row. /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. /// The behavior of this is identical to that of `PackedStr`. - RowOffsets(I), + RowOffsets(Arc<[RowOffset]>), +} + +impl Default for RowSizeHint { + fn default() -> Self { + Self::RowOffsets([].into()) + } } -impl> RowSizeHint { +impl RowSizeHint { fn index_to_range(&self, index: usize, data_end: usize) -> Option> { match self { Self::FixedSize(size) => { @@ -952,37 +952,17 @@ impl> RowSizeHint { } } -impl BsatnRowList { - pub fn fixed(row_size: RowSize) -> Self { - Self { - size_hint: RowSizeHint::FixedSize(row_size), - rows_data: <_>::default(), - } - } - - /// Returns a new empty list using indices - pub fn row_offsets() -> Self - where - I: From<[RowOffset; 0]>, - { - Self { - size_hint: RowSizeHint::RowOffsets([].into()), - rows_data: <_>::default(), - } - } -} - -impl, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList { - /// Returns the length of the row list. +impl RowListLen for BsatnRowList { fn len(&self) -> usize { match &self.size_hint { + // `size != 0` is always the case for `FixedSize`. RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize, RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(), } } } -impl, I> ByteListLen for BsatnRowList { +impl ByteListLen for BsatnRowList { /// Returns the uncompressed size of the list in bytes fn num_bytes(&self) -> usize { self.rows_data.as_ref().len() @@ -1022,26 +1002,100 @@ impl Iterator for BsatnRowListIter<'_> { } /// A [`BsatnRowList`] that can be added to. -pub type BsatnRowListBuilder = BsatnRowList, Vec>; - -impl BsatnRowListBuilder { - /// Adds `row`, BSATN-encoded to this list. - #[inline] - pub fn push(&mut self, row: &[u8]) { - if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint { - offsets.push(self.rows_data.len() as u64); - } - self.rows_data.extend_from_slice(row); +#[derive(Default)] +pub struct BsatnRowListBuilder { + /// A size hint about `rows_data` + /// intended to facilitate parallel decode purposes on large initial updates. + size_hint: RowSizeHintBuilder, + /// The flattened byte array for a list of rows. + rows_data: Vec, +} + +/// A [`RowSizeHint`] under construction. +pub enum RowSizeHintBuilder { + /// We haven't seen any rows yet. + Empty, + /// Each row in `rows_data` is of the same fixed size as specified here + /// but we don't know whether the size fits in `RowSize` + /// and we don't know whether future rows will also have this size. + FixedSizeDyn(usize), + /// Each row in `rows_data` is of the same fixed size as specified here + /// and we know that this will be the case for future rows as well. + FixedSizeStatic(RowSize), + /// The offsets into `rows_data` defining the boundaries of each row. + /// Only stores the offset to the start of each row. + /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. + /// The behavior of this is identical to that of `PackedStr`. + RowOffsets(Vec), +} + +impl Default for RowSizeHintBuilder { + fn default() -> Self { + Self::Empty } +} - /// Finish the in flight list, throwing away the capability to mutate. - pub fn finish(self) -> BsatnRowList { +impl RowListBuilder for BsatnRowListBuilder { + type FinishedList = BsatnRowList; + + fn push(&mut self, row: impl ToBsatn + Serialize) { + use RowSizeHintBuilder::*; + + // Record the length before. It will be the starting offset of `row`. + let len_before = self.rows_data.len(); + // BSATN-encode the row directly to the buffer. + row.to_bsatn_extend(&mut self.rows_data).unwrap(); + + let encoded_len = || self.rows_data.len() - len_before; + let push_row_offset = |mut offsets: Vec<_>| { + offsets.push(len_before as u64); + RowOffsets(offsets) + }; + + let hint = mem::replace(&mut self.size_hint, Empty); + self.size_hint = match hint { + // Static size that is unchanging. + h @ FixedSizeStatic(_) => h, + // Dynamic size that is unchanging. + h @ FixedSizeDyn(size) if size == encoded_len() => h, + // Size mismatch for the dynamic fixed size. + // Now we must construct `RowOffsets` for all rows thus far. + // We know that `size != 0` here, as this was excluded when we had `Empty`. + FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)), + // Once there's a size for each row, we'll just add to it. + RowOffsets(offsets) => push_row_offset(offsets), + // First time a row is seen. Use `encoded_len()` as the hint. + // If we have a static layout, we'll always have a fixed size. + // Otherwise, let's start out with a potentially fixed size. + // In either case, if `encoded_len() == 0`, we have to store offsets, + // as we cannot recover the number of elements otherwise. + Empty => match row.static_bsatn_size() { + Some(0) => push_row_offset(Vec::new()), + Some(size) => FixedSizeStatic(size), + None => match encoded_len() { + 0 => push_row_offset(Vec::new()), + size => FixedSizeDyn(size), + }, + }, + }; + } + + fn finish(self) -> Self::FinishedList { let Self { size_hint, rows_data } = self; - let rows_data = rows_data.into(); let size_hint = match size_hint { - RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs), - RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), + RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()), + RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs), + RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) { + Ok(fs) => RowSizeHint::FixedSize(fs), + Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()), + }, + RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), }; + let rows_data = rows_data.into(); BsatnRowList { size_hint, rows_data } } } + +fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec { + (0..num_rows).map(|i| i * size).map(|o| o as u64).collect() +} diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 7e77bfb7afc..3390e2b6f63 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -5,7 +5,8 @@ use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ - ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat, + ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate, + WebsocketFormat, }; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; @@ -97,20 +98,22 @@ where Tx: Datastore + DeltaStore, F: WebsocketFormat, { - let mut rows = vec![]; + let mut count = 0; + let mut list = F::ListBuilder::default(); let mut metrics = ExecutionMetrics::default(); for fragment in plan_fragments { fragment.execute(tx, &mut metrics, &mut |row| { - rows.push(row); + count += 1; + list.push(row); Ok(()) })?; } - let (list, n) = F::encode_list(rows.into_iter()); + let list = list.finish(); metrics.bytes_scanned += list.num_bytes(); metrics.bytes_sent_to_clients += list.num_bytes(); - Ok((list, n, metrics)) + Ok((list, count, metrics)) } /// When collecting a table update are we inserting or deleting rows? diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index e9b8dee0d6c..2ec8e0ecdd8 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -1194,6 +1194,8 @@ impl SubscriptionManager { ) -> SingleQueryUpdate { let (update, num_rows, num_bytes) = memory .get_or_insert_with(|| { + // TODO(centril): consider pushing the encoding of each row into + // `eval_delta` instead, to avoid building the temporary `Vec`s in `UpdatesRelValue`. let encoded = updates.encode::(); // The first time we insert into this map, we call encode. // This is when we serialize the rows to BSATN/JSON. From 712e2f6a5085762545314f807704f49143df8ab2 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 15 Jul 2025 16:23:08 +0200 Subject: [PATCH 2/4] split WebsocketFormat, adding BuildableWebsocketFormat --- crates/client-api-messages/src/websocket.rs | 24 +++++++++++-------- crates/core/src/host/module_host.rs | 8 ++++--- .../core/src/subscription/execution_unit.rs | 4 ++-- crates/core/src/subscription/mod.rs | 10 ++++---- .../module_subscription_manager.rs | 6 ++--- crates/core/src/subscription/subscription.rs | 4 ++-- 6 files changed, 31 insertions(+), 25 deletions(-) diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index d63e7dccf32..682e5a24aa9 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -98,6 +98,12 @@ pub trait WebsocketFormat: Sized { + Clone + Default; + /// The type used to encode query updates. + /// This type exists so that some formats, e.g., BSATN, can compress an update. + type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send; +} + +pub trait BuildableWebsocketFormat: WebsocketFormat { /// The builder for [`Self::List`]. type ListBuilder: RowListBuilder; @@ -112,10 +118,6 @@ pub trait WebsocketFormat: Sized { (list.finish(), num_rows) } - /// The type used to encode query updates. - /// This type exists so that some formats, e.g., BSATN, can compress an update. - type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send; - /// Convert a `QueryUpdate` into `Self::QueryUpdate`. /// This allows some formats to e.g., compress the update. fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate; @@ -779,11 +781,12 @@ pub struct JsonFormat; impl WebsocketFormat for JsonFormat { type Single = ByteString; - type List = Vec; - type ListBuilder = Self::List; - type QueryUpdate = QueryUpdate; +} + +impl BuildableWebsocketFormat for JsonFormat { + type ListBuilder = Self::List; fn into_query_update(qu: QueryUpdate, _: Compression) -> Self::QueryUpdate { qu @@ -807,11 +810,12 @@ pub struct BsatnFormat; impl WebsocketFormat for BsatnFormat { type Single = Box<[u8]>; - type List = BsatnRowList; - type ListBuilder = BsatnRowListBuilder; - type QueryUpdate = CompressableQueryUpdate; +} + +impl BuildableWebsocketFormat for BsatnFormat { + type ListBuilder = BsatnRowListBuilder; fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate { let qu_len_would_have_been = bsatn::to_len(&qu).unwrap(); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 6925b0f2b7f..5ec2829d1aa 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -25,7 +25,9 @@ use derive_more::From; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; -use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::{ + BuildableWebsocketFormat, ByteListLen, Compression, OneOffTable, QueryUpdate, +}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; @@ -134,7 +136,7 @@ impl UpdatesRelValue<'_> { !(self.deletes.is_empty() && self.inserts.is_empty()) } - pub fn encode(&self) -> (F::QueryUpdate, u64, usize) { + pub fn encode(&self) -> (F::QueryUpdate, u64, usize) { let (deletes, nr_del) = F::encode_list(self.deletes.iter()); let (inserts, nr_ins) = F::encode_list(self.inserts.iter()); let num_rows = nr_del + nr_ins; @@ -1096,7 +1098,7 @@ impl ModuleHost { /// This only returns an error if there is a db-level problem. /// An error with the query itself will be sent to the client. #[tracing::instrument(level = "trace", skip_all)] - pub async fn one_off_query( + pub async fn one_off_query( &self, caller_identity: Identity, query: String, diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index 66f43572a32..31eb22e6f57 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -8,7 +8,7 @@ use crate::messages::websocket::TableUpdate; use crate::util::slow::SlowQueryLogger; use crate::vm::{build_query, TxMode}; use spacetimedb_client_api_messages::websocket::{ - Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat, + BuildableWebsocketFormat, Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, }; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_lib::Identity; @@ -236,7 +236,7 @@ impl ExecutionUnit { /// Evaluate this execution unit against the database using the specified format. #[tracing::instrument(level = "trace", skip_all)] - pub fn eval( + pub fn eval( &self, db: &RelationalDB, tx: &Tx, diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 3390e2b6f63..2012f069a47 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -5,8 +5,8 @@ use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ - ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate, - WebsocketFormat, + BuildableWebsocketFormat, ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, + SingleQueryUpdate, TableUpdate, }; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; @@ -96,7 +96,7 @@ impl MetricsRecorder for ExecutionCounters { pub fn execute_plan(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> where Tx: Datastore + DeltaStore, - F: WebsocketFormat, + F: BuildableWebsocketFormat, { let mut count = 0; let mut list = F::ListBuilder::default(); @@ -134,7 +134,7 @@ pub fn collect_table_update( ) -> Result<(TableUpdate, ExecutionMetrics)> where Tx: Datastore + DeltaStore, - F: WebsocketFormat, + F: BuildableWebsocketFormat, { execute_plan::(plan_fragments, tx).map(|(rows, num_rows, metrics)| { let empty = F::List::default(); @@ -167,7 +167,7 @@ pub fn execute_plans( ) -> Result<(DatabaseUpdate, ExecutionMetrics), DBError> where Tx: Datastore + DeltaStore + Sync, - F: WebsocketFormat, + F: BuildableWebsocketFormat, { plans .par_iter() diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 2ec8e0ecdd8..229c601de5f 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -16,8 +16,8 @@ use hashbrown::{HashMap, HashSet}; use parking_lot::RwLock; use prometheus::IntGauge; use spacetimedb_client_api_messages::websocket::{ - BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate, - WebsocketFormat, + BsatnFormat, BuildableWebsocketFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, + SingleQueryUpdate, }; use spacetimedb_data_structures::map::{Entry, IntMap}; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; @@ -1187,7 +1187,7 @@ impl SubscriptionManager { let mut ops_bin_uncompressed: Option<(CompressableQueryUpdate, _, _)> = None; let mut ops_json: Option<(QueryUpdate, _, _)> = None; - fn memo_encode( + fn memo_encode( updates: &UpdatesRelValue<'_>, memory: &mut Option<(F::QueryUpdate, u64, usize)>, metrics: &mut ExecutionMetrics, diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 28a1fe66c97..caf71558193 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -31,7 +31,7 @@ use crate::sql::ast::SchemaViewer; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::{BuildableWebsocketFormat, Compression}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -512,7 +512,7 @@ pub struct ExecutionSet { } impl ExecutionSet { - pub fn eval( + pub fn eval( &self, db: &RelationalDB, tx: &Tx, From c7bffe668b218080f83140e6de0c465c30177450 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 15 Jul 2025 16:47:13 +0200 Subject: [PATCH 3/4] move building parts of websocket format to own module --- crates/client-api-messages/src/lib.rs | 1 + crates/client-api-messages/src/websocket.rs | 231 ++---------------- .../src/websocket_building.rs | 218 +++++++++++++++++ 3 files changed, 237 insertions(+), 213 deletions(-) create mode 100644 crates/client-api-messages/src/websocket_building.rs diff --git a/crates/client-api-messages/src/lib.rs b/crates/client-api-messages/src/lib.rs index bf2f1dc9ca5..c13870ad6d2 100644 --- a/crates/client-api-messages/src/lib.rs +++ b/crates/client-api-messages/src/lib.rs @@ -4,3 +4,4 @@ pub mod energy; pub mod http; pub mod name; pub mod websocket; +mod websocket_building; diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index 682e5a24aa9..1f89f87b9a6 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -19,7 +19,6 @@ use bytes::Bytes; use bytestring::ByteString; use core::{ fmt::Debug, - mem, ops::{Deref, Range}, }; use enum_as_inner::EnumAsInner; @@ -27,31 +26,22 @@ use smallvec::SmallVec; use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp}; use spacetimedb_primitives::TableId; use spacetimedb_sats::{ - bsatn::{self, ToBsatn}, + bsatn, de::{Deserialize, Error}, impl_deserialize, impl_serialize, impl_st, - ser::{serde::SerializeWrapper, Serialize}, + ser::Serialize, AlgebraicType, SpacetimeType, }; use std::{ - io::{self, Read as _, Write as _}, + io::{self, Read as _}, sync::Arc, }; +pub use crate::websocket_building::*; + pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb"; pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb"; -/// A list of rows being built. -pub trait RowListBuilder: Default { - type FinishedList; - - /// Push a row to the list in a serialized format. - fn push(&mut self, row: impl ToBsatn + Serialize); - - /// Finish the in flight list, throwing away the capability to mutate. - fn finish(self) -> Self::FinishedList; -} - pub trait RowListLen { /// Returns the length, in number of rows, not bytes, of the row list. fn len(&self) -> usize; @@ -103,26 +93,6 @@ pub trait WebsocketFormat: Sized { type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send; } -pub trait BuildableWebsocketFormat: WebsocketFormat { - /// The builder for [`Self::List`]. - type ListBuilder: RowListBuilder; - - /// Encodes the `elems` to a list in the format and also returns the length of the list. - fn encode_list(elems: impl Iterator) -> (Self::List, u64) { - let mut num_rows = 0; - let mut list = Self::ListBuilder::default(); - for elem in elems { - num_rows += 1; - list.push(elem); - } - (list.finish(), num_rows) - } - - /// Convert a `QueryUpdate` into `Self::QueryUpdate`. - /// This allows some formats to e.g., compress the update. - fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate; -} - /// Messages sent from the client to the server. /// /// Parametric over the reducer argument type to enable [`ClientMessage::map_args`]. @@ -785,25 +755,6 @@ impl WebsocketFormat for JsonFormat { type QueryUpdate = QueryUpdate; } -impl BuildableWebsocketFormat for JsonFormat { - type ListBuilder = Self::List; - - fn into_query_update(qu: QueryUpdate, _: Compression) -> Self::QueryUpdate { - qu - } -} - -impl RowListBuilder for Vec { - type FinishedList = Self; - fn push(&mut self, row: impl ToBsatn + Serialize) { - let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into(); - self.push(value); - } - fn finish(self) -> Self::FinishedList { - self - } -} - #[derive(Clone, Copy, Default, Debug, SpacetimeType)] #[sats(crate = spacetimedb_lib)] pub struct BsatnFormat; @@ -814,30 +765,6 @@ impl WebsocketFormat for BsatnFormat { type QueryUpdate = CompressableQueryUpdate; } -impl BuildableWebsocketFormat for BsatnFormat { - type ListBuilder = BsatnRowListBuilder; - - fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate { - let qu_len_would_have_been = bsatn::to_len(&qu).unwrap(); - - match decide_compression(qu_len_would_have_been, compression) { - Compression::None => CompressableQueryUpdate::Uncompressed(qu), - Compression::Brotli => { - let bytes = bsatn::to_vec(&qu).unwrap(); - let mut out = Vec::new(); - brotli_compress(&bytes, &mut out); - CompressableQueryUpdate::Brotli(out.into()) - } - Compression::Gzip => { - let bytes = bsatn::to_vec(&qu).unwrap(); - let mut out = Vec::new(); - gzip_compress(&bytes, &mut out); - CompressableQueryUpdate::Gzip(out.into()) - } - } - } -} - /// A specification of either a desired or decided compression algorithm. #[derive(serde::Deserialize, Default, PartialEq, Eq, Clone, Copy, Hash, Debug)] pub enum Compression { @@ -850,54 +777,20 @@ pub enum Compression { Gzip, } -pub fn decide_compression(len: usize, compression: Compression) -> Compression { - /// The threshold beyond which we start to compress messages. - /// 1KiB was chosen without measurement. - /// TODO(perf): measure! - const COMPRESS_THRESHOLD: usize = 1024; - - if len > COMPRESS_THRESHOLD { - compression - } else { - Compression::None - } -} - -pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) { - // We are optimizing for compression speed, - // so we choose the lowest (fastest) level of compression. - // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1 - // for large `SubscriptionUpdate` messages at this level. - const COMPRESSION_LEVEL: i32 = 1; - - let params = brotli::enc::BrotliEncoderParams { - quality: COMPRESSION_LEVEL, - ..<_>::default() - }; - let reader = &mut &bytes[..]; - brotli::BrotliCompress(reader, out, ¶ms).expect("should be able to BrotliCompress"); -} - pub fn brotli_decompress(bytes: &[u8]) -> Result, io::Error> { let mut decompressed = Vec::new(); brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?; Ok(decompressed) } -pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) { - let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast()); - encoder.write_all(bytes).unwrap(); - encoder.finish().expect("should be able to gzip compress `bytes`"); -} - pub fn gzip_decompress(bytes: &[u8]) -> Result, io::Error> { let mut decompressed = Vec::new(); let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?; Ok(decompressed) } -type RowSize = u16; -type RowOffset = u64; +pub type RowSize = u16; +pub type RowOffset = u64; /// A packed list of BSATN-encoded rows. #[derive(SpacetimeType, Debug, Clone, Default)] @@ -910,6 +803,17 @@ pub struct BsatnRowList { rows_data: Bytes, } +impl BsatnRowList { + /// Returns a new row list where `rows_data` is the flattened byte array + /// containing the BSATN of each row, without any markers for where a row begins and end. + /// + /// The `size_hint` encodes the boundaries of each row in `rows_data`. + /// See [`RowSizeHint`] for more details on the encoding. + pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self { + Self { size_hint, rows_data } + } +} + /// NOTE(centril, 1.0): We might want to add a `None` variant to this /// where the client has to decode in a loop until `rows_data` has been exhausted. /// The use-case for this is clients who are bandwidth limited and where every byte counts. @@ -1004,102 +908,3 @@ impl Iterator for BsatnRowListIter<'_> { self.list.get(index) } } - -/// A [`BsatnRowList`] that can be added to. -#[derive(Default)] -pub struct BsatnRowListBuilder { - /// A size hint about `rows_data` - /// intended to facilitate parallel decode purposes on large initial updates. - size_hint: RowSizeHintBuilder, - /// The flattened byte array for a list of rows. - rows_data: Vec, -} - -/// A [`RowSizeHint`] under construction. -pub enum RowSizeHintBuilder { - /// We haven't seen any rows yet. - Empty, - /// Each row in `rows_data` is of the same fixed size as specified here - /// but we don't know whether the size fits in `RowSize` - /// and we don't know whether future rows will also have this size. - FixedSizeDyn(usize), - /// Each row in `rows_data` is of the same fixed size as specified here - /// and we know that this will be the case for future rows as well. - FixedSizeStatic(RowSize), - /// The offsets into `rows_data` defining the boundaries of each row. - /// Only stores the offset to the start of each row. - /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. - /// The behavior of this is identical to that of `PackedStr`. - RowOffsets(Vec), -} - -impl Default for RowSizeHintBuilder { - fn default() -> Self { - Self::Empty - } -} - -impl RowListBuilder for BsatnRowListBuilder { - type FinishedList = BsatnRowList; - - fn push(&mut self, row: impl ToBsatn + Serialize) { - use RowSizeHintBuilder::*; - - // Record the length before. It will be the starting offset of `row`. - let len_before = self.rows_data.len(); - // BSATN-encode the row directly to the buffer. - row.to_bsatn_extend(&mut self.rows_data).unwrap(); - - let encoded_len = || self.rows_data.len() - len_before; - let push_row_offset = |mut offsets: Vec<_>| { - offsets.push(len_before as u64); - RowOffsets(offsets) - }; - - let hint = mem::replace(&mut self.size_hint, Empty); - self.size_hint = match hint { - // Static size that is unchanging. - h @ FixedSizeStatic(_) => h, - // Dynamic size that is unchanging. - h @ FixedSizeDyn(size) if size == encoded_len() => h, - // Size mismatch for the dynamic fixed size. - // Now we must construct `RowOffsets` for all rows thus far. - // We know that `size != 0` here, as this was excluded when we had `Empty`. - FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)), - // Once there's a size for each row, we'll just add to it. - RowOffsets(offsets) => push_row_offset(offsets), - // First time a row is seen. Use `encoded_len()` as the hint. - // If we have a static layout, we'll always have a fixed size. - // Otherwise, let's start out with a potentially fixed size. - // In either case, if `encoded_len() == 0`, we have to store offsets, - // as we cannot recover the number of elements otherwise. - Empty => match row.static_bsatn_size() { - Some(0) => push_row_offset(Vec::new()), - Some(size) => FixedSizeStatic(size), - None => match encoded_len() { - 0 => push_row_offset(Vec::new()), - size => FixedSizeDyn(size), - }, - }, - }; - } - - fn finish(self) -> Self::FinishedList { - let Self { size_hint, rows_data } = self; - let size_hint = match size_hint { - RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()), - RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs), - RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) { - Ok(fs) => RowSizeHint::FixedSize(fs), - Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()), - }, - RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), - }; - let rows_data = rows_data.into(); - BsatnRowList { size_hint, rows_data } - } -} - -fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec { - (0..num_rows).map(|i| i * size).map(|o| o as u64).collect() -} diff --git a/crates/client-api-messages/src/websocket_building.rs b/crates/client-api-messages/src/websocket_building.rs new file mode 100644 index 00000000000..45c9a266de2 --- /dev/null +++ b/crates/client-api-messages/src/websocket_building.rs @@ -0,0 +1,218 @@ +use crate::websocket::{ + BsatnFormat, BsatnRowList, CompressableQueryUpdate, Compression, JsonFormat, QueryUpdate, RowOffset, RowSize, + RowSizeHint, WebsocketFormat, +}; +use bytestring::ByteString; +use core::mem; +use spacetimedb_sats::bsatn::{self, ToBsatn}; +use spacetimedb_sats::ser::serde::SerializeWrapper; +use spacetimedb_sats::Serialize; +use std::io; +use std::io::Write as _; + +/// A list of rows being built. +pub trait RowListBuilder: Default { + type FinishedList; + + /// Push a row to the list in a serialized format. + fn push(&mut self, row: impl ToBsatn + Serialize); + + /// Finish the in flight list, throwing away the capability to mutate. + fn finish(self) -> Self::FinishedList; +} + +pub trait BuildableWebsocketFormat: WebsocketFormat { + /// The builder for [`Self::List`]. + type ListBuilder: RowListBuilder; + + /// Encodes the `elems` to a list in the format and also returns the length of the list. + fn encode_list(elems: impl Iterator) -> (Self::List, u64) { + let mut num_rows = 0; + let mut list = Self::ListBuilder::default(); + for elem in elems { + num_rows += 1; + list.push(elem); + } + (list.finish(), num_rows) + } + + /// Convert a `QueryUpdate` into `Self::QueryUpdate`. + /// This allows some formats to e.g., compress the update. + fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate; +} + +impl BuildableWebsocketFormat for JsonFormat { + type ListBuilder = Self::List; + + fn into_query_update(qu: QueryUpdate, _: Compression) -> Self::QueryUpdate { + qu + } +} + +impl RowListBuilder for Vec { + type FinishedList = Self; + fn push(&mut self, row: impl ToBsatn + Serialize) { + let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into(); + self.push(value); + } + fn finish(self) -> Self::FinishedList { + self + } +} + +/// A [`BsatnRowList`] that can be added to. +#[derive(Default)] +pub struct BsatnRowListBuilder { + /// A size hint about `rows_data` + /// intended to facilitate parallel decode purposes on large initial updates. + size_hint: RowSizeHintBuilder, + /// The flattened byte array for a list of rows. + rows_data: Vec, +} + +/// A [`RowSizeHint`] under construction. +pub enum RowSizeHintBuilder { + /// We haven't seen any rows yet. + Empty, + /// Each row in `rows_data` is of the same fixed size as specified here + /// but we don't know whether the size fits in `RowSize` + /// and we don't know whether future rows will also have this size. + FixedSizeDyn(usize), + /// Each row in `rows_data` is of the same fixed size as specified here + /// and we know that this will be the case for future rows as well. + FixedSizeStatic(RowSize), + /// The offsets into `rows_data` defining the boundaries of each row. + /// Only stores the offset to the start of each row. + /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. + /// The behavior of this is identical to that of `PackedStr`. + RowOffsets(Vec), +} + +impl Default for RowSizeHintBuilder { + fn default() -> Self { + Self::Empty + } +} + +impl RowListBuilder for BsatnRowListBuilder { + type FinishedList = BsatnRowList; + + fn push(&mut self, row: impl ToBsatn + Serialize) { + use RowSizeHintBuilder::*; + + // Record the length before. It will be the starting offset of `row`. + let len_before = self.rows_data.len(); + // BSATN-encode the row directly to the buffer. + row.to_bsatn_extend(&mut self.rows_data).unwrap(); + + let encoded_len = || self.rows_data.len() - len_before; + let push_row_offset = |mut offsets: Vec<_>| { + offsets.push(len_before as u64); + RowOffsets(offsets) + }; + + let hint = mem::replace(&mut self.size_hint, Empty); + self.size_hint = match hint { + // Static size that is unchanging. + h @ FixedSizeStatic(_) => h, + // Dynamic size that is unchanging. + h @ FixedSizeDyn(size) if size == encoded_len() => h, + // Size mismatch for the dynamic fixed size. + // Now we must construct `RowOffsets` for all rows thus far. + // We know that `size != 0` here, as this was excluded when we had `Empty`. + FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)), + // Once there's a size for each row, we'll just add to it. + RowOffsets(offsets) => push_row_offset(offsets), + // First time a row is seen. Use `encoded_len()` as the hint. + // If we have a static layout, we'll always have a fixed size. + // Otherwise, let's start out with a potentially fixed size. + // In either case, if `encoded_len() == 0`, we have to store offsets, + // as we cannot recover the number of elements otherwise. + Empty => match row.static_bsatn_size() { + Some(0) => push_row_offset(Vec::new()), + Some(size) => FixedSizeStatic(size), + None => match encoded_len() { + 0 => push_row_offset(Vec::new()), + size => FixedSizeDyn(size), + }, + }, + }; + } + + fn finish(self) -> Self::FinishedList { + let Self { size_hint, rows_data } = self; + let size_hint = match size_hint { + RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()), + RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs), + RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) { + Ok(fs) => RowSizeHint::FixedSize(fs), + Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()), + }, + RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), + }; + let rows_data = rows_data.into(); + BsatnRowList::new(size_hint, rows_data) + } +} + +fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec { + (0..num_rows).map(|i| i * size).map(|o| o as u64).collect() +} + +impl BuildableWebsocketFormat for BsatnFormat { + type ListBuilder = BsatnRowListBuilder; + + fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate { + let qu_len_would_have_been = bsatn::to_len(&qu).unwrap(); + + match decide_compression(qu_len_would_have_been, compression) { + Compression::None => CompressableQueryUpdate::Uncompressed(qu), + Compression::Brotli => { + let bytes = bsatn::to_vec(&qu).unwrap(); + let mut out = Vec::new(); + brotli_compress(&bytes, &mut out); + CompressableQueryUpdate::Brotli(out.into()) + } + Compression::Gzip => { + let bytes = bsatn::to_vec(&qu).unwrap(); + let mut out = Vec::new(); + gzip_compress(&bytes, &mut out); + CompressableQueryUpdate::Gzip(out.into()) + } + } + } +} + +pub fn decide_compression(len: usize, compression: Compression) -> Compression { + /// The threshold beyond which we start to compress messages. + /// 1KiB was chosen without measurement. + /// TODO(perf): measure! + const COMPRESS_THRESHOLD: usize = 1024; + + if len > COMPRESS_THRESHOLD { + compression + } else { + Compression::None + } +} + +pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) { + // We are optimizing for compression speed, + // so we choose the lowest (fastest) level of compression. + // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1 + // for large `SubscriptionUpdate` messages at this level. + const COMPRESSION_LEVEL: i32 = 1; + + let params = brotli::enc::BrotliEncoderParams { + quality: COMPRESSION_LEVEL, + ..<_>::default() + }; + let reader = &mut &bytes[..]; + brotli::BrotliCompress(reader, out, ¶ms).expect("should be able to BrotliCompress"); +} + +pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) { + let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast()); + encoder.write_all(bytes).unwrap(); + encoder.finish().expect("should be able to gzip compress `bytes`"); +} From c134d82b88397976ad767e23f52f42d81c8079ec Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 15 Jul 2025 19:02:28 +0200 Subject: [PATCH 4/4] move building and compression out of 'client-api-messages' --- Cargo.lock | 3 +- crates/client-api-messages/Cargo.toml | 2 - crates/client-api-messages/src/lib.rs | 1 - crates/client-api-messages/src/websocket.rs | 36 +----------- crates/core/src/client/messages.rs | 7 ++- crates/core/src/host/module_host.rs | 5 +- .../core/src/subscription/execution_unit.rs | 5 +- crates/core/src/subscription/mod.rs | 17 +++--- .../module_subscription_manager.rs | 4 +- crates/core/src/subscription/query.rs | 9 ++- crates/core/src/subscription/subscription.rs | 3 +- .../src/subscription}/websocket_building.rs | 6 +- crates/sdk/Cargo.toml | 1 + crates/sdk/src/compression.rs | 57 +++++++++++++++++++ crates/sdk/src/lib.rs | 1 + crates/sdk/src/spacetime_module.rs | 3 +- crates/sdk/src/websocket.rs | 32 ++--------- 17 files changed, 96 insertions(+), 96 deletions(-) rename crates/{client-api-messages/src => core/src/subscription}/websocket_building.rs (99%) create mode 100644 crates/sdk/src/compression.rs diff --git a/Cargo.lock b/Cargo.lock index f1ab54e1c51..4f308b84812 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5384,13 +5384,11 @@ dependencies = [ name = "spacetimedb-client-api-messages" version = "1.2.0" dependencies = [ - "brotli", "bytes", "bytestring", "chrono", "derive_more", "enum-as-inner", - "flate2", "hex", "itertools 0.12.1", "proptest", @@ -5889,6 +5887,7 @@ dependencies = [ "brotli", "bytes", "cursive", + "flate2", "futures", "futures-channel", "hex", diff --git a/crates/client-api-messages/Cargo.toml b/crates/client-api-messages/Cargo.toml index 90f767ee364..5d67ceaca7c 100644 --- a/crates/client-api-messages/Cargo.toml +++ b/crates/client-api-messages/Cargo.toml @@ -12,10 +12,8 @@ spacetimedb-sats = { workspace = true, features = ["bytestring"] } bytes.workspace = true bytestring.workspace = true -brotli.workspace = true chrono = { workspace = true, features = ["serde"] } enum-as-inner.workspace = true -flate2.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_with.workspace = true diff --git a/crates/client-api-messages/src/lib.rs b/crates/client-api-messages/src/lib.rs index c13870ad6d2..bf2f1dc9ca5 100644 --- a/crates/client-api-messages/src/lib.rs +++ b/crates/client-api-messages/src/lib.rs @@ -4,4 +4,3 @@ pub mod energy; pub mod http; pub mod name; pub mod websocket; -mod websocket_building; diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index 1f89f87b9a6..cf24e6e5e87 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -26,18 +26,12 @@ use smallvec::SmallVec; use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp}; use spacetimedb_primitives::TableId; use spacetimedb_sats::{ - bsatn, de::{Deserialize, Error}, impl_deserialize, impl_serialize, impl_st, ser::Serialize, AlgebraicType, SpacetimeType, }; -use std::{ - io::{self, Read as _}, - sync::Arc, -}; - -pub use crate::websocket_building::*; +use std::sync::Arc; pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb"; pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb"; @@ -661,22 +655,6 @@ pub enum CompressableQueryUpdate { Gzip(Bytes), } -impl CompressableQueryUpdate { - pub fn maybe_decompress(self) -> QueryUpdate { - match self { - Self::Uncompressed(qu) => qu, - Self::Brotli(bytes) => { - let bytes = brotli_decompress(&bytes).unwrap(); - bsatn::from_slice(&bytes).unwrap() - } - Self::Gzip(bytes) => { - let bytes = gzip_decompress(&bytes).unwrap(); - bsatn::from_slice(&bytes).unwrap() - } - } - } -} - #[derive(SpacetimeType, Debug, Clone)] #[sats(crate = spacetimedb_lib)] pub struct QueryUpdate { @@ -777,18 +755,6 @@ pub enum Compression { Gzip, } -pub fn brotli_decompress(bytes: &[u8]) -> Result, io::Error> { - let mut decompressed = Vec::new(); - brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?; - Ok(decompressed) -} - -pub fn gzip_decompress(bytes: &[u8]) -> Result, io::Error> { - let mut decompressed = Vec::new(); - let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?; - Ok(decompressed) -} - pub type RowSize = u16; pub type RowOffset = u64; diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index 29f548acf77..67f3b90397b 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -2,6 +2,7 @@ use super::{ClientConfig, DataMessage, Protocol}; use crate::host::module_host::{EventStatus, ModuleEvent}; use crate::host::ArgsTuple; use crate::messages::websocket as ws; +use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress}; use bytes::{BufMut, Bytes, BytesMut}; use bytestring::ByteString; use derive_more::From; @@ -148,10 +149,10 @@ pub fn serialize( }); // Conditionally compress the message. - let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) { + let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) { Compression::None => buffer.uncompressed(), - Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress), - Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress), + Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress), + Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress), }; (in_use, msg_bytes.into()) } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 5ec2829d1aa..b533ed232b9 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -16,6 +16,7 @@ use crate::sql::parser::RowLevelExpr; use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; @@ -25,9 +26,7 @@ use derive_more::From; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; -use spacetimedb_client_api_messages::websocket::{ - BuildableWebsocketFormat, ByteListLen, Compression, OneOffTable, QueryUpdate, -}; +use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index 31eb22e6f57..75eb0a0442c 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -5,11 +5,10 @@ use crate::error::DBError; use crate::estimation; use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue}; use crate::messages::websocket::TableUpdate; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::util::slow::SlowQueryLogger; use crate::vm::{build_query, TxMode}; -use spacetimedb_client_api_messages::websocket::{ - BuildableWebsocketFormat, Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, -}; +use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate}; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 2012f069a47..d940ce15b4f 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -1,21 +1,19 @@ -use std::sync::Arc; - +use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _}; +use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use anyhow::Result; use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ - BuildableWebsocketFormat, ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, - SingleQueryUpdate, TableUpdate, + ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, }; -use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; -use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; -use spacetimedb_primitives::TableId; - -use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use spacetimedb_datastore::{ db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder, }; +use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; +use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; +use spacetimedb_primitives::TableId; +use std::sync::Arc; pub mod delta; pub mod execution_unit; @@ -25,6 +23,7 @@ pub mod query; #[allow(clippy::module_inception)] // it's right this isn't ideal :/ pub mod subscription; pub mod tx; +pub mod websocket_building; #[derive(Debug)] pub struct ExecutionCounters { diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 229c601de5f..efd0f0dcc37 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -9,6 +9,7 @@ use crate::error::DBError; use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue}; use crate::messages::websocket::{self as ws, TableUpdate}; use crate::subscription::delta::eval_delta; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::worker_metrics::WORKER_METRICS; use core::mem; use hashbrown::hash_map::OccupiedError; @@ -16,8 +17,7 @@ use hashbrown::{HashMap, HashSet}; use parking_lot::RwLock; use prometheus::IntGauge; use spacetimedb_client_api_messages::websocket::{ - BsatnFormat, BuildableWebsocketFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, - SingleQueryUpdate, + BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate, }; use spacetimedb_data_structures::map::{Entry, IntMap}; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 8ff6e1629a9..71da841ec03 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -159,7 +159,7 @@ mod tests { use crate::vm::tests::create_table_with_rows; use crate::vm::DbProgram; use itertools::Itertools; - use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression}; + use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression}; use spacetimedb_datastore::execution_context::Workload; use spacetimedb_lib::bsatn; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -353,7 +353,7 @@ mod tests { total_tables: usize, rows: &[ProductValue], ) -> ResultTest<()> { - let result = s.eval::(db, tx, None, Compression::Brotli).tables; + let result = s.eval::(db, tx, None, Compression::None).tables; assert_eq!( result.len(), total_tables, @@ -363,7 +363,10 @@ mod tests { let result = result .into_iter() .flat_map(|x| x.updates) - .map(|x| x.maybe_decompress()) + .map(|x| match x { + CompressableQueryUpdate::Uncompressed(x) => x, + _ => unreachable!(), + }) .flat_map(|x| { (&x.deletes) .into_iter() diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index caf71558193..2a1489c555d 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -28,10 +28,11 @@ use crate::error::{DBError, SubscriptionError}; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdateRelValue, UpdatesRelValue}; use crate::messages::websocket as ws; use crate::sql::ast::SchemaViewer; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use spacetimedb_client_api_messages::websocket::{BuildableWebsocketFormat, Compression}; +use spacetimedb_client_api_messages::websocket::Compression; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_lib::db::auth::{StAccess, StTableType}; diff --git a/crates/client-api-messages/src/websocket_building.rs b/crates/core/src/subscription/websocket_building.rs similarity index 99% rename from crates/client-api-messages/src/websocket_building.rs rename to crates/core/src/subscription/websocket_building.rs index 45c9a266de2..e4061eb6b83 100644 --- a/crates/client-api-messages/src/websocket_building.rs +++ b/crates/core/src/subscription/websocket_building.rs @@ -1,9 +1,9 @@ -use crate::websocket::{ +use bytestring::ByteString; +use core::mem; +use spacetimedb_client_api_messages::websocket::{ BsatnFormat, BsatnRowList, CompressableQueryUpdate, Compression, JsonFormat, QueryUpdate, RowOffset, RowSize, RowSizeHint, WebsocketFormat, }; -use bytestring::ByteString; -use core::mem; use spacetimedb_sats::bsatn::{self, ToBsatn}; use spacetimedb_sats::ser::serde::SerializeWrapper; use spacetimedb_sats::Serialize; diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 9daa2a983eb..c0eaf809c61 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -19,6 +19,7 @@ anymap.workspace = true base64.workspace = true brotli.workspace = true bytes.workspace = true +flate2.workspace = true futures.workspace = true futures-channel.workspace = true home.workspace = true diff --git a/crates/sdk/src/compression.rs b/crates/sdk/src/compression.rs new file mode 100644 index 00000000000..e190cbcb709 --- /dev/null +++ b/crates/sdk/src/compression.rs @@ -0,0 +1,57 @@ +use crate::websocket::WsError; +use spacetimedb_client_api_messages::websocket::{ + BsatnFormat, CompressableQueryUpdate, QueryUpdate, SERVER_MSG_COMPRESSION_TAG_BROTLI, + SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE, +}; +use spacetimedb_sats::bsatn; +use std::borrow::Cow; +use std::io::{self, Read as _}; +use std::sync::Arc; + +fn brotli_decompress(bytes: &[u8]) -> Result, io::Error> { + let mut decompressed = Vec::new(); + brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?; + Ok(decompressed) +} + +fn gzip_decompress(bytes: &[u8]) -> Result, io::Error> { + let mut decompressed = Vec::new(); + let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?; + Ok(decompressed) +} + +pub(crate) fn maybe_decompress_cqu(cqu: CompressableQueryUpdate) -> QueryUpdate { + match cqu { + CompressableQueryUpdate::Uncompressed(qu) => qu, + CompressableQueryUpdate::Brotli(bytes) => { + let bytes = brotli_decompress(&bytes).unwrap(); + bsatn::from_slice(&bytes).unwrap() + } + CompressableQueryUpdate::Gzip(bytes) => { + let bytes = gzip_decompress(&bytes).unwrap(); + bsatn::from_slice(&bytes).unwrap() + } + } +} + +/// Decompresses a `ServerMessage` encoded in BSATN into the raw BSATN +/// for further deserialization. +pub(crate) fn decompress_server_message(raw: &[u8]) -> Result, WsError> { + let err_decompress = |scheme| { + move |source| WsError::Decompress { + scheme, + source: Arc::new(source), + } + }; + match raw { + [] => Err(WsError::EmptyMessage), + [SERVER_MSG_COMPRESSION_TAG_NONE, bytes @ ..] => Ok(Cow::Borrowed(bytes)), + [SERVER_MSG_COMPRESSION_TAG_BROTLI, bytes @ ..] => brotli_decompress(bytes) + .map(Cow::Owned) + .map_err(err_decompress("brotli")), + [SERVER_MSG_COMPRESSION_TAG_GZIP, bytes @ ..] => { + gzip_decompress(bytes).map(Cow::Owned).map_err(err_decompress("gzip")) + } + [c, ..] => Err(WsError::UnknownCompressionScheme { scheme: *c }), + } +} diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index 66459845a37..07be8ecc7fb 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -12,6 +12,7 @@ mod callbacks; mod client_cache; +mod compression; mod db_connection; mod metrics; mod spacetime_module; diff --git a/crates/sdk/src/spacetime_module.rs b/crates/sdk/src/spacetime_module.rs index cab0b8557a0..8b795d7b426 100644 --- a/crates/sdk/src/spacetime_module.rs +++ b/crates/sdk/src/spacetime_module.rs @@ -9,6 +9,7 @@ use crate::{ subscription::{OnEndedCallback, SubscriptionHandleImpl}, Event, ReducerEvent, __codegen::InternalError, + compression::maybe_decompress_cqu, }; use bytes::Bytes; use spacetimedb_client_api_messages::websocket::{self as ws, RowListLen as _}; @@ -228,7 +229,7 @@ impl TableUpdate { let mut inserts = Vec::new(); let mut deletes = Vec::new(); for update in raw_updates.updates { - let update = update.maybe_decompress(); + let update = maybe_decompress_cqu(update); Self::parse_from_row_list(&mut deletes, &update.deletes)?; Self::parse_from_row_list(&mut inserts, &update.inserts)?; } diff --git a/crates/sdk/src/websocket.rs b/crates/sdk/src/websocket.rs index 59b5c5e34f5..03d3acdda01 100644 --- a/crates/sdk/src/websocket.rs +++ b/crates/sdk/src/websocket.rs @@ -10,10 +10,7 @@ use bytes::Bytes; use futures::{SinkExt, StreamExt as _, TryStreamExt}; use futures_channel::mpsc; use http::uri::{InvalidUri, Scheme, Uri}; -use spacetimedb_client_api_messages::websocket::{ - brotli_decompress, gzip_decompress, BsatnFormat, Compression, BIN_PROTOCOL, SERVER_MSG_COMPRESSION_TAG_BROTLI, - SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE, -}; +use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression, BIN_PROTOCOL}; use spacetimedb_client_api_messages::websocket::{ClientMessage, ServerMessage}; use spacetimedb_lib::{bsatn, ConnectionId}; use thiserror::Error; @@ -27,6 +24,7 @@ use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, }; +use crate::compression::decompress_server_message; use crate::metrics::CLIENT_METRICS; #[derive(Error, Debug, Clone)] @@ -234,30 +232,8 @@ impl WsConnection { } pub(crate) fn parse_response(bytes: &[u8]) -> Result, WsError> { - let (compression, bytes) = bytes.split_first().ok_or(WsError::EmptyMessage)?; - - Ok(match *compression { - SERVER_MSG_COMPRESSION_TAG_NONE => { - bsatn::from_slice(bytes).map_err(|source| WsError::DeserializeMessage { source })? - } - SERVER_MSG_COMPRESSION_TAG_BROTLI => { - bsatn::from_slice(&brotli_decompress(bytes).map_err(|source| WsError::Decompress { - scheme: "brotli", - source: Arc::new(source), - })?) - .map_err(|source| WsError::DeserializeMessage { source })? - } - SERVER_MSG_COMPRESSION_TAG_GZIP => { - bsatn::from_slice(&gzip_decompress(bytes).map_err(|source| WsError::Decompress { - scheme: "gzip", - source: Arc::new(source), - })?) - .map_err(|source| WsError::DeserializeMessage { source })? - } - c => { - return Err(WsError::UnknownCompressionScheme { scheme: c }); - } - }) + let bytes = &*decompress_server_message(bytes)?; + bsatn::from_slice(bytes).map_err(|source| WsError::DeserializeMessage { source }) } pub(crate) fn encode_message(msg: ClientMessage) -> WebSocketMessage {