Skip to content

Commit 9aebe31

Browse files
Filter out dropped clients in the send worker (#2899)
1 parent a7293a3 commit 9aebe31

File tree

2 files changed

+27
-7
lines changed

2 files changed

+27
-7
lines changed

crates/core/src/client/client_connection.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::VecDeque;
22
use std::ops::Deref;
3+
use std::sync::atomic::Ordering;
34
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
45
use std::sync::Arc;
56
use std::time::Instant;
@@ -158,6 +159,10 @@ impl ClientConnectionSender {
158159
Self::dummy_with_channel(id, config).0
159160
}
160161

162+
pub fn is_cancelled(&self) -> bool {
163+
self.cancelled.load(Ordering::Relaxed)
164+
}
165+
161166
/// Send a message to the client. For data-related messages, you should probably use
162167
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
163168
pub fn send_message(&self, message: impl Into<SerializableMessage>) -> Result<(), ClientSendError> {
@@ -175,7 +180,7 @@ impl ClientConnectionSender {
175180
// the channel, so forcibly kick the client
176181
tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
177182
self.abort_handle.abort();
178-
self.cancelled.store(true, Relaxed);
183+
self.cancelled.store(true, Ordering::Relaxed);
179184
return Err(ClientSendError::Cancelled);
180185
}
181186
Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,12 +1209,7 @@ impl SubscriptionManager {
12091209
SingleQueryUpdate { update, num_rows }
12101210
}
12111211

1212-
// filter out clients that've dropped
1213-
let clients_for_query = qstate.all_clients().filter(|id| {
1214-
self.clients
1215-
.get(*id)
1216-
.is_some_and(|info| !info.dropped.load(Ordering::Acquire))
1217-
});
1212+
let clients_for_query = qstate.all_clients();
12181213

12191214
match eval_delta(tx, &mut acc.metrics, plan) {
12201215
Err(err) => {
@@ -1293,6 +1288,16 @@ struct SendWorkerClient {
12931288
outbound_ref: Client,
12941289
}
12951290

1291+
impl SendWorkerClient {
1292+
fn is_dropped(&self) -> bool {
1293+
self.dropped.load(Ordering::Relaxed)
1294+
}
1295+
1296+
fn is_cancelled(&self) -> bool {
1297+
self.outbound_ref.is_cancelled()
1298+
}
1299+
}
1300+
12961301
/// Asynchronous background worker which aggregates each of the clients' updates from a [`ComputedQueries`]
12971302
/// into `DbUpdate`s and then sends them to the clients' WebSocket workers.
12981303
///
@@ -1339,6 +1344,14 @@ impl Drop for SendWorker {
13391344
}
13401345
}
13411346

1347+
impl SendWorker {
1348+
fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
1349+
self.clients
1350+
.get(client_id)
1351+
.is_some_and(|client| client.is_cancelled() || client.is_dropped())
1352+
}
1353+
}
1354+
13421355
#[derive(Debug, Clone)]
13431356
pub struct BroadcastQueue(SenderWithGauge<SendWorkerMessage>);
13441357

@@ -1451,6 +1464,8 @@ impl SendWorker {
14511464
// or BSATN (`Protocol::Binary`).
14521465
let mut client_table_id_updates = updates
14531466
.into_iter()
1467+
// Filter out dropped or cancelled clients
1468+
.filter(|upd| !self.is_client_dropped_or_cancelled(&upd.id))
14541469
// Filter out clients whose subscriptions failed
14551470
.filter(|upd| !clients_with_errors.contains(&upd.id))
14561471
// Do the aggregation.

0 commit comments

Comments
 (0)