Skip to content

Commit abe31f8

Browse files
committed
split WebsocketFormat, adding BuildableWebsocketFormat
1 parent b45b193 commit abe31f8

File tree

6 files changed

+31
-25
lines changed

6 files changed

+31
-25
lines changed

crates/client-api-messages/src/websocket.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ pub trait WebsocketFormat: Sized {
9898
+ Clone
9999
+ Default;
100100

101+
/// The type used to encode query updates.
102+
/// This type exists so that some formats, e.g., BSATN, can compress an update.
103+
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
104+
}
105+
106+
pub trait BuildableWebsocketFormat: WebsocketFormat {
101107
/// The builder for [`Self::List`].
102108
type ListBuilder: RowListBuilder<FinishedList = Self::List>;
103109

@@ -112,10 +118,6 @@ pub trait WebsocketFormat: Sized {
112118
(list.finish(), num_rows)
113119
}
114120

115-
/// The type used to encode query updates.
116-
/// This type exists so that some formats, e.g., BSATN, can compress an update.
117-
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
118-
119121
/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
120122
/// This allows some formats to e.g., compress the update.
121123
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
@@ -779,11 +781,12 @@ pub struct JsonFormat;
779781

780782
impl WebsocketFormat for JsonFormat {
781783
type Single = ByteString;
782-
783784
type List = Vec<ByteString>;
784-
type ListBuilder = Self::List;
785-
786785
type QueryUpdate = QueryUpdate<Self>;
786+
}
787+
788+
impl BuildableWebsocketFormat for JsonFormat {
789+
type ListBuilder = Self::List;
787790

788791
fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
789792
qu
@@ -807,11 +810,12 @@ pub struct BsatnFormat;
807810

808811
impl WebsocketFormat for BsatnFormat {
809812
type Single = Box<[u8]>;
810-
811813
type List = BsatnRowList;
812-
type ListBuilder = BsatnRowListBuilder;
813-
814814
type QueryUpdate = CompressableQueryUpdate<Self>;
815+
}
816+
817+
impl BuildableWebsocketFormat for BsatnFormat {
818+
type ListBuilder = BsatnRowListBuilder;
815819

816820
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
817821
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();

crates/core/src/host/module_host.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ use derive_more::From;
2828
use indexmap::IndexSet;
2929
use itertools::Itertools;
3030
use prometheus::{Histogram, IntGauge};
31-
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
31+
use spacetimedb_client_api_messages::websocket::{
32+
BuildableWebsocketFormat, ByteListLen, Compression, OneOffTable, QueryUpdate,
33+
};
3234
use spacetimedb_data_structures::error_stream::ErrorStream;
3335
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3436
use spacetimedb_execution::pipelined::PipelinedProject;
@@ -134,7 +136,7 @@ impl UpdatesRelValue<'_> {
134136
!(self.deletes.is_empty() && self.inserts.is_empty())
135137
}
136138

137-
pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
139+
pub fn encode<F: BuildableWebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
138140
let (deletes, nr_del) = F::encode_list(self.deletes.iter());
139141
let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
140142
let num_rows = nr_del + nr_ins;
@@ -1026,7 +1028,7 @@ impl ModuleHost {
10261028
/// This only returns an error if there is a db-level problem.
10271029
/// An error with the query itself will be sent to the client.
10281030
#[tracing::instrument(level = "trace", skip_all)]
1029-
pub async fn one_off_query<F: WebsocketFormat>(
1031+
pub async fn one_off_query<F: BuildableWebsocketFormat>(
10301032
&self,
10311033
caller_identity: Identity,
10321034
query: String,

crates/core/src/subscription/execution_unit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::messages::websocket::TableUpdate;
99
use crate::util::slow::SlowQueryLogger;
1010
use crate::vm::{build_query, TxMode};
1111
use spacetimedb_client_api_messages::websocket::{
12-
Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
12+
BuildableWebsocketFormat, Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate,
1313
};
1414
use spacetimedb_lib::Identity;
1515
use spacetimedb_primitives::TableId;
@@ -236,7 +236,7 @@ impl ExecutionUnit {
236236

237237
/// Evaluate this execution unit against the database using the specified format.
238238
#[tracing::instrument(level = "trace", skip_all)]
239-
pub fn eval<F: WebsocketFormat>(
239+
pub fn eval<F: BuildableWebsocketFormat>(
240240
&self,
241241
db: &RelationalDB,
242242
tx: &Tx,

crates/core/src/subscription/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use module_subscription_manager::Plan;
55
use prometheus::IntCounter;
66
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
77
use spacetimedb_client_api_messages::websocket::{
8-
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _, SingleQueryUpdate, TableUpdate,
9-
WebsocketFormat,
8+
BuildableWebsocketFormat, ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _,
9+
SingleQueryUpdate, TableUpdate,
1010
};
1111
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1212
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
@@ -98,7 +98,7 @@ impl MetricsRecorder for ExecutionCounters {
9898
pub fn execute_plan<Tx, F>(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)>
9999
where
100100
Tx: Datastore + DeltaStore,
101-
F: WebsocketFormat,
101+
F: BuildableWebsocketFormat,
102102
{
103103
let mut count = 0;
104104
let mut list = F::ListBuilder::default();
@@ -136,7 +136,7 @@ pub fn collect_table_update<Tx, F>(
136136
) -> Result<(TableUpdate<F>, ExecutionMetrics)>
137137
where
138138
Tx: Datastore + DeltaStore,
139-
F: WebsocketFormat,
139+
F: BuildableWebsocketFormat,
140140
{
141141
execute_plan::<Tx, F>(plan_fragments, tx).map(|(rows, num_rows, metrics)| {
142142
let empty = F::List::default();
@@ -169,7 +169,7 @@ pub fn execute_plans<Tx, F>(
169169
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
170170
where
171171
Tx: Datastore + DeltaStore + Sync,
172-
F: WebsocketFormat,
172+
F: BuildableWebsocketFormat,
173173
{
174174
plans
175175
.par_iter()

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use hashbrown::{HashMap, HashSet};
1717
use parking_lot::RwLock;
1818
use prometheus::IntGauge;
1919
use spacetimedb_client_api_messages::websocket::{
20-
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate,
21-
WebsocketFormat,
20+
BsatnFormat, BuildableWebsocketFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate,
21+
SingleQueryUpdate,
2222
};
2323
use spacetimedb_data_structures::map::{Entry, IntMap};
2424
use spacetimedb_lib::metrics::ExecutionMetrics;
@@ -1187,7 +1187,7 @@ impl SubscriptionManager {
11871187
let mut ops_bin_uncompressed: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
11881188
let mut ops_json: Option<(QueryUpdate<JsonFormat>, _, _)> = None;
11891189

1190-
fn memo_encode<F: WebsocketFormat>(
1190+
fn memo_encode<F: BuildableWebsocketFormat>(
11911191
updates: &UpdatesRelValue<'_>,
11921192
memory: &mut Option<(F::QueryUpdate, u64, usize)>,
11931193
metrics: &mut ExecutionMetrics,

crates/core/src/subscription/subscription.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::sql::ast::SchemaViewer;
3232
use crate::vm::{build_query, TxMode};
3333
use anyhow::Context;
3434
use itertools::Either;
35-
use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat};
35+
use spacetimedb_client_api_messages::websocket::{BuildableWebsocketFormat, Compression};
3636
use spacetimedb_data_structures::map::HashSet;
3737
use spacetimedb_lib::db::auth::{StAccess, StTableType};
3838
use spacetimedb_lib::identity::AuthCtx;
@@ -512,7 +512,7 @@ pub struct ExecutionSet {
512512
}
513513

514514
impl ExecutionSet {
515-
pub fn eval<F: WebsocketFormat>(
515+
pub fn eval<F: BuildableWebsocketFormat>(
516516
&self,
517517
db: &RelationalDB,
518518
tx: &Tx,

0 commit comments

Comments
 (0)