Skip to content

Commit d0a8a49

Browse files
committed
Move implementation to a separate module and document a bunch
1 parent 0c87751 commit d0a8a49

File tree

2 files changed

+81
-89
lines changed

2 files changed

+81
-89
lines changed

src/job_token.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use jobserver::{Acquired, Client, HelperThread};
2+
use std::sync::mpsc::{self, Receiver, Sender};
3+
4+
pub(crate) struct JobToken {
5+
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process.
6+
/// Both are valid values to put into queue.
7+
token: Option<Acquired>,
8+
pool: Sender<Option<Acquired>>,
9+
should_return_to_queue: bool,
10+
}
11+
12+
impl Drop for JobToken {
13+
fn drop(&mut self) {
14+
if self.should_return_to_queue {
15+
let _ = self.pool.send(self.token.take());
16+
}
17+
}
18+
}
19+
20+
impl JobToken {
21+
/// Ensure that this token is not put back into queue once it's dropped.
22+
/// This also leads to releasing it sooner for other processes to use, which is a good thing to do once you know that
23+
/// you're never going to request a token in this process again.
24+
pub(crate) fn forget(&mut self) {
25+
self.should_return_to_queue = false;
26+
}
27+
}
28+
29+
/// A thin wrapper around jobserver's Client.
30+
/// It would be perfectly fine to just use that, but we also want to reuse our own implicit token assigned for this build script.
31+
/// This struct manages that and gives out tokens without exposing whether they're implicit tokens or tokens from jobserver.
32+
/// Furthermore, instead of giving up job tokens, it keeps them around for reuse if we know we're going to request another token after freeing the current one.
33+
pub(crate) struct JobTokenServer {
34+
helper: HelperThread,
35+
tx: Sender<Option<Acquired>>,
36+
rx: Receiver<Option<Acquired>>,
37+
}
38+
39+
impl JobTokenServer {
40+
pub(crate) fn new(client: Client) -> Result<Self, crate::Error> {
41+
let (tx, rx) = mpsc::channel();
42+
// Initialize the
43+
tx.send(None).unwrap();
44+
let pool = tx.clone();
45+
let helper = client.into_helper_thread(move |acq| {
46+
let _ = pool.send(Some(acq.unwrap()));
47+
})?;
48+
Ok(Self { helper, tx, rx })
49+
}
50+
51+
pub(crate) fn acquire(&mut self) -> JobToken {
52+
let token = if let Ok(token) = self.rx.try_recv() {
53+
// Opportunistically check if we already have a token for our own reuse.
54+
token
55+
} else {
56+
// Cold path, request a token and block
57+
self.helper.request_token();
58+
self.rx.recv().unwrap()
59+
};
60+
JobToken {
61+
token,
62+
pool: self.tx.clone(),
63+
should_return_to_queue: true,
64+
}
65+
}
66+
}

src/lib.rs

Lines changed: 15 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio};
6666
use std::sync::{Arc, Mutex};
6767
use std::thread::{self, JoinHandle};
6868

69+
#[cfg(feature = "parallel")]
70+
mod job_token;
6971
mod os_pipe;
70-
7172
// These modules are all glue to support reading the MSVC version from
7273
// the registry and from COM interfaces
7374
#[cfg(windows)]
@@ -1293,12 +1294,7 @@ impl Build {
12931294

12941295
#[cfg(feature = "parallel")]
12951296
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
1296-
use std::sync::{
1297-
mpsc::{self, Receiver, Sender},
1298-
Once,
1299-
};
1300-
1301-
use jobserver::{Acquired, Client};
1297+
use std::sync::mpsc;
13021298

13031299
if objs.len() <= 1 {
13041300
for obj in objs {
@@ -1309,59 +1305,10 @@ impl Build {
13091305
return Ok(());
13101306
}
13111307

1312-
// Limit our parallelism globally with a jobserver. Start off by
1313-
// releasing our own token for this process so we can have a bit of an
1314-
// easier to write loop below. If this fails, though, then we're likely
1315-
// on Windows with the main implicit token, so we just have a bit extra
1316-
// parallelism for a bit and don't reacquire later.
1317-
let server = jobserver().clone();
1308+
// Limit our parallelism globally with a jobserver.
1309+
let server = unsafe { default_jobserver() };
13181310
// Reacquire our process's token on drop
1319-
//let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));
1320-
struct JobToken {
1321-
owned: Option<Acquired>,
1322-
pool: Sender<Option<Acquired>>,
1323-
should_return_to_queue: bool,
1324-
}
1325-
impl Drop for JobToken {
1326-
fn drop(&mut self) {
1327-
if self.should_return_to_queue {
1328-
let _ = self.pool.send(self.owned.take());
1329-
}
1330-
}
1331-
}
1332-
struct JobTokenServer {
1333-
helper: jobserver::HelperThread,
1334-
tx: Sender<Option<Acquired>>,
1335-
rx: Receiver<Option<Acquired>>,
1336-
}
1337-
impl JobTokenServer {
1338-
fn new(client: Client) -> Result<Self, Error> {
1339-
let (tx, rx) = std::sync::mpsc::channel();
1340-
tx.send(None).unwrap();
1341-
let pool = tx.clone();
1342-
let helper = client.into_helper_thread(move |acq| {
1343-
let _ = pool.send(Some(acq.unwrap()));
1344-
})?;
1345-
Ok(Self { helper, tx, rx })
1346-
}
1347-
fn acquire(&mut self) -> JobToken {
1348-
if let Ok(token) = self.rx.try_recv() {
1349-
JobToken {
1350-
owned: token,
1351-
pool: self.tx.clone(),
1352-
should_return_to_queue: true,
1353-
}
1354-
} else {
1355-
self.helper.request_token();
1356-
let token = self.rx.recv().unwrap();
1357-
JobToken {
1358-
owned: token,
1359-
pool: self.tx.clone(),
1360-
should_return_to_queue: true,
1361-
}
1362-
}
1363-
}
1364-
}
1311+
13651312
// When compiling objects in parallel we do a few dirty tricks to speed
13661313
// things up:
13671314
//
@@ -1381,7 +1328,7 @@ impl Build {
13811328
// acquire the appropriate tokens, Once all objects have been compiled
13821329
// we wait on all the processes and propagate the results of compilation.
13831330

1384-
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, JobToken)>();
1331+
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>();
13851332

13861333
// Since jobserver::Client::acquire can block, waiting
13871334
// must be done in parallel so that acquire won't block forever.
@@ -1394,6 +1341,10 @@ impl Build {
13941341

13951342
loop {
13961343
let mut has_made_progress = false;
1344+
// If the other end of the pipe is already disconnected, then we're not gonna get any new jobs,
1345+
// so it doesn't make sense to reuse the tokens; in fact, releasing them as soon as possible (once we know that the other end is disconnected) is beneficial.
1346+
// Imagine that the last file built takes an hour to finish; in this scenario, by not releasing the tokens before other builds are done we'd effectively block other processes from
1347+
// starting sooner - even though we only need one token, not however many we've acquired.
13971348
let mut is_disconnected = false;
13981349
// Reading new pending tasks
13991350
loop {
@@ -1422,10 +1373,10 @@ impl Build {
14221373
pendings.retain_mut(|(cmd, program, child, token)| {
14231374
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
14241375
Ok(Some(())) => {
1376+
// Task done, remove the entry
14251377
if is_disconnected {
1426-
token.should_return_to_queue = false;
1378+
token.forget();
14271379
}
1428-
// Task done, remove the entry
14291380
has_made_progress = true;
14301381
false
14311382
}
@@ -1434,7 +1385,7 @@ impl Build {
14341385
// Task fail, remove the entry.
14351386
has_made_progress = true;
14361387
if is_disconnected {
1437-
token.should_return_to_queue = false;
1388+
token.forget();
14381389
}
14391390
// Since we can only return one error, log the error to make
14401391
// sure users always see all the compilation failures.
@@ -1473,7 +1424,7 @@ impl Build {
14731424
};
14741425
}
14751426
})?;
1476-
let mut tokens = JobTokenServer::new(server)?;
1427+
let mut tokens = crate::job_token::JobTokenServer::new(server)?;
14771428
for obj in objs {
14781429
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
14791430
let token = tokens.acquire();
@@ -1487,24 +1438,6 @@ impl Build {
14871438

14881439
return wait_thread.join().expect("wait_thread panics");
14891440

1490-
/// Returns a suitable `jobserver::Client` used to coordinate
1491-
/// parallelism between build scripts.
1492-
fn jobserver() -> &'static jobserver::Client {
1493-
static INIT: Once = Once::new();
1494-
static mut JOBSERVER: Option<jobserver::Client> = None;
1495-
1496-
fn _assert_sync<T: Sync>() {}
1497-
_assert_sync::<jobserver::Client>();
1498-
1499-
unsafe {
1500-
INIT.call_once(|| {
1501-
let server = default_jobserver();
1502-
JOBSERVER = Some(server);
1503-
});
1504-
JOBSERVER.as_ref().unwrap()
1505-
}
1506-
}
1507-
15081441
unsafe fn default_jobserver() -> jobserver::Client {
15091442
// Try to use the environmental jobserver which Cargo typically
15101443
// initializes for us...
@@ -1541,13 +1474,6 @@ impl Build {
15411474
child.kill().ok();
15421475
}
15431476
}
1544-
1545-
struct JobserverToken(&'static jobserver::Client);
1546-
impl Drop for JobserverToken {
1547-
fn drop(&mut self) {
1548-
let _ = self.0.acquire_raw();
1549-
}
1550-
}
15511477
}
15521478

15531479
#[cfg(not(feature = "parallel"))]

0 commit comments

Comments
 (0)