Skip to content

Commit a4983da

Browse files
committed
execute_plan: don't build temporary Vec of rows
1 parent e523625 commit a4983da

File tree

3 files changed

+153
-94
lines changed

3 files changed

+153
-94
lines changed

crates/client-api-messages/src/websocket.rs

Lines changed: 143 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use bytes::Bytes;
1919
use bytestring::ByteString;
2020
use core::{
2121
fmt::Debug,
22+
mem,
2223
ops::{Deref, Range},
2324
};
2425
use enum_as_inner::EnumAsInner;
@@ -40,8 +41,19 @@ use std::{
4041
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
4142
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
4243

44+
/// A list of rows being built.
45+
pub trait RowListBuilder: Default {
46+
type FinishedList;
47+
48+
/// Push a row to the list in a serialized format.
49+
fn push(&mut self, row: impl ToBsatn + Serialize);
50+
51+
/// Finish the in flight list, throwing away the capability to mutate.
52+
fn finish(self) -> Self::FinishedList;
53+
}
54+
4355
pub trait RowListLen {
44-
/// Returns the length of the list.
56+
/// Returns the length, in number of rows, not bytes, of the row list.
4557
fn len(&self) -> usize;
4658
/// Returns whether the list is empty or not.
4759
fn is_empty(&self) -> bool {
@@ -86,8 +98,19 @@ pub trait WebsocketFormat: Sized {
8698
+ Clone
8799
+ Default;
88100

101+
/// The builder for [`Self::List`].
102+
type ListBuilder: RowListBuilder<FinishedList = Self::List>;
103+
89104
/// Encodes the `elems` to a list in the format and also returns the length of the list.
90-
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64);
105+
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
106+
let mut num_rows = 0;
107+
let mut list = Self::ListBuilder::default();
108+
for elem in elems {
109+
num_rows += 1;
110+
list.push(elem);
111+
}
112+
(list.finish(), num_rows)
113+
}
91114

92115
/// The type used to encode query updates.
93116
/// This type exists so that some formats, e.g., BSATN, can compress an update.
@@ -758,15 +781,7 @@ impl WebsocketFormat for JsonFormat {
758781
type Single = ByteString;
759782

760783
type List = Vec<ByteString>;
761-
762-
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
763-
let mut count = 0;
764-
let list = elems
765-
.map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into())
766-
.inspect(|_| count += 1)
767-
.collect();
768-
(list, count)
769-
}
784+
type ListBuilder = Self::List;
770785

771786
type QueryUpdate = QueryUpdate<Self>;
772787

@@ -775,6 +790,17 @@ impl WebsocketFormat for JsonFormat {
775790
}
776791
}
777792

793+
impl RowListBuilder for Vec<ByteString> {
794+
type FinishedList = Self;
795+
fn push(&mut self, row: impl ToBsatn + Serialize) {
796+
let value = serde_json::to_string(&SerializeWrapper::new(row)).unwrap().into();
797+
self.push(value);
798+
}
799+
fn finish(self) -> Self::FinishedList {
800+
self
801+
}
802+
}
803+
778804
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
779805
#[sats(crate = spacetimedb_lib)]
780806
pub struct BsatnFormat;
@@ -783,33 +809,7 @@ impl WebsocketFormat for BsatnFormat {
783809
type Single = Box<[u8]>;
784810

785811
type List = BsatnRowList;
786-
787-
fn encode_list<R: ToBsatn + Serialize>(mut elems: impl Iterator<Item = R>) -> (Self::List, u64) {
788-
// For an empty list, the size of a row is unknown, so use `RowOffsets`.
789-
let Some(first) = elems.next() else {
790-
return (BsatnRowList::row_offsets(), 0);
791-
};
792-
// We have at least one row. Determine the static size from that, if available.
793-
let (mut list, mut scratch) = match first.static_bsatn_size() {
794-
Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)),
795-
None => (BsatnRowListBuilder::row_offsets(), Vec::new()),
796-
};
797-
// Add the first element and then the rest.
798-
// We assume that the schema of rows yielded by `elems` stays the same,
799-
// so once the size is fixed, it will stay that way.
800-
let mut count = 0;
801-
let mut push = |elem: R| {
802-
elem.to_bsatn_extend(&mut scratch).unwrap();
803-
list.push(&scratch);
804-
scratch.clear();
805-
count += 1;
806-
};
807-
push(first);
808-
for elem in elems {
809-
push(elem);
810-
}
811-
(list.finish(), count)
812-
}
812+
type ListBuilder = BsatnRowListBuilder;
813813

814814
type QueryUpdate = CompressableQueryUpdate<Self>;
815815

@@ -896,38 +896,38 @@ type RowSize = u16;
896896
type RowOffset = u64;
897897

898898
/// A packed list of BSATN-encoded rows.
899-
#[derive(SpacetimeType, Debug, Clone)]
899+
#[derive(SpacetimeType, Debug, Clone, Default)]
900900
#[sats(crate = spacetimedb_lib)]
901-
pub struct BsatnRowList<B = Bytes, I = Arc<[RowOffset]>> {
901+
pub struct BsatnRowList {
902902
/// A size hint about `rows_data`
903903
/// intended to facilitate parallel decode purposes on large initial updates.
904-
size_hint: RowSizeHint<I>,
904+
size_hint: RowSizeHint,
905905
/// The flattened byte array for a list of rows.
906-
rows_data: B,
907-
}
908-
909-
impl Default for BsatnRowList {
910-
fn default() -> Self {
911-
Self::row_offsets()
912-
}
906+
rows_data: Bytes,
913907
}
914908

915909
/// NOTE(centril, 1.0): We might want to add a `None` variant to this
916910
/// where the client has to decode in a loop until `rows_data` has been exhausted.
917911
/// The use-case for this is clients who are bandwidth limited and where every byte counts.
918912
#[derive(SpacetimeType, Debug, Clone)]
919913
#[sats(crate = spacetimedb_lib)]
920-
pub enum RowSizeHint<I> {
914+
pub enum RowSizeHint {
921915
/// Each row in `rows_data` is of the same fixed size as specified here.
922916
FixedSize(RowSize),
923917
/// The offsets into `rows_data` defining the boundaries of each row.
924918
/// Only stores the offset to the start of each row.
925919
/// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
926920
/// The behavior of this is identical to that of `PackedStr`.
927-
RowOffsets(I),
921+
RowOffsets(Arc<[RowOffset]>),
922+
}
923+
924+
impl Default for RowSizeHint {
925+
fn default() -> Self {
926+
Self::RowOffsets([].into())
927+
}
928928
}
929929

930-
impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
930+
impl RowSizeHint {
931931
fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
932932
match self {
933933
Self::FixedSize(size) => {
@@ -952,37 +952,17 @@ impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
952952
}
953953
}
954954

955-
impl<B: Default, I> BsatnRowList<B, I> {
956-
pub fn fixed(row_size: RowSize) -> Self {
957-
Self {
958-
size_hint: RowSizeHint::FixedSize(row_size),
959-
rows_data: <_>::default(),
960-
}
961-
}
962-
963-
/// Returns a new empty list using indices
964-
pub fn row_offsets() -> Self
965-
where
966-
I: From<[RowOffset; 0]>,
967-
{
968-
Self {
969-
size_hint: RowSizeHint::RowOffsets([].into()),
970-
rows_data: <_>::default(),
971-
}
972-
}
973-
}
974-
975-
impl<B: AsRef<[u8]>, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList<B, I> {
976-
/// Returns the length of the row list.
955+
impl RowListLen for BsatnRowList {
977956
fn len(&self) -> usize {
978957
match &self.size_hint {
958+
// `size != 0` is always the case for `FixedSize`.
979959
RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
980960
RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
981961
}
982962
}
983963
}
984964

985-
impl<B: AsRef<[u8]>, I> ByteListLen for BsatnRowList<B, I> {
965+
impl ByteListLen for BsatnRowList {
986966
/// Returns the uncompressed size of the list in bytes
987967
fn num_bytes(&self) -> usize {
988968
self.rows_data.as_ref().len()
@@ -1022,26 +1002,100 @@ impl Iterator for BsatnRowListIter<'_> {
10221002
}
10231003

10241004
/// A [`BsatnRowList`] that can be added to.
1025-
pub type BsatnRowListBuilder = BsatnRowList<Vec<u8>, Vec<RowOffset>>;
1026-
1027-
impl BsatnRowListBuilder {
1028-
/// Adds `row`, BSATN-encoded to this list.
1029-
#[inline]
1030-
pub fn push(&mut self, row: &[u8]) {
1031-
if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint {
1032-
offsets.push(self.rows_data.len() as u64);
1033-
}
1034-
self.rows_data.extend_from_slice(row);
1005+
#[derive(Default)]
1006+
pub struct BsatnRowListBuilder {
1007+
/// A size hint about `rows_data`
1008+
/// intended to facilitate parallel decode purposes on large initial updates.
1009+
size_hint: RowSizeHintBuilder,
1010+
/// The flattened byte array for a list of rows.
1011+
rows_data: Vec<u8>,
1012+
}
1013+
1014+
/// A [`RowSizeHint`] under construction.
1015+
pub enum RowSizeHintBuilder {
1016+
/// We haven't seen any rows yet.
1017+
Empty,
1018+
/// Each row in `rows_data` is of the same fixed size as specified here
1019+
/// but we don't know whether the size fits in `RowSize`
1020+
/// and we don't know whether future rows will also have this size.
1021+
FixedSizeDyn(usize),
1022+
/// Each row in `rows_data` is of the same fixed size as specified here
1023+
/// and we know that this will be the case for future rows as well.
1024+
FixedSizeStatic(RowSize),
1025+
/// The offsets into `rows_data` defining the boundaries of each row.
1026+
/// Only stores the offset to the start of each row.
1027+
/// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
1028+
/// The behavior of this is identical to that of `PackedStr`.
1029+
RowOffsets(Vec<RowOffset>),
1030+
}
1031+
1032+
impl Default for RowSizeHintBuilder {
1033+
fn default() -> Self {
1034+
Self::Empty
10351035
}
1036+
}
10361037

1037-
/// Finish the in flight list, throwing away the capability to mutate.
1038-
pub fn finish(self) -> BsatnRowList {
1038+
impl RowListBuilder for BsatnRowListBuilder {
1039+
type FinishedList = BsatnRowList;
1040+
1041+
fn push(&mut self, row: impl ToBsatn + Serialize) {
1042+
use RowSizeHintBuilder::*;
1043+
1044+
// Record the length before. It will be the starting offset of `row`.
1045+
let len_before = self.rows_data.len();
1046+
// BSATN-encode the row directly to the buffer.
1047+
row.to_bsatn_extend(&mut self.rows_data).unwrap();
1048+
1049+
let encoded_len = || self.rows_data.len() - len_before;
1050+
let push_row_offset = |mut offsets: Vec<_>| {
1051+
offsets.push(len_before as u64);
1052+
RowOffsets(offsets)
1053+
};
1054+
1055+
let hint = mem::replace(&mut self.size_hint, Empty);
1056+
self.size_hint = match hint {
1057+
// Static size that is unchanging.
1058+
h @ FixedSizeStatic(_) => h,
1059+
// Dynamic size that is unchanging.
1060+
h @ FixedSizeDyn(size) if size == encoded_len() => h,
1061+
// Size mismatch for the dynamic fixed size.
1062+
// Now we must construct `RowOffsets` for all rows thus far.
1063+
// We know that `size != 0` here, as this was excluded when we had `Empty`.
1064+
FixedSizeDyn(size) => RowOffsets(collect_offsets_from_num_rows(1 + len_before / size, size)),
1065+
// Once there's a size for each row, we'll just add to it.
1066+
RowOffsets(offsets) => push_row_offset(offsets),
1067+
// First time a row is seen. Use `encoded_len()` as the hint.
1068+
// If we have a static layout, we'll always have a fixed size.
1069+
// Otherwise, let's start out with a potentially fixed size.
1070+
// In either case, if `encoded_len() == 0`, we have to store offsets,
1071+
// as we cannot recover the number of elements otherwise.
1072+
Empty => match row.static_bsatn_size() {
1073+
Some(0) => push_row_offset(Vec::new()),
1074+
Some(size) => FixedSizeStatic(size),
1075+
None => match encoded_len() {
1076+
0 => push_row_offset(Vec::new()),
1077+
size => FixedSizeDyn(size),
1078+
},
1079+
},
1080+
};
1081+
}
1082+
1083+
fn finish(self) -> Self::FinishedList {
10391084
let Self { size_hint, rows_data } = self;
1040-
let rows_data = rows_data.into();
10411085
let size_hint = match size_hint {
1042-
RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs),
1043-
RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
1086+
RowSizeHintBuilder::Empty => RowSizeHint::RowOffsets([].into()),
1087+
RowSizeHintBuilder::FixedSizeStatic(fs) => RowSizeHint::FixedSize(fs),
1088+
RowSizeHintBuilder::FixedSizeDyn(fs) => match u16::try_from(fs) {
1089+
Ok(fs) => RowSizeHint::FixedSize(fs),
1090+
Err(_) => RowSizeHint::RowOffsets(collect_offsets_from_num_rows(rows_data.len() / fs, fs).into()),
1091+
},
1092+
RowSizeHintBuilder::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
10441093
};
1094+
let rows_data = rows_data.into();
10451095
BsatnRowList { size_hint, rows_data }
10461096
}
10471097
}
1098+
1099+
fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec<u64> {
1100+
(0..num_rows).map(|i| i * size).map(|o| o as u64).collect()
1101+
}

crates/core/src/subscription/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use module_subscription_manager::Plan;
55
use prometheus::IntCounter;
66
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
77
use spacetimedb_client_api_messages::websocket::{
8-
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
8+
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate,
9+
WebsocketFormat,
910
};
1011
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1112
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
@@ -99,20 +100,22 @@ where
99100
Tx: Datastore + DeltaStore,
100101
F: WebsocketFormat,
101102
{
102-
let mut rows = vec![];
103+
let mut count = 0;
104+
let mut list = F::ListBuilder::default();
103105
let mut metrics = ExecutionMetrics::default();
104106

105107
for fragment in plan_fragments {
106108
fragment.execute(tx, &mut metrics, &mut |row| {
107-
rows.push(row);
109+
count += 1;
110+
list.push(row);
108111
Ok(())
109112
})?;
110113
}
111114

112-
let (list, n) = F::encode_list(rows.into_iter());
115+
let list = list.finish();
113116
metrics.bytes_scanned += list.num_bytes();
114117
metrics.bytes_sent_to_clients += list.num_bytes();
115-
Ok((list, n, metrics))
118+
Ok((list, count, metrics))
116119
}
117120

118121
/// When collecting a table update are we inserting or deleting rows?

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,8 @@ impl SubscriptionManager {
11941194
) -> SingleQueryUpdate<F> {
11951195
let (update, num_rows, num_bytes) = memory
11961196
.get_or_insert_with(|| {
1197+
// TODO(centril): consider pushing the encoding of each row into
1198+
// `eval_delta` instead, to avoid building the temporary `Vec`s in `UpdatesRelValue`.
11971199
let encoded = updates.encode::<F>();
11981200
// The first time we insert into this map, we call encode.
11991201
// This is when we serialize the rows to BSATN/JSON.

0 commit comments

Comments
 (0)