Skip to content

feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first #878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion src/job_token.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use jobserver::{Acquired, Client, HelperThread};
use std::sync::mpsc::{self, Receiver, Sender};
use std::{
env,
sync::{
mpsc::{self, Receiver, Sender},
Once,
},
};

pub(crate) struct JobToken {
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process.
Expand Down Expand Up @@ -68,3 +74,48 @@ impl JobTokenServer {
}
}
}

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
pub(super) fn jobserver() -> jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.clone().unwrap()
}
}

unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}
49 changes: 2 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ impl Build {

#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
use std::sync::{mpsc, Once};
use std::sync::mpsc;

if objs.len() <= 1 {
for obj in objs {
Expand All @@ -1306,7 +1306,7 @@ impl Build {
}

// Limit our parallelism globally with a jobserver.
let server = jobserver();
let server = job_token::jobserver();
// Reacquire our process's token on drop

// When compiling objects in parallel we do a few dirty tricks to speed
Expand Down Expand Up @@ -1440,51 +1440,6 @@ impl Build {

return wait_thread.join().expect("wait_thread panics");

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.clone().unwrap()
}
}

unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}

struct KillOnDrop(Child);

impl Drop for KillOnDrop {
Expand Down