diff --git a/Cargo.lock b/Cargo.lock index f1ab54e1c51..4f308b84812 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5384,13 +5384,11 @@ dependencies = [ name = "spacetimedb-client-api-messages" version = "1.2.0" dependencies = [ - "brotli", "bytes", "bytestring", "chrono", "derive_more", "enum-as-inner", - "flate2", "hex", "itertools 0.12.1", "proptest", @@ -5889,6 +5887,7 @@ dependencies = [ "brotli", "bytes", "cursive", + "flate2", "futures", "futures-channel", "hex", diff --git a/crates/client-api-messages/Cargo.toml b/crates/client-api-messages/Cargo.toml index 90f767ee364..5d67ceaca7c 100644 --- a/crates/client-api-messages/Cargo.toml +++ b/crates/client-api-messages/Cargo.toml @@ -12,10 +12,8 @@ spacetimedb-sats = { workspace = true, features = ["bytestring"] } bytes.workspace = true bytestring.workspace = true -brotli.workspace = true chrono = { workspace = true, features = ["serde"] } enum-as-inner.workspace = true -flate2.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_with.workspace = true diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index b2a92e71bb0..cf24e6e5e87 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -26,22 +26,18 @@ use smallvec::SmallVec; use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp}; use spacetimedb_primitives::TableId; use spacetimedb_sats::{ - bsatn::{self, ToBsatn}, de::{Deserialize, Error}, impl_deserialize, impl_serialize, impl_st, - ser::{serde::SerializeWrapper, Serialize}, + ser::Serialize, AlgebraicType, SpacetimeType, }; -use std::{ - io::{self, Read as _, Write as _}, - sync::Arc, -}; +use std::sync::Arc; pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb"; pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb"; pub trait RowListLen { - /// Returns the length of the list. 
+ /// Returns the length, in number of rows, not bytes, of the row list. fn len(&self) -> usize; /// Returns whether the list is empty or not. fn is_empty(&self) -> bool { @@ -86,16 +82,9 @@ pub trait WebsocketFormat: Sized { + Clone + Default; - /// Encodes the `elems` to a list in the format and also returns the length of the list. - fn encode_list(elems: impl Iterator) -> (Self::List, u64); - /// The type used to encode query updates. /// This type exists so that some formats, e.g., BSATN, can compress an update. type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send; - - /// Convert a `QueryUpdate` into `Self::QueryUpdate`. - /// This allows some formats to e.g., compress the update. - fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate; } /// Messages sent from the client to the server. @@ -666,22 +655,6 @@ pub enum CompressableQueryUpdate { Gzip(Bytes), } -impl CompressableQueryUpdate { - pub fn maybe_decompress(self) -> QueryUpdate { - match self { - Self::Uncompressed(qu) => qu, - Self::Brotli(bytes) => { - let bytes = brotli_decompress(&bytes).unwrap(); - bsatn::from_slice(&bytes).unwrap() - } - Self::Gzip(bytes) => { - let bytes = gzip_decompress(&bytes).unwrap(); - bsatn::from_slice(&bytes).unwrap() - } - } - } -} - #[derive(SpacetimeType, Debug, Clone)] #[sats(crate = spacetimedb_lib)] pub struct QueryUpdate { @@ -756,23 +729,8 @@ pub struct JsonFormat; impl WebsocketFormat for JsonFormat { type Single = ByteString; - type List = Vec; - - fn encode_list(elems: impl Iterator) -> (Self::List, u64) { - let mut count = 0; - let list = elems - .map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into()) - .inspect(|_| count += 1) - .collect(); - (list, count) - } - type QueryUpdate = QueryUpdate; - - fn into_query_update(qu: QueryUpdate, _: Compression) -> Self::QueryUpdate { - qu - } } #[derive(Clone, Copy, Default, Debug, SpacetimeType)] @@ -781,57 +739,8 @@ pub 
struct BsatnFormat; impl WebsocketFormat for BsatnFormat { type Single = Box<[u8]>; - type List = BsatnRowList; - - fn encode_list(mut elems: impl Iterator) -> (Self::List, u64) { - // For an empty list, the size of a row is unknown, so use `RowOffsets`. - let Some(first) = elems.next() else { - return (BsatnRowList::row_offsets(), 0); - }; - // We have at least one row. Determine the static size from that, if available. - let (mut list, mut scratch) = match first.static_bsatn_size() { - Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)), - None => (BsatnRowListBuilder::row_offsets(), Vec::new()), - }; - // Add the first element and then the rest. - // We assume that the schema of rows yielded by `elems` stays the same, - // so once the size is fixed, it will stay that way. - let mut count = 0; - let mut push = |elem: R| { - elem.to_bsatn_extend(&mut scratch).unwrap(); - list.push(&scratch); - scratch.clear(); - count += 1; - }; - push(first); - for elem in elems { - push(elem); - } - (list.finish(), count) - } - type QueryUpdate = CompressableQueryUpdate; - - fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate { - let qu_len_would_have_been = bsatn::to_len(&qu).unwrap(); - - match decide_compression(qu_len_would_have_been, compression) { - Compression::None => CompressableQueryUpdate::Uncompressed(qu), - Compression::Brotli => { - let bytes = bsatn::to_vec(&qu).unwrap(); - let mut out = Vec::new(); - brotli_compress(&bytes, &mut out); - CompressableQueryUpdate::Brotli(out.into()) - } - Compression::Gzip => { - let bytes = bsatn::to_vec(&qu).unwrap(); - let mut out = Vec::new(); - gzip_compress(&bytes, &mut out); - CompressableQueryUpdate::Gzip(out.into()) - } - } - } } /// A specification of either a desired or decided compression algorithm. 
@@ -846,69 +755,28 @@ pub enum Compression { Gzip, } -pub fn decide_compression(len: usize, compression: Compression) -> Compression { - /// The threshold beyond which we start to compress messages. - /// 1KiB was chosen without measurement. - /// TODO(perf): measure! - const COMPRESS_THRESHOLD: usize = 1024; - - if len > COMPRESS_THRESHOLD { - compression - } else { - Compression::None - } -} - -pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) { - // We are optimizing for compression speed, - // so we choose the lowest (fastest) level of compression. - // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1 - // for large `SubscriptionUpdate` messages at this level. - const COMPRESSION_LEVEL: i32 = 1; - - let params = brotli::enc::BrotliEncoderParams { - quality: COMPRESSION_LEVEL, - ..<_>::default() - }; - let reader = &mut &bytes[..]; - brotli::BrotliCompress(reader, out, &params).expect("should be able to BrotliCompress"); -} - -pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> { - let mut decompressed = Vec::new(); - brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?; - Ok(decompressed) -} - -pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) { - let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast()); - encoder.write_all(bytes).unwrap(); - encoder.finish().expect("should be able to gzip compress `bytes`"); -} - -pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> { - let mut decompressed = Vec::new(); - let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?; - Ok(decompressed) -} - -type RowSize = u16; -type RowOffset = u64; +pub type RowSize = u16; +pub type RowOffset = u64; /// A packed list of BSATN-encoded rows.
-#[derive(SpacetimeType, Debug, Clone)] +#[derive(SpacetimeType, Debug, Clone, Default)] #[sats(crate = spacetimedb_lib)] -pub struct BsatnRowList> { +pub struct BsatnRowList { /// A size hint about `rows_data` /// intended to facilitate parallel decode purposes on large initial updates. - size_hint: RowSizeHint, + size_hint: RowSizeHint, /// The flattened byte array for a list of rows. - rows_data: B, + rows_data: Bytes, } -impl Default for BsatnRowList { - fn default() -> Self { - Self::row_offsets() +impl BsatnRowList { + /// Returns a new row list where `rows_data` is the flattened byte array + /// containing the BSATN of each row, without any markers for where a row begins and end. + /// + /// The `size_hint` encodes the boundaries of each row in `rows_data`. + /// See [`RowSizeHint`] for more details on the encoding. + pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self { + Self { size_hint, rows_data } } } @@ -917,17 +785,23 @@ impl Default for BsatnRowList { /// The use-case for this is clients who are bandwidth limited and where every byte counts. #[derive(SpacetimeType, Debug, Clone)] #[sats(crate = spacetimedb_lib)] -pub enum RowSizeHint { +pub enum RowSizeHint { /// Each row in `rows_data` is of the same fixed size as specified here. FixedSize(RowSize), /// The offsets into `rows_data` defining the boundaries of each row. /// Only stores the offset to the start of each row. /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. /// The behavior of this is identical to that of `PackedStr`. 
- RowOffsets(I), + RowOffsets(Arc<[RowOffset]>), +} + +impl Default for RowSizeHint { + fn default() -> Self { + Self::RowOffsets([].into()) + } } -impl> RowSizeHint { +impl RowSizeHint { fn index_to_range(&self, index: usize, data_end: usize) -> Option> { match self { Self::FixedSize(size) => { @@ -952,37 +826,17 @@ impl> RowSizeHint { } } -impl BsatnRowList { - pub fn fixed(row_size: RowSize) -> Self { - Self { - size_hint: RowSizeHint::FixedSize(row_size), - rows_data: <_>::default(), - } - } - - /// Returns a new empty list using indices - pub fn row_offsets() -> Self - where - I: From<[RowOffset; 0]>, - { - Self { - size_hint: RowSizeHint::RowOffsets([].into()), - rows_data: <_>::default(), - } - } -} - -impl, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList { - /// Returns the length of the row list. +impl RowListLen for BsatnRowList { fn len(&self) -> usize { match &self.size_hint { + // `size != 0` is always the case for `FixedSize`. RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize, RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(), } } } -impl, I> ByteListLen for BsatnRowList { +impl ByteListLen for BsatnRowList { /// Returns the uncompressed size of the list in bytes fn num_bytes(&self) -> usize { self.rows_data.as_ref().len() @@ -1020,28 +874,3 @@ impl Iterator for BsatnRowListIter<'_> { self.list.get(index) } } - -/// A [`BsatnRowList`] that can be added to. -pub type BsatnRowListBuilder = BsatnRowList, Vec>; - -impl BsatnRowListBuilder { - /// Adds `row`, BSATN-encoded to this list. - #[inline] - pub fn push(&mut self, row: &[u8]) { - if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint { - offsets.push(self.rows_data.len() as u64); - } - self.rows_data.extend_from_slice(row); - } - - /// Finish the in flight list, throwing away the capability to mutate. 
- pub fn finish(self) -> BsatnRowList { - let Self { size_hint, rows_data } = self; - let rows_data = rows_data.into(); - let size_hint = match size_hint { - RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs), - RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), - }; - BsatnRowList { size_hint, rows_data } - } -} diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index 29f548acf77..67f3b90397b 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -2,6 +2,7 @@ use super::{ClientConfig, DataMessage, Protocol}; use crate::host::module_host::{EventStatus, ModuleEvent}; use crate::host::ArgsTuple; use crate::messages::websocket as ws; +use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress}; use bytes::{BufMut, Bytes, BytesMut}; use bytestring::ByteString; use derive_more::From; @@ -148,10 +149,10 @@ pub fn serialize( }); // Conditionally compress the message. 
- let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) { + let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) { Compression::None => buffer.uncompressed(), - Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress), - Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress), + Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress), + Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress), }; (in_use, msg_bytes.into()) } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 6925b0f2b7f..b533ed232b9 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -16,6 +16,7 @@ use crate::sql::parser::RowLevelExpr; use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; @@ -25,7 +26,7 @@ use derive_more::From; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; -use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; @@ -134,7 +135,7 @@ impl UpdatesRelValue<'_> { !(self.deletes.is_empty() && self.inserts.is_empty()) 
} - pub fn encode(&self) -> (F::QueryUpdate, u64, usize) { + pub fn encode(&self) -> (F::QueryUpdate, u64, usize) { let (deletes, nr_del) = F::encode_list(self.deletes.iter()); let (inserts, nr_ins) = F::encode_list(self.inserts.iter()); let num_rows = nr_del + nr_ins; @@ -1096,7 +1097,7 @@ impl ModuleHost { /// This only returns an error if there is a db-level problem. /// An error with the query itself will be sent to the client. #[tracing::instrument(level = "trace", skip_all)] - pub async fn one_off_query( + pub async fn one_off_query( &self, caller_identity: Identity, query: String, diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index 66f43572a32..75eb0a0442c 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -5,11 +5,10 @@ use crate::error::DBError; use crate::estimation; use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue}; use crate::messages::websocket::TableUpdate; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::util::slow::SlowQueryLogger; use crate::vm::{build_query, TxMode}; -use spacetimedb_client_api_messages::websocket::{ - Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat, -}; +use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate}; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; @@ -236,7 +235,7 @@ impl ExecutionUnit { /// Evaluate this execution unit against the database using the specified format. 
#[tracing::instrument(level = "trace", skip_all)] - pub fn eval( + pub fn eval( &self, db: &RelationalDB, tx: &Tx, diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 7e77bfb7afc..d940ce15b4f 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -1,20 +1,19 @@ -use std::sync::Arc; - +use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _}; +use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use anyhow::Result; use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ - ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat, + ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, }; -use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; -use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; -use spacetimedb_primitives::TableId; - -use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use spacetimedb_datastore::{ db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder, }; +use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; +use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; +use spacetimedb_primitives::TableId; +use std::sync::Arc; pub mod delta; pub mod execution_unit; @@ -24,6 +23,7 @@ pub mod query; #[allow(clippy::module_inception)] // it's right this isn't ideal :/ pub mod subscription; pub mod tx; +pub mod websocket_building; #[derive(Debug)] pub struct ExecutionCounters { @@ -95,22 +95,24 @@ impl MetricsRecorder for ExecutionCounters { pub fn execute_plan(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> where Tx: Datastore + DeltaStore, - F: WebsocketFormat, + F: 
BuildableWebsocketFormat, { - let mut rows = vec![]; + let mut count = 0; + let mut list = F::ListBuilder::default(); let mut metrics = ExecutionMetrics::default(); for fragment in plan_fragments { fragment.execute(tx, &mut metrics, &mut |row| { - rows.push(row); + count += 1; + list.push(row); Ok(()) })?; } - let (list, n) = F::encode_list(rows.into_iter()); + let list = list.finish(); metrics.bytes_scanned += list.num_bytes(); metrics.bytes_sent_to_clients += list.num_bytes(); - Ok((list, n, metrics)) + Ok((list, count, metrics)) } /// When collecting a table update are we inserting or deleting rows? @@ -131,7 +133,7 @@ pub fn collect_table_update( ) -> Result<(TableUpdate, ExecutionMetrics)> where Tx: Datastore + DeltaStore, - F: WebsocketFormat, + F: BuildableWebsocketFormat, { execute_plan::(plan_fragments, tx).map(|(rows, num_rows, metrics)| { let empty = F::List::default(); @@ -164,7 +166,7 @@ pub fn execute_plans( ) -> Result<(DatabaseUpdate, ExecutionMetrics), DBError> where Tx: Datastore + DeltaStore + Sync, - F: WebsocketFormat, + F: BuildableWebsocketFormat, { plans .par_iter() diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index e9b8dee0d6c..efd0f0dcc37 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -9,6 +9,7 @@ use crate::error::DBError; use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue}; use crate::messages::websocket::{self as ws, TableUpdate}; use crate::subscription::delta::eval_delta; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::worker_metrics::WORKER_METRICS; use core::mem; use hashbrown::hash_map::OccupiedError; @@ -17,7 +18,6 @@ use parking_lot::RwLock; use prometheus::IntGauge; use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, 
QueryId, QueryUpdate, SingleQueryUpdate, - WebsocketFormat, }; use spacetimedb_data_structures::map::{Entry, IntMap}; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; @@ -1187,13 +1187,15 @@ impl SubscriptionManager { let mut ops_bin_uncompressed: Option<(CompressableQueryUpdate, _, _)> = None; let mut ops_json: Option<(QueryUpdate, _, _)> = None; - fn memo_encode( + fn memo_encode( updates: &UpdatesRelValue<'_>, memory: &mut Option<(F::QueryUpdate, u64, usize)>, metrics: &mut ExecutionMetrics, ) -> SingleQueryUpdate { let (update, num_rows, num_bytes) = memory .get_or_insert_with(|| { + // TODO(centril): consider pushing the encoding of each row into + // `eval_delta` instead, to avoid building the temporary `Vec`s in `UpdatesRelValue`. let encoded = updates.encode::(); // The first time we insert into this map, we call encode. // This is when we serialize the rows to BSATN/JSON. diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 8ff6e1629a9..71da841ec03 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -159,7 +159,7 @@ mod tests { use crate::vm::tests::create_table_with_rows; use crate::vm::DbProgram; use itertools::Itertools; - use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression}; + use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression}; use spacetimedb_datastore::execution_context::Workload; use spacetimedb_lib::bsatn; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -353,7 +353,7 @@ mod tests { total_tables: usize, rows: &[ProductValue], ) -> ResultTest<()> { - let result = s.eval::(db, tx, None, Compression::Brotli).tables; + let result = s.eval::(db, tx, None, Compression::None).tables; assert_eq!( result.len(), total_tables, @@ -363,7 +363,10 @@ mod tests { let result = result .into_iter() .flat_map(|x| x.updates) - .map(|x| x.maybe_decompress()) + .map(|x| match x { 
+ CompressableQueryUpdate::Uncompressed(x) => x, + _ => unreachable!(), + }) .flat_map(|x| { (&x.deletes) .into_iter() diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 28a1fe66c97..2a1489c555d 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -28,10 +28,11 @@ use crate::error::{DBError, SubscriptionError}; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdateRelValue, UpdatesRelValue}; use crate::messages::websocket as ws; use crate::sql::ast::SchemaViewer; +use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::Compression; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -512,7 +513,7 @@ pub struct ExecutionSet { } impl ExecutionSet { - pub fn eval( + pub fn eval( &self, db: &RelationalDB, tx: &Tx, diff --git a/crates/core/src/subscription/websocket_building.rs b/crates/core/src/subscription/websocket_building.rs new file mode 100644 index 00000000000..e4061eb6b83 --- /dev/null +++ b/crates/core/src/subscription/websocket_building.rs @@ -0,0 +1,218 @@ +use bytestring::ByteString; +use core::mem; +use spacetimedb_client_api_messages::websocket::{ + BsatnFormat, BsatnRowList, CompressableQueryUpdate, Compression, JsonFormat, QueryUpdate, RowOffset, RowSize, + RowSizeHint, WebsocketFormat, +}; +use spacetimedb_sats::bsatn::{self, ToBsatn}; +use spacetimedb_sats::ser::serde::SerializeWrapper; +use spacetimedb_sats::Serialize; +use std::io; +use std::io::Write as _; + +/// A list of rows being built. 
+pub trait RowListBuilder: Default { + type FinishedList; + + /// Push a row to the list in a serialized format. + fn push(&mut self, row: impl ToBsatn + Serialize); + + /// Finish the in flight list, throwing away the capability to mutate. + fn finish(self) -> Self::FinishedList; +} + +pub trait BuildableWebsocketFormat: WebsocketFormat { + /// The builder for [`Self::List`]. + type ListBuilder: RowListBuilder; + + /// Encodes the `elems` to a list in the format and also returns the length of the list. + fn encode_list(elems: impl Iterator) -> (Self::List, u64) { + let mut num_rows = 0; + let mut list = Self::ListBuilder::default(); + for elem in elems { + num_rows += 1; + list.push(elem); + } + (list.finish(), num_rows) + } + + /// Convert a `QueryUpdate` into `Self::QueryUpdate`. + /// This allows some formats to e.g., compress the update. + fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate; +} + +impl BuildableWebsocketFormat for JsonFormat { + type ListBuilder = Self::List; + + fn into_query_update(qu: QueryUpdate, _: Compression) -> Self::QueryUpdate { + qu + } +} + +impl RowListBuilder for Vec { + type FinishedList = Self; + fn push(&mut self, row: impl ToBsatn + Serialize) { + let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into(); + self.push(value); + } + fn finish(self) -> Self::FinishedList { + self + } +} + +/// A [`BsatnRowList`] that can be added to. +#[derive(Default)] +pub struct BsatnRowListBuilder { + /// A size hint about `rows_data` + /// intended to facilitate parallel decode purposes on large initial updates. + size_hint: RowSizeHintBuilder, + /// The flattened byte array for a list of rows. + rows_data: Vec, +} + +/// A [`RowSizeHint`] under construction. +pub enum RowSizeHintBuilder { + /// We haven't seen any rows yet. 
+ Empty, + /// Each row in `rows_data` is of the same fixed size as specified here + /// but we don't know whether the size fits in `RowSize` + /// and we don't know whether future rows will also have this size. + FixedSizeDyn(usize), + /// Each row in `rows_data` is of the same fixed size as specified here + /// and we know that this will be the case for future rows as well. + FixedSizeStatic(RowSize), + /// The offsets into `rows_data` defining the boundaries of each row. + /// Only stores the offset to the start of each row. + /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`. + /// The behavior of this is identical to that of `PackedStr`. + RowOffsets(Vec), +} + +impl Default for RowSizeHintBuilder { + fn default() -> Self { + Self::Empty + } +} + +impl RowListBuilder for BsatnRowListBuilder { + type FinishedList = BsatnRowList; + + fn push(&mut self, row: impl ToBsatn + Serialize) { + use RowSizeHintBuilder::*; + + // Record the length before. It will be the starting offset of `row`. + let len_before = self.rows_data.len(); + // BSATN-encode the row directly to the buffer. + row.to_bsatn_extend(&mut self.rows_data).unwrap(); + + let encoded_len = || self.rows_data.len() - len_before; + let push_row_offset = |mut offsets: Vec<_>| { + offsets.push(len_before as u64); + RowOffsets(offsets) + }; + + let hint = mem::replace(&mut self.size_hint, Empty); + self.size_hint = match hint { + // Static size that is unchanging. + h @ FixedSizeStatic(_) => h, + // Dynamic size that is unchanging. + h @ FixedSizeDyn(size) if size == encoded_len() => h, + // Size mismatch for the dynamic fixed size. + // Now we must construct `RowOffsets` for all rows thus far. + // We know that `size != 0` here, as this was excluded when we had `Empty`. + FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)), + // Once there's a size for each row, we'll just add to it. 
+ RowOffsets(offsets) => push_row_offset(offsets), + // First time a row is seen. Use `encoded_len()` as the hint. + // If we have a static layout, we'll always have a fixed size. + // Otherwise, let's start out with a potentially fixed size. + // In either case, if `encoded_len() == 0`, we have to store offsets, + // as we cannot recover the number of elements otherwise. + Empty => match row.static_bsatn_size() { + Some(0) => push_row_offset(Vec::new()), + Some(size) => FixedSizeStatic(size), + None => match encoded_len() { + 0 => push_row_offset(Vec::new()), + size => FixedSizeDyn(size), + }, + }, + }; + } + + fn finish(self) -> Self::FinishedList { + let Self { size_hint, rows_data } = self; + let size_hint = match size_hint { + RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()), + RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs), + RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) { + Ok(fs) => RowSizeHint::FixedSize(fs), + Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()), + }, + RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()), + }; + let rows_data = rows_data.into(); + BsatnRowList::new(size_hint, rows_data) + } +} + +fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec { + (0..num_rows).map(|i| i * size).map(|o| o as u64).collect() +} + +impl BuildableWebsocketFormat for BsatnFormat { + type ListBuilder = BsatnRowListBuilder; + + fn into_query_update(qu: QueryUpdate, compression: Compression) -> Self::QueryUpdate { + let qu_len_would_have_been = bsatn::to_len(&qu).unwrap(); + + match decide_compression(qu_len_would_have_been, compression) { + Compression::None => CompressableQueryUpdate::Uncompressed(qu), + Compression::Brotli => { + let bytes = bsatn::to_vec(&qu).unwrap(); + let mut out = Vec::new(); + brotli_compress(&bytes, &mut out); + CompressableQueryUpdate::Brotli(out.into()) + } + Compression::Gzip => { + let bytes 
= bsatn::to_vec(&qu).unwrap(); + let mut out = Vec::new(); + gzip_compress(&bytes, &mut out); + CompressableQueryUpdate::Gzip(out.into()) + } + } + } +} + +pub fn decide_compression(len: usize, compression: Compression) -> Compression { + /// The threshold beyond which we start to compress messages. + /// 1KiB was chosen without measurement. + /// TODO(perf): measure! + const COMPRESS_THRESHOLD: usize = 1024; + + if len > COMPRESS_THRESHOLD { + compression + } else { + Compression::None + } +} + +pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) { + // We are optimizing for compression speed, + // so we choose the lowest (fastest) level of compression. + // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1 + // for large `SubscriptionUpdate` messages at this level. + const COMPRESSION_LEVEL: i32 = 1; + + let params = brotli::enc::BrotliEncoderParams { + quality: COMPRESSION_LEVEL, + ..<_>::default() + }; + let reader = &mut &bytes[..]; + brotli::BrotliCompress(reader, out, &params).expect("should be able to BrotliCompress"); +} + +pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) { + let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast()); + encoder.write_all(bytes).unwrap(); + encoder.finish().expect("should be able to gzip compress `bytes`"); +} diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 9daa2a983eb..c0eaf809c61 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -19,6 +19,7 @@ anymap.workspace = true base64.workspace = true brotli.workspace = true bytes.workspace = true +flate2.workspace = true futures.workspace = true futures-channel.workspace = true home.workspace = true diff --git a/crates/sdk/src/compression.rs b/crates/sdk/src/compression.rs new file mode 100644 index 00000000000..e190cbcb709 --- /dev/null +++ b/crates/sdk/src/compression.rs @@ -0,0 +1,57 @@ +use crate::websocket::WsError; +use spacetimedb_client_api_messages::websocket::{ +
BsatnFormat, CompressableQueryUpdate, QueryUpdate, SERVER_MSG_COMPRESSION_TAG_BROTLI, + SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE, +}; +use spacetimedb_sats::bsatn; +use std::borrow::Cow; +use std::io::{self, Read as _}; +use std::sync::Arc; + +fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> { + let mut decompressed = Vec::new(); + brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?; + Ok(decompressed) +} + +fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> { + let mut decompressed = Vec::new(); + let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?; + Ok(decompressed) +} + +pub(crate) fn maybe_decompress_cqu(cqu: CompressableQueryUpdate<BsatnFormat>) -> QueryUpdate<BsatnFormat> { + match cqu { + CompressableQueryUpdate::Uncompressed(qu) => qu, + CompressableQueryUpdate::Brotli(bytes) => { + let bytes = brotli_decompress(&bytes).unwrap(); + bsatn::from_slice(&bytes).unwrap() + } + CompressableQueryUpdate::Gzip(bytes) => { + let bytes = gzip_decompress(&bytes).unwrap(); + bsatn::from_slice(&bytes).unwrap() + } + } +} + +/// Decompresses a `ServerMessage` encoded in BSATN into the raw BSATN +/// for further deserialization. +pub(crate) fn decompress_server_message(raw: &[u8]) -> Result<Cow<'_, [u8]>, WsError> { + let err_decompress = |scheme| { + move |source| WsError::Decompress { + scheme, + source: Arc::new(source), + } + }; + match raw { + [] => Err(WsError::EmptyMessage), + [SERVER_MSG_COMPRESSION_TAG_NONE, bytes @ ..] => Ok(Cow::Borrowed(bytes)), + [SERVER_MSG_COMPRESSION_TAG_BROTLI, bytes @ ..] => brotli_decompress(bytes) + .map(Cow::Owned) + .map_err(err_decompress("brotli")), + [SERVER_MSG_COMPRESSION_TAG_GZIP, bytes @ ..] => { + gzip_decompress(bytes).map(Cow::Owned).map_err(err_decompress("gzip")) + } + [c, ..]
=> Err(WsError::UnknownCompressionScheme { scheme: *c }), + } +} diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index 66459845a37..07be8ecc7fb 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -12,6 +12,7 @@ mod callbacks; mod client_cache; +mod compression; mod db_connection; mod metrics; mod spacetime_module; diff --git a/crates/sdk/src/spacetime_module.rs b/crates/sdk/src/spacetime_module.rs index cab0b8557a0..8b795d7b426 100644 --- a/crates/sdk/src/spacetime_module.rs +++ b/crates/sdk/src/spacetime_module.rs @@ -9,6 +9,7 @@ use crate::{ subscription::{OnEndedCallback, SubscriptionHandleImpl}, Event, ReducerEvent, __codegen::InternalError, + compression::maybe_decompress_cqu, }; use bytes::Bytes; use spacetimedb_client_api_messages::websocket::{self as ws, RowListLen as _}; @@ -228,7 +229,7 @@ impl TableUpdate { let mut inserts = Vec::new(); let mut deletes = Vec::new(); for update in raw_updates.updates { - let update = update.maybe_decompress(); + let update = maybe_decompress_cqu(update); Self::parse_from_row_list(&mut deletes, &update.deletes)?; Self::parse_from_row_list(&mut inserts, &update.inserts)?; } diff --git a/crates/sdk/src/websocket.rs b/crates/sdk/src/websocket.rs index 59b5c5e34f5..03d3acdda01 100644 --- a/crates/sdk/src/websocket.rs +++ b/crates/sdk/src/websocket.rs @@ -10,10 +10,7 @@ use bytes::Bytes; use futures::{SinkExt, StreamExt as _, TryStreamExt}; use futures_channel::mpsc; use http::uri::{InvalidUri, Scheme, Uri}; -use spacetimedb_client_api_messages::websocket::{ - brotli_decompress, gzip_decompress, BsatnFormat, Compression, BIN_PROTOCOL, SERVER_MSG_COMPRESSION_TAG_BROTLI, - SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE, -}; +use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression, BIN_PROTOCOL}; use spacetimedb_client_api_messages::websocket::{ClientMessage, ServerMessage}; use spacetimedb_lib::{bsatn, ConnectionId}; use thiserror::Error; @@ -27,6 +24,7 @@ use 
tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, }; +use crate::compression::decompress_server_message; use crate::metrics::CLIENT_METRICS; #[derive(Error, Debug, Clone)] @@ -234,30 +232,8 @@ impl WsConnection { } pub(crate) fn parse_response(bytes: &[u8]) -> Result, WsError> { - let (compression, bytes) = bytes.split_first().ok_or(WsError::EmptyMessage)?; - - Ok(match *compression { - SERVER_MSG_COMPRESSION_TAG_NONE => { - bsatn::from_slice(bytes).map_err(|source| WsError::DeserializeMessage { source })? - } - SERVER_MSG_COMPRESSION_TAG_BROTLI => { - bsatn::from_slice(&brotli_decompress(bytes).map_err(|source| WsError::Decompress { - scheme: "brotli", - source: Arc::new(source), - })?) - .map_err(|source| WsError::DeserializeMessage { source })? - } - SERVER_MSG_COMPRESSION_TAG_GZIP => { - bsatn::from_slice(&gzip_decompress(bytes).map_err(|source| WsError::Decompress { - scheme: "gzip", - source: Arc::new(source), - })?) - .map_err(|source| WsError::DeserializeMessage { source })? - } - c => { - return Err(WsError::UnknownCompressionScheme { scheme: c }); - } - }) + let bytes = &*decompress_server_message(bytes)?; + bsatn::from_slice(bytes).map_err(|source| WsError::DeserializeMessage { source }) } pub(crate) fn encode_message(msg: ClientMessage) -> WebSocketMessage {