Skip to content

Commit 712e2f6

Browse files
committed
split WebsocketFormat, adding BuildableWebsocketFormat
1 parent c944598 commit 712e2f6

File tree

6 files changed

+31
-25
lines changed

6 files changed

+31
-25
lines changed

crates/client-api-messages/src/websocket.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ pub trait WebsocketFormat: Sized {
9898
+ Clone
9999
+ Default;
100100

101+
/// The type used to encode query updates.
102+
/// This type exists so that some formats, e.g., BSATN, can compress an update.
103+
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
104+
}
105+
106+
pub trait BuildableWebsocketFormat: WebsocketFormat {
101107
/// The builder for [`Self::List`].
102108
type ListBuilder: RowListBuilder<FinishedList = Self::List>;
103109

@@ -112,10 +118,6 @@ pub trait WebsocketFormat: Sized {
112118
(list.finish(), num_rows)
113119
}
114120

115-
/// The type used to encode query updates.
116-
/// This type exists so that some formats, e.g., BSATN, can compress an update.
117-
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
118-
119121
/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
120122
/// This allows some formats to e.g., compress the update.
121123
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
@@ -779,11 +781,12 @@ pub struct JsonFormat;
779781

780782
impl WebsocketFormat for JsonFormat {
781783
type Single = ByteString;
782-
783784
type List = Vec<ByteString>;
784-
type ListBuilder = Self::List;
785-
786785
type QueryUpdate = QueryUpdate<Self>;
786+
}
787+
788+
impl BuildableWebsocketFormat for JsonFormat {
789+
type ListBuilder = Self::List;
787790

788791
fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
789792
qu
@@ -807,11 +810,12 @@ pub struct BsatnFormat;
807810

808811
impl WebsocketFormat for BsatnFormat {
809812
type Single = Box<[u8]>;
810-
811813
type List = BsatnRowList;
812-
type ListBuilder = BsatnRowListBuilder;
813-
814814
type QueryUpdate = CompressableQueryUpdate<Self>;
815+
}
816+
817+
impl BuildableWebsocketFormat for BsatnFormat {
818+
type ListBuilder = BsatnRowListBuilder;
815819

816820
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
817821
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();

crates/core/src/host/module_host.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use derive_more::From;
2525
use indexmap::IndexSet;
2626
use itertools::Itertools;
2727
use prometheus::{Histogram, IntGauge};
28-
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
28+
use spacetimedb_client_api_messages::websocket::{
29+
BuildableWebsocketFormat, ByteListLen, Compression, OneOffTable, QueryUpdate,
30+
};
2931
use spacetimedb_data_structures::error_stream::ErrorStream;
3032
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3133
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
@@ -134,7 +136,7 @@ impl UpdatesRelValue<'_> {
134136
!(self.deletes.is_empty() && self.inserts.is_empty())
135137
}
136138

137-
pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
139+
pub fn encode<F: BuildableWebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
138140
let (deletes, nr_del) = F::encode_list(self.deletes.iter());
139141
let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
140142
let num_rows = nr_del + nr_ins;
@@ -1096,7 +1098,7 @@ impl ModuleHost {
10961098
/// This only returns an error if there is a db-level problem.
10971099
/// An error with the query itself will be sent to the client.
10981100
#[tracing::instrument(level = "trace", skip_all)]
1099-
pub async fn one_off_query<F: WebsocketFormat>(
1101+
pub async fn one_off_query<F: BuildableWebsocketFormat>(
11001102
&self,
11011103
caller_identity: Identity,
11021104
query: String,

crates/core/src/subscription/execution_unit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::messages::websocket::TableUpdate;
88
use crate::util::slow::SlowQueryLogger;
99
use crate::vm::{build_query, TxMode};
1010
use spacetimedb_client_api_messages::websocket::{
11-
Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
11+
BuildableWebsocketFormat, Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate,
1212
};
1313
use spacetimedb_datastore::locking_tx_datastore::TxId;
1414
use spacetimedb_lib::Identity;
@@ -236,7 +236,7 @@ impl ExecutionUnit {
236236

237237
/// Evaluate this execution unit against the database using the specified format.
238238
#[tracing::instrument(level = "trace", skip_all)]
239-
pub fn eval<F: WebsocketFormat>(
239+
pub fn eval<F: BuildableWebsocketFormat>(
240240
&self,
241241
db: &RelationalDB,
242242
tx: &Tx,

crates/core/src/subscription/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use module_subscription_manager::Plan;
55
use prometheus::IntCounter;
66
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
77
use spacetimedb_client_api_messages::websocket::{
8-
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate,
9-
WebsocketFormat,
8+
BuildableWebsocketFormat, ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _,
9+
SingleQueryUpdate, TableUpdate,
1010
};
1111
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1212
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
@@ -96,7 +96,7 @@ impl MetricsRecorder for ExecutionCounters {
9696
pub fn execute_plan<Tx, F>(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)>
9797
where
9898
Tx: Datastore + DeltaStore,
99-
F: WebsocketFormat,
99+
F: BuildableWebsocketFormat,
100100
{
101101
let mut count = 0;
102102
let mut list = F::ListBuilder::default();
@@ -134,7 +134,7 @@ pub fn collect_table_update<Tx, F>(
134134
) -> Result<(TableUpdate<F>, ExecutionMetrics)>
135135
where
136136
Tx: Datastore + DeltaStore,
137-
F: WebsocketFormat,
137+
F: BuildableWebsocketFormat,
138138
{
139139
execute_plan::<Tx, F>(plan_fragments, tx).map(|(rows, num_rows, metrics)| {
140140
let empty = F::List::default();
@@ -167,7 +167,7 @@ pub fn execute_plans<Tx, F>(
167167
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
168168
where
169169
Tx: Datastore + DeltaStore + Sync,
170-
F: WebsocketFormat,
170+
F: BuildableWebsocketFormat,
171171
{
172172
plans
173173
.par_iter()

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use hashbrown::{HashMap, HashSet};
1616
use parking_lot::RwLock;
1717
use prometheus::IntGauge;
1818
use spacetimedb_client_api_messages::websocket::{
19-
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate,
20-
WebsocketFormat,
19+
BsatnFormat, BuildableWebsocketFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate,
20+
SingleQueryUpdate,
2121
};
2222
use spacetimedb_data_structures::map::{Entry, IntMap};
2323
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
@@ -1187,7 +1187,7 @@ impl SubscriptionManager {
11871187
let mut ops_bin_uncompressed: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
11881188
let mut ops_json: Option<(QueryUpdate<JsonFormat>, _, _)> = None;
11891189

1190-
fn memo_encode<F: WebsocketFormat>(
1190+
fn memo_encode<F: BuildableWebsocketFormat>(
11911191
updates: &UpdatesRelValue<'_>,
11921192
memory: &mut Option<(F::QueryUpdate, u64, usize)>,
11931193
metrics: &mut ExecutionMetrics,

crates/core/src/subscription/subscription.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::sql::ast::SchemaViewer;
3131
use crate::vm::{build_query, TxMode};
3232
use anyhow::Context;
3333
use itertools::Either;
34-
use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat};
34+
use spacetimedb_client_api_messages::websocket::{BuildableWebsocketFormat, Compression};
3535
use spacetimedb_data_structures::map::HashSet;
3636
use spacetimedb_datastore::locking_tx_datastore::TxId;
3737
use spacetimedb_lib::db::auth::{StAccess, StTableType};
@@ -512,7 +512,7 @@ pub struct ExecutionSet {
512512
}
513513

514514
impl ExecutionSet {
515-
pub fn eval<F: WebsocketFormat>(
515+
pub fn eval<F: BuildableWebsocketFormat>(
516516
&self,
517517
db: &RelationalDB,
518518
tx: &Tx,

0 commit comments

Comments
 (0)