Skip to content

Commit c99dc82

Browse files
gefjonjsdt
andauthored
Remove the lock around the ClientsMap in the SubscriptionManager (#2821)
Co-authored-by: Jeffrey Dallatezza <jeffreydallatezza@gmail.com>
1 parent 967e82a commit c99dc82

File tree

5 files changed

+443
-216
lines changed

5 files changed

+443
-216
lines changed

crates/client-api-messages/src/websocket.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ pub struct TableUpdate<F: WebsocketFormat> {
623623
}
624624

625625
/// Computed update for a single query, annotated with the number of matching rows.
626+
#[derive(Debug)]
626627
pub struct SingleQueryUpdate<F: WebsocketFormat> {
627628
pub update: F::QueryUpdate,
628629
pub num_rows: u64,

crates/core/src/client/client_connection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ impl ClientConnectionSender {
156156
Self::dummy_with_channel(id, config).0
157157
}
158158

159+
/// Send a message to the client. For data-related messages, you should probably use
160+
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
159161
pub fn send_message(&self, message: impl Into<SerializableMessage>) -> Result<(), ClientSendError> {
160162
self.send(message.into())
161163
}

crates/core/src/host/host_controller.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::messages::control_db::{Database, HostType};
1313
use crate::module_host_context::ModuleCreationContext;
1414
use crate::replica_context::ReplicaContext;
1515
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
16-
use crate::subscription::module_subscription_manager::SubscriptionManager;
16+
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
1717
use crate::util::asyncify;
1818
use crate::util::jobs::{JobCore, JobCores};
1919
use crate::worker_metrics::WORKER_METRICS;
@@ -545,11 +545,17 @@ async fn make_replica_ctx(
545545
relational_db: Arc<RelationalDB>,
546546
) -> anyhow::Result<ReplicaContext> {
547547
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
548-
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::for_database(
549-
database.database_identity,
548+
let send_worker_queue = spawn_send_worker(Some(database.database_identity));
549+
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
550+
send_worker_queue.clone(),
550551
)));
551552
let downgraded = Arc::downgrade(&subscriptions);
552-
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);
553+
let subscriptions = ModuleSubscriptions::new(
554+
relational_db.clone(),
555+
subscriptions,
556+
send_worker_queue,
557+
database.owner_identity,
558+
);
553559

554560
// If an error occurs when evaluating a subscription,
555561
// we mark each client that was affected,

0 commit comments

Comments
 (0)