Skip to content

Commit a7293a3

Browse files
authored
(perf) send_one_computed_queries: reuse aggregation HashMaps (#2909)
1 parent 0c36351 commit a7293a3

File tree

1 file changed

+61
-44
lines changed

1 file changed

+61
-44
lines changed

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue
1111
use crate::messages::websocket::{self as ws, TableUpdate};
1212
use crate::subscription::delta::eval_delta;
1313
use crate::worker_metrics::WORKER_METRICS;
14+
use core::mem;
1415
use hashbrown::hash_map::OccupiedError;
1516
use hashbrown::{HashMap, HashSet};
1617
use parking_lot::RwLock;
@@ -1318,6 +1319,14 @@ struct SendWorker {
13181319
///
13191320
/// If `Some`, this type's `drop` method will do `remove_label_values` to clean up the metric on exit.
13201321
database_identity_to_clean_up_metric: Option<Identity>,
1322+
1323+
/// The `(client, table) -> table update` aggregation map (re)used by [`SendWorker::send_one_computed_queries`]
1324+
/// to avoid creating new allocations.
1325+
table_updates_client_id_table_id: HashMap<(ClientId, TableId), SwitchedTableUpdate>,
1326+
1327+
/// The `client -> database update` aggregation map (re)used by [`SendWorker::send_one_computed_queries`]
1328+
/// to avoid creating new allocations.
1329+
table_updates_client_id: HashMap<ClientId, SwitchedDbUpdate>,
13211330
}
13221331

13231332
impl Drop for SendWorker {
@@ -1369,6 +1378,8 @@ impl SendWorker {
13691378
queue_length_metric,
13701379
clients: Default::default(),
13711380
database_identity_to_clean_up_metric,
1381+
table_updates_client_id_table_id: <_>::default(),
1382+
table_updates_client_id: <_>::default(),
13721383
}
13731384
}
13741385

@@ -1415,7 +1426,7 @@ impl SendWorker {
14151426
}
14161427

14171428
fn send_one_computed_queries(
1418-
&self,
1429+
&mut self,
14191430
ComputedQueries {
14201431
updates,
14211432
errs,
@@ -1429,50 +1440,52 @@ impl SendWorker {
14291440

14301441
let span = tracing::info_span!("eval_incr_group_messages_by_client");
14311442

1432-
let mut eval = updates
1443+
// Reuse the aggregation maps from the worker.
1444+
let client_table_id_updates = mem::take(&mut self.table_updates_client_id_table_id);
1445+
let client_id_updates = mem::take(&mut self.table_updates_client_id);
1446+
1447+
// For each subscriber, aggregate all the updates for the same table.
1448+
// That is, we build a map `(subscriber_id, table_id) -> updates`.
1449+
// A particular subscriber uses only one format,
1450+
// so their `TableUpdate` will contain either JSON (`Protocol::Text`)
1451+
// or BSATN (`Protocol::Binary`).
1452+
let mut client_table_id_updates = updates
14331453
.into_iter()
14341454
// Filter out clients whose subscriptions failed
14351455
.filter(|upd| !clients_with_errors.contains(&upd.id))
1436-
// For each subscriber, aggregate all the updates for the same table.
1437-
// That is, we build a map `(subscriber_id, table_id) -> updates`.
1438-
// A particular subscriber uses only one format,
1439-
// so their `TableUpdate` will contain either JSON (`Protocol::Text`)
1440-
// or BSATN (`Protocol::Binary`).
1441-
.fold(
1442-
HashMap::<(ClientId, TableId), SwitchedTableUpdate>::new(),
1443-
|mut tables, upd| {
1444-
match tables.entry((upd.id, upd.table_id)) {
1445-
Entry::Occupied(mut entry) => match entry.get_mut().zip_mut(upd.update) {
1446-
Bsatn((tbl_upd, update)) => tbl_upd.push(update),
1447-
Json((tbl_upd, update)) => tbl_upd.push(update),
1448-
},
1449-
Entry::Vacant(entry) => drop(entry.insert(match upd.update {
1450-
Bsatn(update) => Bsatn(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
1451-
Json(update) => Json(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
1452-
})),
1453-
}
1454-
tables
1455-
},
1456-
)
1457-
.into_iter()
1458-
// Each client receives a single list of updates per transaction.
1459-
// So before sending the updates to each client,
1460-
// we must stitch together the `TableUpdate*`s into an aggregated list.
1461-
.fold(
1462-
HashMap::<ClientId, SwitchedDbUpdate>::new(),
1463-
|mut updates, ((id, _), update)| {
1464-
let entry = updates.entry(id);
1465-
let entry = entry.or_insert_with(|| match &update {
1466-
Bsatn(_) => Bsatn(<_>::default()),
1467-
Json(_) => Json(<_>::default()),
1468-
});
1469-
match entry.zip_mut(update) {
1470-
Bsatn((list, elem)) => list.tables.push(elem),
1471-
Json((list, elem)) => list.tables.push(elem),
1472-
}
1473-
updates
1474-
},
1475-
);
1456+
// Do the aggregation.
1457+
.fold(client_table_id_updates, |mut tables, upd| {
1458+
match tables.entry((upd.id, upd.table_id)) {
1459+
Entry::Occupied(mut entry) => match entry.get_mut().zip_mut(upd.update) {
1460+
Bsatn((tbl_upd, update)) => tbl_upd.push(update),
1461+
Json((tbl_upd, update)) => tbl_upd.push(update),
1462+
},
1463+
Entry::Vacant(entry) => drop(entry.insert(match upd.update {
1464+
Bsatn(update) => Bsatn(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
1465+
Json(update) => Json(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
1466+
})),
1467+
}
1468+
tables
1469+
});
1470+
1471+
// Each client receives a single list of updates per transaction.
1472+
// So before sending the updates to each client,
1473+
// we must stitch together the `TableUpdate*`s into an aggregated list.
1474+
let mut client_id_updates = client_table_id_updates
1475+
.drain()
1476+
// Do the aggregation.
1477+
.fold(client_id_updates, |mut updates, ((id, _), update)| {
1478+
let entry = updates.entry(id);
1479+
let entry = entry.or_insert_with(|| match &update {
1480+
Bsatn(_) => Bsatn(<_>::default()),
1481+
Json(_) => Json(<_>::default()),
1482+
});
1483+
match entry.zip_mut(update) {
1484+
Bsatn((list, elem)) => list.tables.push(elem),
1485+
Json((list, elem)) => list.tables.push(elem),
1486+
}
1487+
updates
1488+
});
14761489

14771490
drop(clients_with_errors);
14781491
drop(span);
@@ -1487,7 +1500,7 @@ impl SendWorker {
14871500
// That is, in the case of the caller, we don't respect the light setting.
14881501
if let Some(caller) = caller {
14891502
let caller_id = (caller.id.identity, caller.id.connection_id);
1490-
let database_update = eval
1503+
let database_update = client_id_updates
14911504
.remove(&caller_id)
14921505
.map(|update| SubscriptionUpdateMessage::from_event_and_update(&event, update))
14931506
.unwrap_or_else(|| {
@@ -1501,7 +1514,7 @@ impl SendWorker {
15011514
}
15021515

15031516
// Send all the other updates.
1504-
for (id, update) in eval {
1517+
for (id, update) in client_id_updates.drain() {
15051518
let database_update = SubscriptionUpdateMessage::from_event_and_update(&event, update);
15061519
let client = self.clients[&id].outbound_ref.clone();
15071520
// Conditionally send out a full update or a light one otherwise.
@@ -1510,6 +1523,10 @@ impl SendWorker {
15101523
send_to_client(&client, message);
15111524
}
15121525

1526+
// Put the (now drained) aggregation maps back into the worker so the next call can reuse them.
1527+
self.table_updates_client_id_table_id = client_table_id_updates;
1528+
self.table_updates_client_id = client_id_updates;
1529+
15131530
// Send error messages and mark clients for removal
15141531
for (id, message) in errs {
15151532
if let Some(client) = self.clients.get(&id) {

0 commit comments

Comments
 (0)