Skip to content

Commit 6271604

Browse files
committed
move building and compression out of 'client-api-messages'
1 parent edb1319 commit 6271604

File tree

17 files changed

+98
-98
lines changed

17 files changed

+98
-98
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api-messages/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ spacetimedb-sats = { workspace = true, features = ["bytestring"] }
1212

1313
bytes.workspace = true
1414
bytestring.workspace = true
15-
brotli.workspace = true
1615
chrono = { workspace = true, features = ["serde"] }
1716
enum-as-inner.workspace = true
18-
flate2.workspace = true
1917
serde = { workspace = true, features = ["derive"] }
2018
serde_json.workspace = true
2119
serde_with.workspace = true

crates/client-api-messages/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,3 @@ pub mod energy;
44
pub mod http;
55
pub mod name;
66
pub mod websocket;
7-
mod websocket_building;

crates/client-api-messages/src/websocket.rs

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,12 @@ use smallvec::SmallVec;
2626
use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
2727
use spacetimedb_primitives::TableId;
2828
use spacetimedb_sats::{
29-
bsatn,
3029
de::{Deserialize, Error},
3130
impl_deserialize, impl_serialize, impl_st,
3231
ser::Serialize,
3332
AlgebraicType, SpacetimeType,
3433
};
35-
use std::{
36-
io::{self, Read as _},
37-
sync::Arc,
38-
};
39-
40-
pub use crate::websocket_building::*;
34+
use std::sync::Arc;
4135

4236
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
4337
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
@@ -661,22 +655,6 @@ pub enum CompressableQueryUpdate<F: WebsocketFormat> {
661655
Gzip(Bytes),
662656
}
663657

664-
impl CompressableQueryUpdate<BsatnFormat> {
665-
pub fn maybe_decompress(self) -> QueryUpdate<BsatnFormat> {
666-
match self {
667-
Self::Uncompressed(qu) => qu,
668-
Self::Brotli(bytes) => {
669-
let bytes = brotli_decompress(&bytes).unwrap();
670-
bsatn::from_slice(&bytes).unwrap()
671-
}
672-
Self::Gzip(bytes) => {
673-
let bytes = gzip_decompress(&bytes).unwrap();
674-
bsatn::from_slice(&bytes).unwrap()
675-
}
676-
}
677-
}
678-
}
679-
680658
#[derive(SpacetimeType, Debug, Clone)]
681659
#[sats(crate = spacetimedb_lib)]
682660
pub struct QueryUpdate<F: WebsocketFormat> {
@@ -777,18 +755,6 @@ pub enum Compression {
777755
Gzip,
778756
}
779757

780-
pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
781-
let mut decompressed = Vec::new();
782-
brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
783-
Ok(decompressed)
784-
}
785-
786-
pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
787-
let mut decompressed = Vec::new();
788-
let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
789-
Ok(decompressed)
790-
}
791-
792758
pub type RowSize = u16;
793759
pub type RowOffset = u64;
794760

crates/core/src/client/messages.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::execution_context::WorkloadType;
33
use crate::host::module_host::{EventStatus, ModuleEvent};
44
use crate::host::ArgsTuple;
55
use crate::messages::websocket as ws;
6+
use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress};
67
use bytes::{BufMut, Bytes, BytesMut};
78
use bytestring::ByteString;
89
use derive_more::From;
@@ -148,10 +149,10 @@ pub fn serialize(
148149
});
149150

150151
// Conditionally compress the message.
151-
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
152+
let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) {
152153
Compression::None => buffer.uncompressed(),
153-
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress),
154-
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress),
154+
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress),
155+
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress),
155156
};
156157
(in_use, msg_bytes.into())
157158
}

crates/core/src/host/module_host.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::sql::parser::RowLevelExpr;
1818
use crate::subscription::execute_plan;
1919
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
2020
use crate::subscription::tx::DeltaTx;
21+
use crate::subscription::websocket_building::BuildableWebsocketFormat;
2122
use crate::util::asyncify;
2223
use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread};
2324
use crate::vm::check_row_limit;
@@ -28,9 +29,7 @@ use derive_more::From;
2829
use indexmap::IndexSet;
2930
use itertools::Itertools;
3031
use prometheus::{Histogram, IntGauge};
31-
use spacetimedb_client_api_messages::websocket::{
32-
BuildableWebsocketFormat, ByteListLen, Compression, OneOffTable, QueryUpdate,
33-
};
32+
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate};
3433
use spacetimedb_data_structures::error_stream::ErrorStream;
3534
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3635
use spacetimedb_execution::pipelined::PipelinedProject;

crates/core/src/subscription/execution_unit.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ use crate::error::DBError;
66
use crate::estimation;
77
use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue};
88
use crate::messages::websocket::TableUpdate;
9+
use crate::subscription::websocket_building::BuildableWebsocketFormat;
910
use crate::util::slow::SlowQueryLogger;
1011
use crate::vm::{build_query, TxMode};
11-
use spacetimedb_client_api_messages::websocket::{
12-
BuildableWebsocketFormat, Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate,
13-
};
12+
use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate};
1413
use spacetimedb_lib::Identity;
1514
use spacetimedb_primitives::TableId;
1615
use spacetimedb_sats::{u256, ProductValue};

crates/core/src/subscription/mod.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
1-
use std::sync::Arc;
2-
1+
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _};
2+
use crate::{
3+
db::{datastore::locking_tx_datastore::datastore::MetricsRecorder, db_metrics::DB_METRICS},
4+
error::DBError,
5+
execution_context::WorkloadType,
6+
worker_metrics::WORKER_METRICS,
7+
};
38
use anyhow::Result;
49
use module_subscription_manager::Plan;
510
use prometheus::IntCounter;
611
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
712
use spacetimedb_client_api_messages::websocket::{
8-
BuildableWebsocketFormat, ByteListLen, Compression, DatabaseUpdate, QueryUpdate, RowListBuilder as _,
9-
SingleQueryUpdate, TableUpdate,
13+
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate,
1014
};
1115
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1216
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
1317
use spacetimedb_primitives::TableId;
14-
15-
use crate::{
16-
db::{datastore::locking_tx_datastore::datastore::MetricsRecorder, db_metrics::DB_METRICS},
17-
error::DBError,
18-
execution_context::WorkloadType,
19-
worker_metrics::WORKER_METRICS,
20-
};
18+
use std::sync::Arc;
2119

2220
pub mod delta;
2321
pub mod execution_unit;
@@ -27,6 +25,7 @@ pub mod query;
2725
#[allow(clippy::module_inception)] // it's right this isn't ideal :/
2826
pub mod subscription;
2927
pub mod tx;
28+
pub mod websocket_building;
3029

3130
#[derive(Debug)]
3231
pub struct ExecutionCounters {

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ use crate::error::DBError;
1010
use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue};
1111
use crate::messages::websocket::{self as ws, TableUpdate};
1212
use crate::subscription::delta::eval_delta;
13+
use crate::subscription::websocket_building::BuildableWebsocketFormat;
1314
use crate::worker_metrics::WORKER_METRICS;
1415
use core::mem;
1516
use hashbrown::hash_map::OccupiedError;
1617
use hashbrown::{HashMap, HashSet};
1718
use parking_lot::RwLock;
1819
use prometheus::IntGauge;
1920
use spacetimedb_client_api_messages::websocket::{
20-
BsatnFormat, BuildableWebsocketFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate,
21-
SingleQueryUpdate,
21+
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate,
2222
};
2323
use spacetimedb_data_structures::map::{Entry, IntMap};
2424
use spacetimedb_lib::metrics::ExecutionMetrics;

crates/core/src/subscription/query.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ mod tests {
160160
use crate::vm::tests::create_table_with_rows;
161161
use crate::vm::DbProgram;
162162
use itertools::Itertools;
163-
use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression};
163+
use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression};
164164
use spacetimedb_lib::bsatn;
165165
use spacetimedb_lib::db::auth::{StAccess, StTableType};
166166
use spacetimedb_lib::error::ResultTest;
@@ -353,7 +353,7 @@ mod tests {
353353
total_tables: usize,
354354
rows: &[ProductValue],
355355
) -> ResultTest<()> {
356-
let result = s.eval::<BsatnFormat>(db, tx, None, Compression::Brotli).tables;
356+
let result = s.eval::<BsatnFormat>(db, tx, None, Compression::None).tables;
357357
assert_eq!(
358358
result.len(),
359359
total_tables,
@@ -363,7 +363,10 @@ mod tests {
363363
let result = result
364364
.into_iter()
365365
.flat_map(|x| x.updates)
366-
.map(|x| x.maybe_decompress())
366+
.map(|x| match x {
367+
CompressableQueryUpdate::Uncompressed(x) => x,
368+
_ => unreachable!(),
369+
})
367370
.flat_map(|x| {
368371
(&x.deletes)
369372
.into_iter()

0 commit comments

Comments
 (0)