Skip to content

Commit 666ade7

Browse files
authored
Move locking operations onto the module's thread (#2866)
1 parent 6a9b524 commit 666ade7

File tree

4 files changed

+298
-226
lines changed

4 files changed

+298
-226
lines changed

crates/core/src/client/client_connection.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -530,12 +530,13 @@ impl ClientConnection {
530530
timer: Instant,
531531
) -> Result<Option<ExecutionMetrics>, DBError> {
532532
let me = self.clone();
533-
asyncify(move || {
534-
me.module
535-
.subscriptions()
536-
.add_single_subscription(me.sender, subscription, timer, None)
537-
})
538-
.await
533+
self.module
534+
.on_module_thread("subscribe_single", move || {
535+
me.module
536+
.subscriptions()
537+
.add_single_subscription(me.sender, subscription, timer, None)
538+
})
539+
.await?
539540
}
540541

541542
pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<Option<ExecutionMetrics>, DBError> {
@@ -554,12 +555,13 @@ impl ClientConnection {
554555
timer: Instant,
555556
) -> Result<Option<ExecutionMetrics>, DBError> {
556557
let me = self.clone();
557-
asyncify(move || {
558-
me.module
559-
.subscriptions()
560-
.add_multi_subscription(me.sender, request, timer, None)
561-
})
562-
.await
558+
self.module
559+
.on_module_thread("subscribe_multi", move || {
560+
me.module
561+
.subscriptions()
562+
.add_multi_subscription(me.sender, request, timer, None)
563+
})
564+
.await?
563565
}
564566

565567
pub async fn unsubscribe_multi(
@@ -568,12 +570,13 @@ impl ClientConnection {
568570
timer: Instant,
569571
) -> Result<Option<ExecutionMetrics>, DBError> {
570572
let me = self.clone();
571-
asyncify(move || {
572-
me.module
573-
.subscriptions()
574-
.remove_multi_subscription(me.sender, request, timer)
575-
})
576-
.await
573+
self.module
574+
.on_module_thread("unsubscribe_multi", move || {
575+
me.module
576+
.subscriptions()
577+
.remove_multi_subscription(me.sender, request, timer)
578+
})
579+
.await?
577580
}
578581

579582
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {

crates/core/src/host/host_controller.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,9 @@ impl HostController {
334334
warn!("database operation panicked");
335335
on_panic();
336336
});
337-
let result = asyncify(move || f(&module.replica_ctx().relational_db)).await;
337+
338+
let db = module.replica_ctx().relational_db.clone();
339+
let result = module.on_module_thread("using_database", move || f(&db)).await?;
338340
Ok(result)
339341
}
340342

@@ -643,6 +645,7 @@ async fn make_replica_ctx(
643645
let Some(subscriptions) = downgraded.upgrade() else {
644646
break;
645647
};
648+
// This should happen on the module thread, but we haven't created the module yet.
646649
asyncify(move || subscriptions.write().remove_dropped_clients()).await
647650
}
648651
});

0 commit comments

Comments
 (0)