Skip to content

Commit 967e82a

Browse files
coolreader18, jsdt, joshua-spacetime, and gefjon
authored
Pin db threads to cores (#2801)
Signed-off-by: Noa <coolreader18@gmail.com>
Co-authored-by: Jeffrey Dallatezza <jeffreydallatezza@gmail.com>
Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
Co-authored-by: Phoebe Goldman <phoebe@clockworklabs.io>
1 parent 380acf1 commit 967e82a

File tree

15 files changed

+666
-436
lines changed

15 files changed

+666
-436
lines changed

Cargo.lock

Lines changed: 27 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ log = "0.4.17"
200200
memchr = "2"
201201
mimalloc = "0.1.39"
202202
nohash-hasher = "0.2"
203+
nix = "0.30"
203204
once_cell = "1.16"
204205
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
205206
parse-size = "1.1.0"

crates/core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ bytes.workspace = true
4848
bytestring.workspace = true
4949
chrono.workspace = true
5050
crossbeam-channel.workspace = true
51+
crossbeam-queue.workspace = true
5152
derive_more.workspace = true
5253
dirs.workspace = true
5354
enum-as-inner.workspace = true
@@ -113,11 +114,15 @@ wasmtime.workspace = true
113114
jwks.workspace = true
114115
async_cache = "0.3.1"
115116
faststr = "0.2.23"
117+
core_affinity = "0.8"
116118

117119
[target.'cfg(not(target_env = "msvc"))'.dependencies]
118120
tikv-jemallocator = {workspace = true}
119121
tikv-jemalloc-ctl = {workspace = true}
120122

123+
[target.'cfg(target_os = "linux")'.dependencies]
124+
nix = { workspace = true, features = ["sched"] }
125+
121126
[features]
122127
# Print a warning when doing an unindexed `iter_by_col_range` on a large table.
123128
unindexed_iter_by_col_range_warn = []

crates/core/src/host/host_controller.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use crate::module_host_context::ModuleCreationContext;
1414
use crate::replica_context::ReplicaContext;
1515
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
1616
use crate::subscription::module_subscription_manager::SubscriptionManager;
17-
use crate::util::{asyncify, spawn_rayon};
17+
use crate::util::asyncify;
18+
use crate::util::jobs::{JobCore, JobCores};
1819
use crate::worker_metrics::WORKER_METRICS;
1920
use anyhow::{anyhow, ensure, Context};
2021
use async_trait::async_trait;
@@ -95,6 +96,8 @@ pub struct HostController {
9596
pub page_pool: PagePool,
9697
/// The runtimes for running our modules.
9798
runtimes: Arc<HostRuntimes>,
99+
/// The CPU cores that are reserved for ModuleHost operations to run on.
100+
db_cores: JobCores,
98101
}
99102

100103
struct HostRuntimes {
@@ -169,6 +172,7 @@ impl HostController {
169172
program_storage: ProgramStorage,
170173
energy_monitor: Arc<impl EnergyMonitor>,
171174
durability: Arc<dyn DurabilityProvider>,
175+
db_cores: JobCores,
172176
) -> Self {
173177
Self {
174178
hosts: <_>::default(),
@@ -179,6 +183,7 @@ impl HostController {
179183
runtimes: HostRuntimes::new(Some(&data_dir)),
180184
data_dir,
181185
page_pool: PagePool::new(default_config.page_pool_max_size),
186+
db_cores,
182187
}
183188
}
184189

@@ -267,7 +272,19 @@ impl HostController {
267272
/// This is not necessary during hotswap publishes,
268273
/// as the automigration planner and executor accomplish the same validity checks.
269274
pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result<Arc<ModuleInfo>> {
270-
Host::try_init_in_memory_to_check(&self.runtimes, self.page_pool.clone(), database, program).await
275+
Host::try_init_in_memory_to_check(
276+
&self.runtimes,
277+
self.page_pool.clone(),
278+
database,
279+
program,
280+
// This takes a db core to check validity, and we will later take
281+
// another core to actually run the module. Due to the round-robin
282+
// algorithm that JobCores uses, that will likely just be the same
283+
// core - there's not a concern that we'll only end up using 1/2
284+
// of the actual cores.
285+
self.db_cores.take(),
286+
)
287+
.await
271288
}
272289

273290
/// Run a computation on the [`RelationalDB`] of a [`ModuleHost`] managed by
@@ -338,6 +355,7 @@ impl HostController {
338355
program,
339356
self.energy_monitor.clone(),
340357
self.unregister_fn(replica_id),
358+
self.db_cores.take(),
341359
)
342360
.await?;
343361

@@ -415,6 +433,7 @@ impl HostController {
415433
program,
416434
self.energy_monitor.clone(),
417435
self.unregister_fn(replica_id),
436+
self.db_cores.take(),
418437
)
419438
.await?;
420439
match update_result {
@@ -556,6 +575,7 @@ async fn make_replica_ctx(
556575

557576
/// Initialize a module host for the given program.
558577
/// The passed replica_ctx may not be configured for this version of the program's database schema yet.
578+
#[allow(clippy::too_many_arguments)]
559579
async fn make_module_host(
560580
runtimes: Arc<HostRuntimes>,
561581
host_type: HostType,
@@ -564,8 +584,14 @@ async fn make_module_host(
564584
program: Program,
565585
energy_monitor: Arc<dyn EnergyMonitor>,
566586
unregister: impl Fn() + Send + Sync + 'static,
587+
core: JobCore,
567588
) -> anyhow::Result<(Program, ModuleHost)> {
568-
spawn_rayon(move || {
589+
// `make_actor` is blocking, as it needs to compile the wasm to native code,
590+
// which may be computationally expensive - sometimes up to 1s for a large module.
591+
// TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking
592+
// threads, but those aren't for computation. Also, wasmtime uses rayon
593+
// to run compilation in parallel, so it'll need to run stuff in rayon anyway.
594+
asyncify(move || {
569595
let module_host = match host_type {
570596
HostType::Wasm => {
571597
let mcc = ModuleCreationContext {
@@ -577,7 +603,7 @@ async fn make_module_host(
577603
let start = Instant::now();
578604
let actor = runtimes.wasmtime.make_actor(mcc)?;
579605
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
580-
ModuleHost::new(actor, unregister)
606+
ModuleHost::new(actor, unregister, core)
581607
}
582608
};
583609
Ok((program, module_host))
@@ -610,6 +636,7 @@ async fn launch_module(
610636
energy_monitor: Arc<dyn EnergyMonitor>,
611637
replica_dir: ReplicaDir,
612638
runtimes: Arc<HostRuntimes>,
639+
core: JobCore,
613640
) -> anyhow::Result<(Program, LaunchedModule)> {
614641
let db_identity = database.database_identity;
615642
let host_type = database.host_type;
@@ -626,6 +653,7 @@ async fn launch_module(
626653
program,
627654
energy_monitor.clone(),
628655
on_panic,
656+
core,
629657
)
630658
.await?;
631659

@@ -776,6 +804,7 @@ impl Host {
776804
energy_monitor.clone(),
777805
replica_dir,
778806
runtimes.clone(),
807+
host_controller.db_cores.take(),
779808
)
780809
.await?;
781810

@@ -834,6 +863,7 @@ impl Host {
834863
page_pool: PagePool,
835864
database: Database,
836865
program: Program,
866+
core: JobCore,
837867
) -> anyhow::Result<Arc<ModuleInfo>> {
838868
// Even in-memory databases acquire a lockfile.
839869
// Grab a tempdir to put that lockfile in.
@@ -865,6 +895,7 @@ impl Host {
865895
Arc::new(NullEnergyMonitor),
866896
phony_replica_dir,
867897
runtimes.clone(),
898+
core,
868899
)
869900
.await?;
870901

@@ -895,6 +926,7 @@ impl Host {
895926
program: Program,
896927
energy_monitor: Arc<dyn EnergyMonitor>,
897928
on_panic: impl Fn() + Send + Sync + 'static,
929+
core: JobCore,
898930
) -> anyhow::Result<UpdateDatabaseResult> {
899931
let replica_ctx = &self.replica_ctx;
900932
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
@@ -907,6 +939,7 @@ impl Host {
907939
program,
908940
energy_monitor,
909941
on_panic,
942+
core,
910943
)
911944
.await?;
912945

@@ -981,10 +1014,15 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an
9811014

9821015
let runtimes = HostRuntimes::new(None);
9831016
let page_pool = PagePool::new(None);
984-
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program).await?;
985-
let module_info = Arc::into_inner(module_info).unwrap();
1017+
let core = JobCore::default();
1018+
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
1019+
// this should always succeed, but sometimes it doesn't
1020+
let module_def = match Arc::try_unwrap(module_info) {
1021+
Ok(info) => info.module_def,
1022+
Err(info) => info.module_def.clone(),
1023+
};
9861024

987-
Ok(module_info.module_def)
1025+
Ok(module_def)
9881026
}
9891027

9901028
// Remove all gauges associated with a database.

0 commit comments

Comments (0)