Skip to content

Commit f3be26f

Browse files
committed
subscriptions: reuse buffers in ServerMessage<BsatnFormat> via global pool
1 parent bccfdb1 commit f3be26f

File tree

24 files changed

+455
-93
lines changed

24 files changed

+455
-93
lines changed

crates/bench/benches/subscription.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use criterion::{black_box, criterion_group, criterion_main, Criterion};
2+
use spacetimedb::client::consume_each_list::ConsumeEachBuffer;
23
use spacetimedb::error::DBError;
34
use spacetimedb::execution_context::Workload;
45
use spacetimedb::host::module_host::DatabaseTableUpdate;
56
use spacetimedb::identity::AuthCtx;
67
use spacetimedb::messages::websocket::BsatnFormat;
78
use spacetimedb::sql::ast::SchemaViewer;
89
use spacetimedb::subscription::query::compile_read_only_queryset;
10+
use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
911
use spacetimedb::subscription::subscription::ExecutionSet;
1012
use spacetimedb::subscription::tx::DeltaTx;
1113
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
@@ -119,6 +121,8 @@ fn eval(c: &mut Criterion) {
119121
let ins_rhs = insert_op(rhs, "location", new_rhs_row);
120122
let update = [&ins_lhs, &ins_rhs];
121123

124+
let bsatn_rlb_pool = BsatnRowListBuilderPool::new();
125+
122126
// A benchmark runner for the new query engine
123127
let bench_query = |c: &mut Criterion, name, sql| {
124128
c.bench_function(name, |b| {
@@ -134,13 +138,17 @@ fn eval(c: &mut Criterion) {
134138
let tx = DeltaTx::from(&tx);
135139

136140
b.iter(|| {
137-
drop(black_box(collect_table_update::<_, BsatnFormat>(
141+
let updates = black_box(collect_table_update::<_, BsatnFormat>(
138142
&plans,
139143
table_id,
140144
table_name.clone(),
141145
&tx,
142146
TableUpdateType::Subscribe,
143-
)))
147+
&bsatn_rlb_pool,
148+
));
149+
if let Ok((updates, _)) = updates {
150+
updates.consume_each_list(&mut |buffer| black_box(bsatn_rlb_pool.try_put(buffer)));
151+
}
144152
})
145153
});
146154
};
@@ -152,12 +160,9 @@ fn eval(c: &mut Criterion) {
152160
let query: ExecutionSet = query.into();
153161

154162
b.iter(|| {
155-
drop(black_box(query.eval::<BsatnFormat>(
156-
&raw.db,
157-
&tx,
158-
None,
159-
Compression::None,
160-
)))
163+
let updates =
164+
black_box(query.eval::<BsatnFormat>(&raw.db, &tx, &bsatn_rlb_pool, None, Compression::None));
165+
updates.consume_each_list(&mut |buffer| black_box(bsatn_rlb_pool.try_put(buffer)));
161166
})
162167
});
163168
};

crates/client-api-messages/src/websocket.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//! rather than using an external mirror of this schema.
1616
1717
use crate::energy::EnergyQuanta;
18-
use bytes::Bytes;
18+
use bytes::{Bytes, BytesMut};
1919
use bytestring::ByteString;
2020
use core::{
2121
fmt::Debug,
@@ -41,8 +41,15 @@ use std::{
4141
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
4242
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
4343

44+
/// A source of row list builders for a given [`WebsocketFormat`].
45+
pub trait RowListBuilderSource<F: WebsocketFormat> {
46+
/// Returns a row list builder from the source `self`.
47+
fn take_row_list_builder(&self) -> F::ListBuilder;
48+
}
49+
4450
/// A list of rows being built.
45-
pub trait RowListBuilder: Default {
51+
pub trait RowListBuilder {
52+
/// The type of a finished list returned by [`RowListBuilder::finish`].
4653
type FinishedList;
4754

4855
/// Push a row to the list in a serialized format.
@@ -98,13 +105,17 @@ pub trait WebsocketFormat: Sized {
98105
+ Clone
99106
+ Default;
100107

101-
/// The builder for [`Self::List`].
108+
/// The builder for [`WebsocketFormat::List`].
102109
type ListBuilder: RowListBuilder<FinishedList = Self::List>;
103110

104111
/// Encodes the `elems` to a list in the format and also returns the length of the list.
105-
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
112+
///
113+
/// Needs to be provided with an empty [`WebsocketFormat::ListBuilder`].
114+
fn encode_list<R: ToBsatn + Serialize>(
115+
mut list: Self::ListBuilder,
116+
elems: impl Iterator<Item = R>,
117+
) -> (Self::List, u64) {
106118
let mut num_rows = 0;
107-
let mut list = Self::ListBuilder::default();
108119
for elem in elems {
109120
num_rows += 1;
110121
list.push(elem);
@@ -116,7 +127,7 @@ pub trait WebsocketFormat: Sized {
116127
/// This type exists so that some formats, e.g., BSATN, can compress an update.
117128
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
118129

119-
/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
130+
/// Convert a `QueryUpdate` into [`WebsocketFormat::QueryUpdate`].
120131
/// This allows some formats to e.g., compress the update.
121132
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
122133
}
@@ -976,6 +987,11 @@ impl BsatnRowList {
976987
let data_range = self.size_hint.index_to_range(index, data_end)?;
977988
Some(self.rows_data.slice(data_range))
978989
}
990+
991+
/// Consumes the list and returns the parts.
992+
pub fn into_inner(self) -> (RowSizeHint, Bytes) {
993+
(self.size_hint, self.rows_data)
994+
}
979995
}
980996

981997
/// An iterator over all the elements in a [`BsatnRowList`].
@@ -1008,7 +1024,7 @@ pub struct BsatnRowListBuilder {
10081024
/// intended to facilitate parallel decode purposes on large initial updates.
10091025
size_hint: RowSizeHintBuilder,
10101026
/// The flattened byte array for a list of rows.
1011-
rows_data: Vec<u8>,
1027+
rows_data: BytesMut,
10121028
}
10131029

10141030
/// A [`RowSizeHint`] under construction.
@@ -1096,6 +1112,14 @@ impl RowListBuilder for BsatnRowListBuilder {
10961112
}
10971113
}
10981114

1115+
impl BsatnRowListBuilder {
1116+
/// Returns a new builder using an empty [`BytesMut`] for the `rows_data` buffer.
1117+
pub fn new_from_bytes(rows_data: BytesMut) -> Self {
1118+
let size_hint = <_>::default();
1119+
Self { size_hint, rows_data }
1120+
}
1121+
}
1122+
10991123
fn collect_offsets_from_num_rows(num_rows: usize, size: usize) -> Vec<u64> {
11001124
(0..num_rows).map(|i| i * size).map(|o| o as u64).collect()
11011125
}

crates/client-api/src/routes/subscribe.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ async fn ws_client_actor_inner(
234234
let mut closed = false;
235235
let mut rx_buf = Vec::new();
236236

237+
let bsatn_rlb_pool = &client.module.subscriptions().bsatn_rlb_pool.clone();
238+
237239
let mut msg_buffer = SerializeBuffer::new(client.config);
238240
loop {
239241
rx_buf.clear();
@@ -292,7 +294,7 @@ async fn ws_client_actor_inner(
292294

293295
// Serialize the message, report metrics,
294296
// and keep a handle to the buffer.
295-
let (msg_alloc, msg_data) = serialize(msg_buffer, msg, client.config);
297+
let (msg_alloc, msg_data) = serialize(bsatn_rlb_pool, msg_buffer, msg, client.config);
296298
report_ws_sent_metrics(&addr, workload, num_rows, &msg_data);
297299

298300
// Buffer the message without necessarily sending it.
@@ -440,7 +442,7 @@ async fn ws_client_actor_inner(
440442
if let MessageHandleError::Execution(err) = e {
441443
log::error!("reducer execution error: {err:#}");
442444
// Serialize the message and keep a handle to the buffer.
443-
let (msg_alloc, msg_data) = serialize(msg_buffer, err, client.config);
445+
let (msg_alloc, msg_data) = serialize(bsatn_rlb_pool, msg_buffer, err, client.config);
444446

445447
let send = async { ws.send(datamsg_to_wsmsg(msg_data)).await };
446448
let send = tokio::time::timeout(SEND_TIMEOUT, send);

crates/core/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::fmt;
33

44
mod client_connection;
55
mod client_connection_index;
6+
pub mod consume_each_list;
67
mod message_handlers;
78
pub mod messages;
89

crates/core/src/client/client_connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::error::DBError;
1111
use crate::host::module_host::ClientConnectedError;
1212
use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult};
1313
use crate::messages::websocket::Subscribe;
14+
use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool;
1415
use crate::util::asyncify;
1516
use crate::util::prometheus_handle::IntGaugeExt;
1617
use crate::worker_metrics::WORKER_METRICS;
@@ -586,6 +587,7 @@ impl ClientConnection {
586587
self.sender.clone(),
587588
message_id.to_owned(),
588589
timer,
590+
JsonRowListBuilderFakePool,
589591
|msg: OneOffQueryResponseMessage<JsonFormat>| msg.into(),
590592
)
591593
.await
@@ -597,13 +599,15 @@ impl ClientConnection {
597599
message_id: &[u8],
598600
timer: Instant,
599601
) -> Result<(), anyhow::Error> {
602+
let bsatn_rlb_pool = self.module.replica_ctx().subscriptions.bsatn_rlb_pool.clone();
600603
self.module
601604
.one_off_query::<BsatnFormat>(
602605
self.id.identity,
603606
query.to_owned(),
604607
self.sender.clone(),
605608
message_id.to_owned(),
606609
timer,
610+
bsatn_rlb_pool,
607611
|msg: OneOffQueryResponseMessage<BsatnFormat>| msg.into(),
608612
)
609613
.await
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use bytes::Bytes;
2+
use spacetimedb_client_api_messages::websocket::{
3+
BsatnFormat, BsatnRowList, CompressableQueryUpdate, DatabaseUpdate, OneOffQueryResponse, QueryUpdate,
4+
ServerMessage, TableUpdate, UpdateStatus,
5+
};
6+
7+
/// Moves each buffer in `self` into a closure.
8+
pub trait ConsumeEachBuffer {
9+
/// Consumes `self`, moving each `Bytes` buffer in `self` into the closure `each`.
10+
fn consume_each_list(self, each: &mut impl FnMut(Bytes));
11+
}
12+
13+
impl ConsumeEachBuffer for ServerMessage<BsatnFormat> {
14+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
15+
use ServerMessage::*;
16+
match self {
17+
InitialSubscription(x) => x.database_update.consume_each_list(each),
18+
TransactionUpdate(x) => x.status.consume_each_list(each),
19+
TransactionUpdateLight(x) => x.update.consume_each_list(each),
20+
IdentityToken(_) | SubscriptionError(_) => {}
21+
OneOffQueryResponse(x) => x.consume_each_list(each),
22+
SubscribeApplied(x) => x.rows.table_rows.consume_each_list(each),
23+
UnsubscribeApplied(x) => x.rows.table_rows.consume_each_list(each),
24+
SubscribeMultiApplied(x) => x.update.consume_each_list(each),
25+
UnsubscribeMultiApplied(x) => x.update.consume_each_list(each),
26+
}
27+
}
28+
}
29+
30+
impl ConsumeEachBuffer for OneOffQueryResponse<BsatnFormat> {
31+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
32+
Vec::from(self.tables)
33+
.into_iter()
34+
.for_each(|x| x.rows.consume_each_list(each));
35+
}
36+
}
37+
38+
impl ConsumeEachBuffer for UpdateStatus<BsatnFormat> {
39+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
40+
match self {
41+
Self::Committed(x) => x.consume_each_list(each),
42+
Self::Failed(_) | UpdateStatus::OutOfEnergy => {}
43+
}
44+
}
45+
}
46+
47+
impl ConsumeEachBuffer for DatabaseUpdate<BsatnFormat> {
48+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
49+
self.tables.into_iter().for_each(|x| x.consume_each_list(each));
50+
}
51+
}
52+
53+
impl ConsumeEachBuffer for TableUpdate<BsatnFormat> {
54+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
55+
self.updates.into_iter().for_each(|x| x.consume_each_list(each));
56+
}
57+
}
58+
59+
impl ConsumeEachBuffer for CompressableQueryUpdate<BsatnFormat> {
60+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
61+
match self {
62+
Self::Uncompressed(x) => x.consume_each_list(each),
63+
Self::Brotli(bytes) | Self::Gzip(bytes) => each(bytes),
64+
}
65+
}
66+
}
67+
68+
impl ConsumeEachBuffer for QueryUpdate<BsatnFormat> {
69+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
70+
self.deletes.consume_each_list(each);
71+
self.inserts.consume_each_list(each);
72+
}
73+
}
74+
75+
impl ConsumeEachBuffer for BsatnRowList {
76+
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
77+
let (_, buffer) = self.into_inner();
78+
each(buffer);
79+
}
80+
}

crates/core/src/client/messages.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use super::{ClientConfig, DataMessage, Protocol};
2+
use crate::client::consume_each_list::ConsumeEachBuffer;
23
use crate::execution_context::WorkloadType;
34
use crate::host::module_host::{EventStatus, ModuleEvent};
45
use crate::host::ArgsTuple;
56
use crate::messages::websocket as ws;
7+
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
68
use bytes::{BufMut, Bytes, BytesMut};
79
use bytestring::ByteString;
810
use derive_more::From;
@@ -125,6 +127,7 @@ impl InUseSerializeBuffer {
125127
/// If `protocol` is [`Protocol::Binary`],
126128
/// the message will be conditionally compressed by this method according to `compression`.
127129
pub fn serialize(
130+
bsatn_rlb_pool: &BsatnRowListBuilderPool,
128131
mut buffer: SerializeBuffer,
129132
msg: impl ToProtocol<Encoded = SwitchedServerMessage>,
130133
config: ClientConfig,
@@ -147,6 +150,10 @@ pub fn serialize(
147150
bsatn::to_writer(w.into_inner(), &msg).unwrap()
148151
});
149152

153+
// At this point, we no longer have a use for `msg`,
154+
// so try to reclaim its buffers.
155+
msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
156+
150157
// Conditionally compress the message.
151158
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
152159
Compression::None => buffer.uncompressed(),

0 commit comments

Comments
 (0)