Skip to content

Commit edb1319

Browse files
committed
move building parts of websocket format to own module
1 parent abe31f8 commit edb1319

File tree

3 files changed

+237
-213
lines changed

3 files changed

+237
-213
lines changed

crates/client-api-messages/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ pub mod energy;
44
pub mod http;
55
pub mod name;
66
pub mod websocket;
7+
mod websocket_building;

crates/client-api-messages/src/websocket.rs

Lines changed: 18 additions & 213 deletions
Original file line numberDiff line numberDiff line change
@@ -19,39 +19,29 @@ use bytes::Bytes;
1919
use bytestring::ByteString;
2020
use core::{
2121
fmt::Debug,
22-
mem,
2322
ops::{Deref, Range},
2423
};
2524
use enum_as_inner::EnumAsInner;
2625
use smallvec::SmallVec;
2726
use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
2827
use spacetimedb_primitives::TableId;
2928
use spacetimedb_sats::{
30-
bsatn::{self, ToBsatn},
29+
bsatn,
3130
de::{Deserialize, Error},
3231
impl_deserialize, impl_serialize, impl_st,
33-
ser::{serde::SerializeWrapper, Serialize},
32+
ser::Serialize,
3433
AlgebraicType, SpacetimeType,
3534
};
3635
use std::{
37-
io::{self, Read as _, Write as _},
36+
io::{self, Read as _},
3837
sync::Arc,
3938
};
4039

40+
pub use crate::websocket_building::*;
41+
4142
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
4243
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
4344

44-
/// A list of rows being built.
45-
pub trait RowListBuilder: Default {
46-
type FinishedList;
47-
48-
/// Push a row to the list in a serialized format.
49-
fn push(&mut self, row: impl ToBsatn + Serialize);
50-
51-
/// Finish the in flight list, throwing away the capability to mutate.
52-
fn finish(self) -> Self::FinishedList;
53-
}
54-
5545
pub trait RowListLen {
5646
/// Returns the length, in number of rows, not bytes, of the row list.
5747
fn len(&self) -> usize;
@@ -103,26 +93,6 @@ pub trait WebsocketFormat: Sized {
10393
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
10494
}
10595

106-
pub trait BuildableWebsocketFormat: WebsocketFormat {
107-
/// The builder for [`Self::List`].
108-
type ListBuilder: RowListBuilder<FinishedList = Self::List>;
109-
110-
/// Encodes the `elems` to a list in the format and also returns the length of the list.
111-
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
112-
let mut num_rows = 0;
113-
let mut list = Self::ListBuilder::default();
114-
for elem in elems {
115-
num_rows += 1;
116-
list.push(elem);
117-
}
118-
(list.finish(), num_rows)
119-
}
120-
121-
/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
122-
/// This allows some formats to e.g., compress the update.
123-
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
124-
}
125-
12696
/// Messages sent from the client to the server.
12797
///
12898
/// Parametric over the reducer argument type to enable [`ClientMessage::map_args`].
@@ -785,25 +755,6 @@ impl WebsocketFormat for JsonFormat {
785755
type QueryUpdate = QueryUpdate<Self>;
786756
}
787757

788-
impl BuildableWebsocketFormat for JsonFormat {
789-
type ListBuilder = Self::List;
790-
791-
fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
792-
qu
793-
}
794-
}
795-
796-
impl RowListBuilder for Vec<ByteString> {
797-
type FinishedList = Self;
798-
fn push(&mut self, row: impl ToBsatn + Serialize) {
799-
let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into();
800-
self.push(value);
801-
}
802-
fn finish(self) -> Self::FinishedList {
803-
self
804-
}
805-
}
806-
807758
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
808759
#[sats(crate = spacetimedb_lib)]
809760
pub struct BsatnFormat;
@@ -814,30 +765,6 @@ impl WebsocketFormat for BsatnFormat {
814765
type QueryUpdate = CompressableQueryUpdate<Self>;
815766
}
816767

817-
impl BuildableWebsocketFormat for BsatnFormat {
818-
type ListBuilder = BsatnRowListBuilder;
819-
820-
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
821-
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();
822-
823-
match decide_compression(qu_len_would_have_been, compression) {
824-
Compression::None => CompressableQueryUpdate::Uncompressed(qu),
825-
Compression::Brotli => {
826-
let bytes = bsatn::to_vec(&qu).unwrap();
827-
let mut out = Vec::new();
828-
brotli_compress(&bytes, &mut out);
829-
CompressableQueryUpdate::Brotli(out.into())
830-
}
831-
Compression::Gzip => {
832-
let bytes = bsatn::to_vec(&qu).unwrap();
833-
let mut out = Vec::new();
834-
gzip_compress(&bytes, &mut out);
835-
CompressableQueryUpdate::Gzip(out.into())
836-
}
837-
}
838-
}
839-
}
840-
841768
/// A specification of either a desired or decided compression algorithm.
842769
#[derive(serde::Deserialize, Default, PartialEq, Eq, Clone, Copy, Hash, Debug)]
843770
pub enum Compression {
@@ -850,54 +777,20 @@ pub enum Compression {
850777
Gzip,
851778
}
852779

853-
pub fn decide_compression(len: usize, compression: Compression) -> Compression {
854-
/// The threshold beyond which we start to compress messages.
855-
/// 1KiB was chosen without measurement.
856-
/// TODO(perf): measure!
857-
const COMPRESS_THRESHOLD: usize = 1024;
858-
859-
if len > COMPRESS_THRESHOLD {
860-
compression
861-
} else {
862-
Compression::None
863-
}
864-
}
865-
866-
pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) {
867-
// We are optimizing for compression speed,
868-
// so we choose the lowest (fastest) level of compression.
869-
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
870-
// for large `SubscriptionUpdate` messages at this level.
871-
const COMPRESSION_LEVEL: i32 = 1;
872-
873-
let params = brotli::enc::BrotliEncoderParams {
874-
quality: COMPRESSION_LEVEL,
875-
..<_>::default()
876-
};
877-
let reader = &mut &bytes[..];
878-
brotli::BrotliCompress(reader, out, &params).expect("should be able to BrotliCompress");
879-
}
880-
881780
pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
882781
let mut decompressed = Vec::new();
883782
brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
884783
Ok(decompressed)
885784
}
886785

887-
pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) {
888-
let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast());
889-
encoder.write_all(bytes).unwrap();
890-
encoder.finish().expect("should be able to gzip compress `bytes`");
891-
}
892-
893786
pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
894787
let mut decompressed = Vec::new();
895788
let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
896789
Ok(decompressed)
897790
}
898791

899-
type RowSize = u16;
900-
type RowOffset = u64;
792+
pub type RowSize = u16;
793+
pub type RowOffset = u64;
901794

902795
/// A packed list of BSATN-encoded rows.
903796
#[derive(SpacetimeType, Debug, Clone, Default)]
@@ -910,6 +803,17 @@ pub struct BsatnRowList {
910803
rows_data: Bytes,
911804
}
912805

806+
impl BsatnRowList {
807+
/// Returns a new row list where `rows_data` is the flattened byte array
808+
/// containing the BSATN of each row, without any markers for where a row begins and end.
809+
///
810+
/// The `size_hint` encodes the boundaries of each row in `rows_data`.
811+
/// See [`RowSizeHint`] for more details on the encoding.
812+
pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self {
813+
Self { size_hint, rows_data }
814+
}
815+
}
816+
913817
/// NOTE(centril, 1.0): We might want to add a `None` variant to this
914818
/// where the client has to decode in a loop until `rows_data` has been exhausted.
915819
/// The use-case for this is clients who are bandwidth limited and where every byte counts.
@@ -1004,102 +908,3 @@ impl Iterator for BsatnRowListIter<'_> {
1004908
self.list.get(index)
1005909
}
1006910
}
1007-
1008-
/// A [`BsatnRowList`] that can be added to.
1009-
#[derive(Default)]
1010-
pub struct BsatnRowListBuilder {
1011-
/// A size hint about `rows_data`
1012-
/// intended to facilitate parallel decode purposes on large initial updates.
1013-
size_hint: RowSizeHintBuilder,
1014-
/// The flattened byte array for a list of rows.
1015-
rows_data: Vec<u8>,
1016-
}
1017-
1018-
/// A [`RowSizeHint`] under construction.
1019-
pub enum RowSizeHintBuilder {
1020-
/// We haven't seen any rows yet.
1021-
Empty,
1022-
/// Each row in `rows_data` is of the same fixed size as specified here
1023-
/// but we don't know whether the size fits in `RowSize`
1024-
/// and we don't know whether future rows will also have this size.
1025-
FixedSizeDyn(usize),
1026-
/// Each row in `rows_data` is of the same fixed size as specified here
1027-
/// and we know that this will be the case for future rows as well.
1028-
FixedSizeStatic(RowSize),
1029-
/// The offsets into `rows_data` defining the boundaries of each row.
1030-
/// Only stores the offset to the start of each row.
1031-
/// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
1032-
/// The behavior of this is identical to that of `PackedStr`.
1033-
RowOffsets(Vec<RowOffset>),
1034-
}
1035-
1036-
impl Default for RowSizeHintBuilder {
1037-
fn default() -> Self {
1038-
Self::Empty
1039-
}
1040-
}
1041-
1042-
impl RowListBuilder for BsatnRowListBuilder {
1043-
type FinishedList = BsatnRowList;
1044-
1045-
fn push(&mut self, row: impl ToBsatn + Serialize) {
1046-
use RowSizeHintBuilder::*;
1047-
1048-
// Record the length before. It will be the starting offset of `row`.
1049-
let len_before = self.rows_data.len();
1050-
// BSATN-encode the row directly to the buffer.
1051-
row.to_bsatn_extend(&mut self.rows_data).unwrap();
1052-
1053-
let encoded_len = || self.rows_data.len() - len_before;
1054-
let push_row_offset = |mut offsets: Vec<_>| {
1055-
offsets.push(len_before as u64);
1056-
RowOffsets(offsets)
1057-
};
1058-
1059-
let hint = mem::replace(&mut self.size_hint, Empty);
1060-
self.size_hint = match hint {
1061-
// Static size that is unchanging.
1062-
h @ FixedSizeStatic(_) => h,
1063-
// Dynamic size that is unchanging.
1064-
h @ FixedSizeDyn(size) if size == encoded_len() => h,
1065-
// Size mismatch for the dynamic fixed size.
1066-
// Now we must construct `RowOffsets` for all rows thus far.
1067-
// We know that `size != 0` here, as this was excluded when we had `Empty`.
1068-
FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)),
1069-
// Once there's a size for each row, we'll just add to it.
1070-
RowOffsets(offsets) => push_row_offset(offsets),
1071-
// First time a row is seen. Use `encoded_len()` as the hint.
1072-
// If we have a static layout, we'll always have a fixed size.
1073-
// Otherwise, let's start out with a potentially fixed size.
1074-
// In either case, if `encoded_len() == 0`, we have to store offsets,
1075-
// as we cannot recover the number of elements otherwise.
1076-
Empty => match row.static_bsatn_size() {
1077-
Some(0) => push_row_offset(Vec::new()),
1078-
Some(size) => FixedSizeStatic(size),
1079-
None => match encoded_len() {
1080-
0 => push_row_offset(Vec::new()),
1081-
size => FixedSizeDyn(size),
1082-
},
1083-
},
1084-
};
1085-
}
1086-
1087-
fn finish(self) -> Self::FinishedList {
1088-
let Self { size_hint, rows_data } = self;
1089-
let size_hint = match size_hint {
1090-
RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()),
1091-
RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs),
1092-
RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) {
1093-
Ok(fs) => RowSizeHint::FixedSize(fs),
1094-
Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()),
1095-
},
1096-
RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
1097-
};
1098-
let rows_data = rows_data.into();
1099-
BsatnRowList { size_hint, rows_data }
1100-
}
1101-
}
1102-
1103-
fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec<u64> {
1104-
(0..num_rows).map(|i| i * size).map(|o| o as u64).collect()
1105-
}

0 commit comments

Comments
 (0)