Skip to content

Commit 718712a

Browse files
Record the size of a client's outgoing message queue on drop (#2877)
1 parent b63df78 commit 718712a

File tree

4 files changed

+135
-39
lines changed

4 files changed

+135
-39
lines changed

crates/client-api/src/routes/subscribe.rs

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::VecDeque;
21
use std::mem;
32
use std::pin::{pin, Pin};
43
use std::time::Duration;
@@ -15,7 +14,10 @@ use http::{HeaderValue, StatusCode};
1514
use scopeguard::ScopeGuard;
1615
use serde::Deserialize;
1716
use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer};
18-
use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, Protocol};
17+
use spacetimedb::client::{
18+
ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, MeteredDeque, MeteredReceiver,
19+
Protocol,
20+
};
1921
use spacetimedb::execution_context::WorkloadType;
2022
use spacetimedb::host::module_host::ClientConnectedError;
2123
use spacetimedb::host::NoSuchModule;
@@ -25,7 +27,6 @@ use spacetimedb::Identity;
2527
use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
2628
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
2729
use std::time::Instant;
28-
use tokio::sync::mpsc;
2930
use tokio_tungstenite::tungstenite::Utf8Bytes;
3031

3132
use crate::auth::SpacetimeAuth;
@@ -182,7 +183,7 @@ where
182183

183184
const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
184185

185-
async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: mpsc::Receiver<SerializableMessage>) {
186+
async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: MeteredReceiver<SerializableMessage>) {
186187
// ensure that even if this task gets cancelled, we always clean up the connection
187188
let mut client = scopeguard::guard(client, |client| {
188189
tokio::spawn(client.disconnect());
@@ -204,11 +205,13 @@ async fn make_progress<Fut: Future>(fut: &mut Pin<&mut MaybeDone<Fut>>) {
204205
async fn ws_client_actor_inner(
205206
client: &mut ClientConnection,
206207
mut ws: WebSocketStream,
207-
mut sendrx: mpsc::Receiver<SerializableMessage>,
208+
mut sendrx: MeteredReceiver<SerializableMessage>,
208209
) {
209210
let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
210211
let mut got_pong = true;
211212

213+
let addr = client.module.info().database_identity;
214+
212215
// Build a queue of incoming messages to handle, to be processed one at a time,
213216
// in the order they're received.
214217
//
@@ -222,32 +225,14 @@ async fn ws_client_actor_inner(
222225
// `select!` for examples of how to do this.
223226
//
224227
// TODO: do we want this to have a fixed capacity, or should it be unbounded?
225-
let mut message_queue = VecDeque::<(DataMessage, Instant)>::new();
228+
let mut message_queue = MeteredDeque::<(DataMessage, Instant)>::new(
229+
WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr),
230+
);
226231
let mut current_message = pin!(MaybeDone::Gone);
227232

228233
let mut closed = false;
229234
let mut rx_buf = Vec::new();
230235

231-
let addr = client.module.info().database_identity;
232-
233-
// Grab handles on the total incoming and outgoing queue length metrics,
234-
// which we'll increment and decrement as we push into and pull out of those queues.
235-
// Note that `total_outgoing_queue_length` is incremented separately,
236-
// by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
237-
// we're only responsible for decrementing that one.
238-
// Also note that much care must be taken to clean up these metrics when the connection closes!
239-
// Any path which exits this function must decrement each of these metrics
240-
// by the number of messages still waiting in this client's queue,
241-
// or else they will grow without bound as clients disconnect, and be useless.
242-
let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr);
243-
let outgoing_queue_length_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&addr);
244-
245-
let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>,
246-
sendrx: &mpsc::Receiver<SerializableMessage>| {
247-
incoming_queue_length_metric.sub(message_queue.len() as _);
248-
outgoing_queue_length_metric.sub(sendrx.len() as _);
249-
};
250-
251236
let mut msg_buffer = SerializeBuffer::new(client.config);
252237
loop {
253238
rx_buf.clear();
@@ -257,7 +242,6 @@ async fn ws_client_actor_inner(
257242
}
258243
if let MaybeDone::Gone = *current_message {
259244
if let Some((message, timer)) = message_queue.pop_front() {
260-
incoming_queue_length_metric.dec();
261245
let client = client.clone();
262246
let fut = async move { client.handle_message(message, timer).await };
263247
current_message.set(MaybeDone::Future(fut));
@@ -286,15 +270,13 @@ async fn ws_client_actor_inner(
286270
}
287271
// the client sent us a close frame
288272
None => {
289-
clean_up_metrics(&message_queue, &sendrx);
290273
break
291274
},
292275
},
293276

294277
// If we have an outgoing message to send, send it off.
295278
// No incoming `message` to handle, so `continue`.
296279
Some(n) = sendrx.recv_many(&mut rx_buf, 32).map(|n| (n != 0).then_some(n)) => {
297-
outgoing_queue_length_metric.sub(n as _);
298280
if closed {
299281
// TODO: this isn't great. when we receive a close request from the peer,
300282
// tungstenite doesn't let us send any new messages on the socket,
@@ -379,7 +361,6 @@ async fn ws_client_actor_inner(
379361
} else {
380362
// the client never responded to our ping; drop them without trying to send them a Close
381363
log::warn!("client {} timed out", client.id);
382-
clean_up_metrics(&message_queue, &sendrx);
383364
break;
384365
}
385366
}
@@ -394,7 +375,6 @@ async fn ws_client_actor_inner(
394375
match message {
395376
Item::Message(ClientMessage::Message(message)) => {
396377
let timer = Instant::now();
397-
incoming_queue_length_metric.inc();
398378
message_queue.push_back((message, timer))
399379
}
400380
Item::HandleResult(res) => {

crates/core/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ mod message_handlers;
77
pub mod messages;
88

99
pub use client_connection::{
10-
ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, Protocol,
10+
ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, MeteredDeque,
11+
MeteredReceiver, Protocol,
1112
};
1213
pub use client_connection_index::ClientActorIndex;
1314
pub use message_handlers::MessageHandleError;

crates/core/src/client/client_connection.rs

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::VecDeque;
12
use std::ops::Deref;
23
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
34
use std::sync::Arc;
@@ -132,14 +133,15 @@ pub enum ClientSendError {
132133
}
133134

134135
impl ClientConnectionSender {
135-
pub fn dummy_with_channel(id: ClientActorId, config: ClientConfig) -> (Self, mpsc::Receiver<SerializableMessage>) {
136+
pub fn dummy_with_channel(id: ClientActorId, config: ClientConfig) -> (Self, MeteredReceiver<SerializableMessage>) {
136137
let (sendtx, rx) = mpsc::channel(1);
137138
// just make something up, it doesn't need to be attached to a real task
138139
let abort_handle = match tokio::runtime::Handle::try_current() {
139140
Ok(h) => h.spawn(async {}).abort_handle(),
140141
Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(),
141142
};
142143

144+
let rx = MeteredReceiver::new(rx);
143145
let cancelled = AtomicBool::new(false);
144146
let sender = Self {
145147
id,
@@ -257,6 +259,116 @@ impl DataMessage {
257259
}
258260
}
259261

262+
/// Wraps a [VecDeque] with a gauge for tracking its size.
263+
/// We subtract its size from the gauge on drop to avoid leaking the metric.
264+
pub struct MeteredDeque<T> {
265+
inner: VecDeque<T>,
266+
gauge: IntGauge,
267+
}
268+
269+
impl<T> MeteredDeque<T> {
270+
pub fn new(gauge: IntGauge) -> Self {
271+
Self {
272+
inner: VecDeque::new(),
273+
gauge,
274+
}
275+
}
276+
277+
pub fn pop_front(&mut self) -> Option<T> {
278+
self.inner.pop_front().inspect(|_| {
279+
self.gauge.dec();
280+
})
281+
}
282+
283+
pub fn pop_back(&mut self) -> Option<T> {
284+
self.inner.pop_back().inspect(|_| {
285+
self.gauge.dec();
286+
})
287+
}
288+
289+
pub fn push_front(&mut self, value: T) {
290+
self.gauge.inc();
291+
self.inner.push_front(value);
292+
}
293+
294+
pub fn push_back(&mut self, value: T) {
295+
self.gauge.inc();
296+
self.inner.push_back(value);
297+
}
298+
299+
pub fn len(&self) -> usize {
300+
self.inner.len()
301+
}
302+
303+
pub fn is_empty(&self) -> bool {
304+
self.inner.is_empty()
305+
}
306+
}
307+
308+
impl<T> Drop for MeteredDeque<T> {
309+
fn drop(&mut self) {
310+
// Record the number of elements still in the deque on drop
311+
self.gauge.sub(self.inner.len() as _);
312+
}
313+
}
314+
315+
/// Wraps the receiving end of a channel with a gauge for tracking the size of the channel.
316+
/// We subtract the size of the channel from the gauge on drop to avoid leaking the metric.
317+
pub struct MeteredReceiver<T> {
318+
inner: mpsc::Receiver<T>,
319+
gauge: Option<IntGauge>,
320+
}
321+
322+
impl<T> MeteredReceiver<T> {
323+
pub fn new(inner: mpsc::Receiver<T>) -> Self {
324+
Self { inner, gauge: None }
325+
}
326+
327+
pub fn with_gauge(inner: mpsc::Receiver<T>, gauge: IntGauge) -> Self {
328+
Self {
329+
inner,
330+
gauge: Some(gauge),
331+
}
332+
}
333+
334+
pub async fn recv(&mut self) -> Option<T> {
335+
self.inner.recv().await.inspect(|_| {
336+
if let Some(gauge) = &self.gauge {
337+
gauge.dec();
338+
}
339+
})
340+
}
341+
342+
pub async fn recv_many(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
343+
let n = self.inner.recv_many(buf, max).await;
344+
if let Some(gauge) = &self.gauge {
345+
gauge.sub(n as _);
346+
}
347+
n
348+
}
349+
350+
pub fn len(&self) -> usize {
351+
self.inner.len()
352+
}
353+
354+
pub fn is_empty(&self) -> bool {
355+
self.inner.is_empty()
356+
}
357+
358+
pub fn close(&mut self) {
359+
self.inner.close();
360+
}
361+
}
362+
363+
impl<T> Drop for MeteredReceiver<T> {
364+
fn drop(&mut self) {
365+
// Record the number of elements still in the channel on drop
366+
if let Some(gauge) = &self.gauge {
367+
gauge.sub(self.inner.len() as _);
368+
}
369+
}
370+
}
371+
260372
// if a client racks up this many messages in the queue without ACK'ing
261373
// anything, we boot 'em.
262374
const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB;
@@ -269,7 +381,7 @@ impl ClientConnection {
269381
config: ClientConfig,
270382
replica_id: u64,
271383
mut module_rx: watch::Receiver<ModuleHost>,
272-
actor: impl FnOnce(ClientConnection, mpsc::Receiver<SerializableMessage>) -> Fut,
384+
actor: impl FnOnce(ClientConnection, MeteredReceiver<SerializableMessage>) -> Fut,
273385
) -> Result<ClientConnection, ClientConnectedError>
274386
where
275387
Fut: Future<Output = ()> + Send + 'static,
@@ -299,6 +411,7 @@ impl ClientConnection {
299411
.abort_handle();
300412

301413
let metrics = ClientConnectionMetrics::new(database_identity, config.protocol);
414+
let sendrx = MeteredReceiver::with_gauge(sendrx, metrics.sendtx_queue_size.clone());
302415

303416
let sender = Arc::new(ClientConnectionSender {
304417
id,

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ mod tests {
932932
SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
933933
SubscriptionUpdateMessage, TransactionUpdateMessage,
934934
};
935-
use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, Protocol};
935+
use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, MeteredReceiver, Protocol};
936936
use crate::db::datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
937937
use crate::db::relational_db::tests_utils::{
938938
begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB,
@@ -964,7 +964,7 @@ mod tests {
964964
use spacetimedb_sats::product;
965965
use std::time::Instant;
966966
use std::{sync::Arc, time::Duration};
967-
use tokio::sync::mpsc::{self, Receiver};
967+
use tokio::sync::mpsc::{self};
968968

969969
fn add_subscriber(db: Arc<RelationalDB>, sql: &str, assert: Option<AssertTxFn>) -> Result<(), DBError> {
970970
// Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel.
@@ -1072,7 +1072,7 @@ mod tests {
10721072
fn client_connection_with_compression(
10731073
client_id: ClientActorId,
10741074
compression: Compression,
1075-
) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
1075+
) -> (Arc<ClientConnectionSender>, MeteredReceiver<SerializableMessage>) {
10761076
let (sender, rx) = ClientConnectionSender::dummy_with_channel(
10771077
client_id,
10781078
ClientConfig {
@@ -1085,7 +1085,9 @@ mod tests {
10851085
}
10861086

10871087
/// Instantiate a client connection
1088-
fn client_connection(client_id: ClientActorId) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
1088+
fn client_connection(
1089+
client_id: ClientActorId,
1090+
) -> (Arc<ClientConnectionSender>, MeteredReceiver<SerializableMessage>) {
10891091
client_connection_with_compression(client_id, Compression::None)
10901092
}
10911093

@@ -1159,7 +1161,7 @@ mod tests {
11591161

11601162
/// Pull a message from receiver and assert that it is a `TxUpdate` with the expected rows
11611163
async fn assert_tx_update_for_table(
1162-
rx: &mut Receiver<SerializableMessage>,
1164+
rx: &mut MeteredReceiver<SerializableMessage>,
11631165
table_id: TableId,
11641166
schema: &ProductType,
11651167
inserts: impl IntoIterator<Item = ProductValue>,

0 commit comments

Comments
 (0)