Skip to content

Commit c134d82

Browse files
committed
move building and compression out of 'client-api-messages'
1 parent c7bffe6 commit c134d82

File tree

17 files changed

+96
-96
lines changed

17 files changed

+96
-96
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api-messages/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ spacetimedb-sats = { workspace = true, features = ["bytestring"] }
1212

1313
bytes.workspace = true
1414
bytestring.workspace = true
15-
brotli.workspace = true
1615
chrono = { workspace = true, features = ["serde"] }
1716
enum-as-inner.workspace = true
18-
flate2.workspace = true
1917
serde = { workspace = true, features = ["derive"] }
2018
serde_json.workspace = true
2119
serde_with.workspace = true

crates/client-api-messages/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,3 @@ pub mod energy;
44
pub mod http;
55
pub mod name;
66
pub mod websocket;
7-
mod websocket_building;

crates/client-api-messages/src/websocket.rs

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,12 @@ use smallvec::SmallVec;
2626
use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
2727
use spacetimedb_primitives::TableId;
2828
use spacetimedb_sats::{
29-
bsatn,
3029
de::{Deserialize, Error},
3130
impl_deserialize, impl_serialize, impl_st,
3231
ser::Serialize,
3332
AlgebraicType, SpacetimeType,
3433
};
35-
use std::{
36-
io::{self, Read as _},
37-
sync::Arc,
38-
};
39-
40-
pub use crate::websocket_building::*;
34+
use std::sync::Arc;
4135

4236
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
4337
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
@@ -661,22 +655,6 @@ pub enum CompressableQueryUpdate<F: WebsocketFormat> {
661655
Gzip(Bytes),
662656
}
663657

664-
impl CompressableQueryUpdate<BsatnFormat> {
665-
pub fn maybe_decompress(self) -> QueryUpdate<BsatnFormat> {
666-
match self {
667-
Self::Uncompressed(qu) => qu,
668-
Self::Brotli(bytes) => {
669-
let bytes = brotli_decompress(&bytes).unwrap();
670-
bsatn::from_slice(&bytes).unwrap()
671-
}
672-
Self::Gzip(bytes) => {
673-
let bytes = gzip_decompress(&bytes).unwrap();
674-
bsatn::from_slice(&bytes).unwrap()
675-
}
676-
}
677-
}
678-
}
679-
680658
#[derive(SpacetimeType, Debug, Clone)]
681659
#[sats(crate = spacetimedb_lib)]
682660
pub struct QueryUpdate<F: WebsocketFormat> {
@@ -777,18 +755,6 @@ pub enum Compression {
777755
Gzip,
778756
}
779757

780-
pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
781-
let mut decompressed = Vec::new();
782-
brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
783-
Ok(decompressed)
784-
}
785-
786-
pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
787-
let mut decompressed = Vec::new();
788-
let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
789-
Ok(decompressed)
790-
}
791-
792758
pub type RowSize = u16;
793759
pub type RowOffset = u64;
794760

crates/core/src/client/messages.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::{ClientConfig, DataMessage, Protocol};
22
use crate::host::module_host::{EventStatus, ModuleEvent};
33
use crate::host::ArgsTuple;
44
use crate::messages::websocket as ws;
5+
use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress};
56
use bytes::{BufMut, Bytes, BytesMut};
67
use bytestring::ByteString;
78
use derive_more::From;
@@ -148,10 +149,10 @@ pub fn serialize(
148149
});
149150

150151
// Conditionally compress the message.
151-
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
152+
let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) {
152153
Compression::None => buffer.uncompressed(),
153-
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress),
154-
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress),
154+
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress),
155+
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress),
155156
};
156157
(in_use, msg_bytes.into())
157158
}

crates/core/src/host/module_host.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::sql::parser::RowLevelExpr;
1616
use crate::subscription::execute_plan;
1717
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
1818
use crate::subscription::tx::DeltaTx;
19+
use crate::subscription::websocket_building::BuildableWebsocketFormat;
1920
use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread};
2021
use crate::vm::check_row_limit;
2122
use crate::worker_metrics::WORKER_METRICS;
@@ -25,9 +26,7 @@ use derive_more::From;
2526
use indexmap::IndexSet;
2627
use itertools::Itertools;
2728
use prometheus::{Histogram, IntGauge};
28-
use spacetimedb_client_api_messages::websocket::{
29-
BuildableWebsocketFormat, ByteListLen, Compression, OneOffTable, QueryUpdate,
30-
};
29+
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate};
3130
use spacetimedb_data_structures::error_stream::ErrorStream;
3231
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3332
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};

crates/core/src/subscription/execution_unit.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ use crate::error::DBError;
55
use crate::estimation;
66
use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue};
77
use crate::messages::websocket::TableUpdate;
8+
use crate::subscription::websocket_building::BuildableWebsocketFormat;
89
use crate::util::slow::SlowQueryLogger;
910
use crate::vm::{build_query, TxMode};
10-
use spacetimedb_client_api_messages::websocket::{
11-
BuildableWebsocketFormat, Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate,
12-
};
11+
use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate};
1312
use spacetimedb_datastore::locking_tx_datastore::TxId;
1413
use spacetimedb_lib::Identity;
1514
use spacetimedb_primitives::TableId;

crates/core/src/subscription/mod.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
1-
use std::sync::Arc;
2-
1+
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _};
2+
use crate::{error::DBError, worker_metrics::WORKER_METRICS};
33
use anyhow::Result;
44
use module_subscription_manager::Plan;
55
use prometheus::IntCounter;
66
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
77
use spacetimedb_client_api_messages::websocket::{
8-
BuildableWebsocketFormat, ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _,
9-
SingleQueryUpdate, TableUpdate,
8+
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate,
109
};
11-
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
12-
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
13-
use spacetimedb_primitives::TableId;
14-
15-
use crate::{error::DBError, worker_metrics::WORKER_METRICS};
1610
use spacetimedb_datastore::{
1711
db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder,
1812
};
13+
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
14+
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
15+
use spacetimedb_primitives::TableId;
16+
use std::sync::Arc;
1917

2018
pub mod delta;
2119
pub mod execution_unit;
@@ -25,6 +23,7 @@ pub mod query;
2523
#[allow(clippy::module_inception)] // it's right this isn't ideal :/
2624
pub mod subscription;
2725
pub mod tx;
26+
pub mod websocket_building;
2827

2928
#[derive(Debug)]
3029
pub struct ExecutionCounters {

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ use crate::error::DBError;
99
use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue};
1010
use crate::messages::websocket::{self as ws, TableUpdate};
1111
use crate::subscription::delta::eval_delta;
12+
use crate::subscription::websocket_building::BuildableWebsocketFormat;
1213
use crate::worker_metrics::WORKER_METRICS;
1314
use core::mem;
1415
use hashbrown::hash_map::OccupiedError;
1516
use hashbrown::{HashMap, HashSet};
1617
use parking_lot::RwLock;
1718
use prometheus::IntGauge;
1819
use spacetimedb_client_api_messages::websocket::{
19-
BsatnFormat, BuildableWebsocketFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate,
20-
SingleQueryUpdate,
20+
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate,
2121
};
2222
use spacetimedb_data_structures::map::{Entry, IntMap};
2323
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;

crates/core/src/subscription/query.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ mod tests {
159159
use crate::vm::tests::create_table_with_rows;
160160
use crate::vm::DbProgram;
161161
use itertools::Itertools;
162-
use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression};
162+
use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression};
163163
use spacetimedb_datastore::execution_context::Workload;
164164
use spacetimedb_lib::bsatn;
165165
use spacetimedb_lib::db::auth::{StAccess, StTableType};
@@ -353,7 +353,7 @@ mod tests {
353353
total_tables: usize,
354354
rows: &[ProductValue],
355355
) -> ResultTest<()> {
356-
let result = s.eval::<BsatnFormat>(db, tx, None, Compression::Brotli).tables;
356+
let result = s.eval::<BsatnFormat>(db, tx, None, Compression::None).tables;
357357
assert_eq!(
358358
result.len(),
359359
total_tables,
@@ -363,7 +363,10 @@ mod tests {
363363
let result = result
364364
.into_iter()
365365
.flat_map(|x| x.updates)
366-
.map(|x| x.maybe_decompress())
366+
.map(|x| match x {
367+
CompressableQueryUpdate::Uncompressed(x) => x,
368+
_ => unreachable!(),
369+
})
367370
.flat_map(|x| {
368371
(&x.deletes)
369372
.into_iter()

0 commit comments

Comments
 (0)