Skip to content

execute_plan: don't build temporary vec of rows #2918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/client-api-messages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ spacetimedb-sats = { workspace = true, features = ["bytestring"] }

bytes.workspace = true
bytestring.workspace = true
brotli.workspace = true
chrono = { workspace = true, features = ["serde"] }
enum-as-inner.workspace = true
flate2.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde_with.workspace = true
Expand Down
229 changes: 29 additions & 200 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,18 @@ use smallvec::SmallVec;
use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::{
bsatn::{self, ToBsatn},
de::{Deserialize, Error},
impl_deserialize, impl_serialize, impl_st,
ser::{serde::SerializeWrapper, Serialize},
ser::Serialize,
AlgebraicType, SpacetimeType,
};
use std::{
io::{self, Read as _, Write as _},
sync::Arc,
};
use std::sync::Arc;

pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";

pub trait RowListLen {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move the entire trait out? Isn't the idea that we only want SpacetimeTypes in this file?

/// Returns the length of the list.
/// Returns the length, in number of rows, not bytes, of the row list.
fn len(&self) -> usize;
/// Returns whether the list is empty or not.
fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -86,16 +82,9 @@ pub trait WebsocketFormat: Sized {
+ Clone
+ Default;

/// Encodes the `elems` to a list in the format and also returns the length of the list.
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64);

/// The type used to encode query updates.
/// This type exists so that some formats, e.g., BSATN, can compress an update.
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;

/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
/// This allows some formats to e.g., compress the update.
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
}

/// Messages sent from the client to the server.
Expand Down Expand Up @@ -666,22 +655,6 @@ pub enum CompressableQueryUpdate<F: WebsocketFormat> {
Gzip(Bytes),
}

impl CompressableQueryUpdate<BsatnFormat> {
pub fn maybe_decompress(self) -> QueryUpdate<BsatnFormat> {
match self {
Self::Uncompressed(qu) => qu,
Self::Brotli(bytes) => {
let bytes = brotli_decompress(&bytes).unwrap();
bsatn::from_slice(&bytes).unwrap()
}
Self::Gzip(bytes) => {
let bytes = gzip_decompress(&bytes).unwrap();
bsatn::from_slice(&bytes).unwrap()
}
}
}
}

#[derive(SpacetimeType, Debug, Clone)]
#[sats(crate = spacetimedb_lib)]
pub struct QueryUpdate<F: WebsocketFormat> {
Expand Down Expand Up @@ -756,23 +729,8 @@ pub struct JsonFormat;

impl WebsocketFormat for JsonFormat {
type Single = ByteString;

type List = Vec<ByteString>;

fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
let mut count = 0;
let list = elems
.map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into())
.inspect(|_| count += 1)
.collect();
(list, count)
}

type QueryUpdate = QueryUpdate<Self>;

fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
qu
}
}

#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
Expand All @@ -781,57 +739,8 @@ pub struct BsatnFormat;

impl WebsocketFormat for BsatnFormat {
type Single = Box<[u8]>;

type List = BsatnRowList;

fn encode_list<R: ToBsatn + Serialize>(mut elems: impl Iterator<Item = R>) -> (Self::List, u64) {
// For an empty list, the size of a row is unknown, so use `RowOffsets`.
let Some(first) = elems.next() else {
return (BsatnRowList::row_offsets(), 0);
};
// We have at least one row. Determine the static size from that, if available.
let (mut list, mut scratch) = match first.static_bsatn_size() {
Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)),
None => (BsatnRowListBuilder::row_offsets(), Vec::new()),
};
// Add the first element and then the rest.
// We assume that the schema of rows yielded by `elems` stays the same,
// so once the size is fixed, it will stay that way.
let mut count = 0;
let mut push = |elem: R| {
elem.to_bsatn_extend(&mut scratch).unwrap();
list.push(&scratch);
scratch.clear();
count += 1;
};
push(first);
for elem in elems {
push(elem);
}
(list.finish(), count)
}

type QueryUpdate = CompressableQueryUpdate<Self>;

fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();

match decide_compression(qu_len_would_have_been, compression) {
Compression::None => CompressableQueryUpdate::Uncompressed(qu),
Compression::Brotli => {
let bytes = bsatn::to_vec(&qu).unwrap();
let mut out = Vec::new();
brotli_compress(&bytes, &mut out);
CompressableQueryUpdate::Brotli(out.into())
}
Compression::Gzip => {
let bytes = bsatn::to_vec(&qu).unwrap();
let mut out = Vec::new();
gzip_compress(&bytes, &mut out);
CompressableQueryUpdate::Gzip(out.into())
}
}
}
}

/// A specification of either a desired or decided compression algorithm.
Expand All @@ -846,69 +755,28 @@ pub enum Compression {
Gzip,
}

pub fn decide_compression(len: usize, compression: Compression) -> Compression {
/// The threshold beyond which we start to compress messages.
/// 1KiB was chosen without measurement.
/// TODO(perf): measure!
const COMPRESS_THRESHOLD: usize = 1024;

if len > COMPRESS_THRESHOLD {
compression
} else {
Compression::None
}
}

pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) {
// We are optimizing for compression speed,
// so we choose the lowest (fastest) level of compression.
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
// for large `SubscriptionUpdate` messages at this level.
const COMPRESSION_LEVEL: i32 = 1;

let params = brotli::enc::BrotliEncoderParams {
quality: COMPRESSION_LEVEL,
..<_>::default()
};
let reader = &mut &bytes[..];
brotli::BrotliCompress(reader, out, &params).expect("should be able to BrotliCompress");
}

pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
let mut decompressed = Vec::new();
brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
Ok(decompressed)
}

pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) {
let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast());
encoder.write_all(bytes).unwrap();
encoder.finish().expect("should be able to gzip compress `bytes`");
}

pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
let mut decompressed = Vec::new();
let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
Ok(decompressed)
}

type RowSize = u16;
type RowOffset = u64;
pub type RowSize = u16;
pub type RowOffset = u64;

/// A packed list of BSATN-encoded rows.
#[derive(SpacetimeType, Debug, Clone)]
#[derive(SpacetimeType, Debug, Clone, Default)]
#[sats(crate = spacetimedb_lib)]
pub struct BsatnRowList<B = Bytes, I = Arc<[RowOffset]>> {
pub struct BsatnRowList {
/// A size hint about `rows_data`
/// intended to facilitate parallel decode purposes on large initial updates.
size_hint: RowSizeHint<I>,
size_hint: RowSizeHint,
/// The flattened byte array for a list of rows.
rows_data: B,
rows_data: Bytes,
}

impl Default for BsatnRowList {
fn default() -> Self {
Self::row_offsets()
impl BsatnRowList {
/// Returns a new row list where `rows_data` is the flattened byte array
/// containing the BSATN of each row, without any markers for where a row begins and end.
///
/// The `size_hint` encodes the boundaries of each row in `rows_data`.
/// See [`RowSizeHint`] for more details on the encoding.
pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self {
Self { size_hint, rows_data }
}
}

Expand All @@ -917,17 +785,23 @@ impl Default for BsatnRowList {
/// The use-case for this is clients who are bandwidth limited and where every byte counts.
#[derive(SpacetimeType, Debug, Clone)]
#[sats(crate = spacetimedb_lib)]
pub enum RowSizeHint<I> {
pub enum RowSizeHint {
/// Each row in `rows_data` is of the same fixed size as specified here.
FixedSize(RowSize),
/// The offsets into `rows_data` defining the boundaries of each row.
/// Only stores the offset to the start of each row.
/// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
/// The behavior of this is identical to that of `PackedStr`.
RowOffsets(I),
RowOffsets(Arc<[RowOffset]>),
}

impl Default for RowSizeHint {
fn default() -> Self {
Self::RowOffsets([].into())
}
}

impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
impl RowSizeHint {
fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
match self {
Self::FixedSize(size) => {
Expand All @@ -952,37 +826,17 @@ impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
}
}

impl<B: Default, I> BsatnRowList<B, I> {
pub fn fixed(row_size: RowSize) -> Self {
Self {
size_hint: RowSizeHint::FixedSize(row_size),
rows_data: <_>::default(),
}
}

/// Returns a new empty list using indices
pub fn row_offsets() -> Self
where
I: From<[RowOffset; 0]>,
{
Self {
size_hint: RowSizeHint::RowOffsets([].into()),
rows_data: <_>::default(),
}
}
}

impl<B: AsRef<[u8]>, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList<B, I> {
/// Returns the length of the row list.
impl RowListLen for BsatnRowList {
fn len(&self) -> usize {
match &self.size_hint {
// `size != 0` is always the case for `FixedSize`.
RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
}
}
}

impl<B: AsRef<[u8]>, I> ByteListLen for BsatnRowList<B, I> {
impl ByteListLen for BsatnRowList {
/// Returns the uncompressed size of the list in bytes
fn num_bytes(&self) -> usize {
self.rows_data.as_ref().len()
Expand Down Expand Up @@ -1020,28 +874,3 @@ impl Iterator for BsatnRowListIter<'_> {
self.list.get(index)
}
}

/// A [`BsatnRowList`] that can be added to.
pub type BsatnRowListBuilder = BsatnRowList<Vec<u8>, Vec<RowOffset>>;

impl BsatnRowListBuilder {
/// Adds `row`, BSATN-encoded to this list.
#[inline]
pub fn push(&mut self, row: &[u8]) {
if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint {
offsets.push(self.rows_data.len() as u64);
}
self.rows_data.extend_from_slice(row);
}

/// Finish the in flight list, throwing away the capability to mutate.
pub fn finish(self) -> BsatnRowList {
let Self { size_hint, rows_data } = self;
let rows_data = rows_data.into();
let size_hint = match size_hint {
RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs),
RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
};
BsatnRowList { size_hint, rows_data }
}
}
7 changes: 4 additions & 3 deletions crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{ClientConfig, DataMessage, Protocol};
use crate::host::module_host::{EventStatus, ModuleEvent};
use crate::host::ArgsTuple;
use crate::messages::websocket as ws;
use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress};
use bytes::{BufMut, Bytes, BytesMut};
use bytestring::ByteString;
use derive_more::From;
Expand Down Expand Up @@ -148,10 +149,10 @@ pub fn serialize(
});

// Conditionally compress the message.
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) {
Compression::None => buffer.uncompressed(),
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress),
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress),
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress),
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress),
};
(in_use, msg_bytes.into())
}
Expand Down
Loading
Loading