Skip to content

Commit 0c87751

Browse files
committed
feat: Do not preemptively drop implicit job token.
1 parent 5b91c7b commit 0c87751

File tree

1 file changed

+68
-11
lines changed

1 file changed

+68
-11
lines changed

src/lib.rs

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,7 +1293,12 @@ impl Build {
12931293

12941294
#[cfg(feature = "parallel")]
12951295
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
1296-
use std::sync::{mpsc, Once};
1296+
use std::sync::{
1297+
mpsc::{self, Receiver, Sender},
1298+
Once,
1299+
};
1300+
1301+
use jobserver::{Acquired, Client};
12971302

12981303
if objs.len() <= 1 {
12991304
for obj in objs {
@@ -1309,10 +1314,54 @@ impl Build {
13091314
// easier to write loop below. If this fails, though, then we're likely
13101315
// on Windows with the main implicit token, so we just have a bit extra
13111316
// parallelism for a bit and don't reacquire later.
1312-
let server = jobserver();
1317+
let server = jobserver().clone();
13131318
// Reacquire our process's token on drop
1314-
let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));
1315-
1319+
//let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));
1320+
struct JobToken {
1321+
owned: Option<Acquired>,
1322+
pool: Sender<Option<Acquired>>,
1323+
should_return_to_queue: bool,
1324+
}
1325+
impl Drop for JobToken {
1326+
fn drop(&mut self) {
1327+
if self.should_return_to_queue {
1328+
let _ = self.pool.send(self.owned.take());
1329+
}
1330+
}
1331+
}
1332+
struct JobTokenServer {
1333+
helper: jobserver::HelperThread,
1334+
tx: Sender<Option<Acquired>>,
1335+
rx: Receiver<Option<Acquired>>,
1336+
}
1337+
impl JobTokenServer {
1338+
fn new(client: Client) -> Result<Self, Error> {
1339+
let (tx, rx) = std::sync::mpsc::channel();
1340+
tx.send(None).unwrap();
1341+
let pool = tx.clone();
1342+
let helper = client.into_helper_thread(move |acq| {
1343+
let _ = pool.send(Some(acq.unwrap()));
1344+
})?;
1345+
Ok(Self { helper, tx, rx })
1346+
}
1347+
fn acquire(&mut self) -> JobToken {
1348+
if let Ok(token) = self.rx.try_recv() {
1349+
JobToken {
1350+
owned: token,
1351+
pool: self.tx.clone(),
1352+
should_return_to_queue: true,
1353+
}
1354+
} else {
1355+
self.helper.request_token();
1356+
let token = self.rx.recv().unwrap();
1357+
JobToken {
1358+
owned: token,
1359+
pool: self.tx.clone(),
1360+
should_return_to_queue: true,
1361+
}
1362+
}
1363+
}
1364+
}
13161365
// When compiling objects in parallel we do a few dirty tricks to speed
13171366
// things up:
13181367
//
@@ -1332,7 +1381,7 @@ impl Build {
13321381
// acquire the appropriate tokens, Once all objects have been compiled
13331382
// we wait on all the processes and propagate the results of compilation.
13341383

1335-
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>();
1384+
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, JobToken)>();
13361385

13371386
// Since jobserver::Client::acquire can block, waiting
13381387
// must be done in parallel so that acquire won't block forever.
@@ -1345,7 +1394,7 @@ impl Build {
13451394

13461395
loop {
13471396
let mut has_made_progress = false;
1348-
1397+
let mut is_disconnected = false;
13491398
// Reading new pending tasks
13501399
loop {
13511400
match rx.try_recv() {
@@ -1361,14 +1410,21 @@ impl Build {
13611410
Ok(())
13621411
};
13631412
}
1413+
Err(mpsc::TryRecvError::Disconnected) => {
1414+
is_disconnected = true;
1415+
break;
1416+
}
13641417
_ => break,
13651418
}
13661419
}
13671420

13681421
// Try waiting on them.
1369-
pendings.retain_mut(|(cmd, program, child, _)| {
1422+
pendings.retain_mut(|(cmd, program, child, token)| {
13701423
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
13711424
Ok(Some(())) => {
1425+
if is_disconnected {
1426+
token.should_return_to_queue = false;
1427+
}
13721428
// Task done, remove the entry
13731429
has_made_progress = true;
13741430
false
@@ -1377,7 +1433,9 @@ impl Build {
13771433
Err(err) => {
13781434
// Task fail, remove the entry.
13791435
has_made_progress = true;
1380-
1436+
if is_disconnected {
1437+
token.should_return_to_queue = false;
1438+
}
13811439
// Since we can only return one error, log the error to make
13821440
// sure users always see all the compilation failures.
13831441
let _ = writeln!(stdout, "cargo:warning={}", err);
@@ -1415,11 +1473,10 @@ impl Build {
14151473
};
14161474
}
14171475
})?;
1418-
1476+
let mut tokens = JobTokenServer::new(server)?;
14191477
for obj in objs {
14201478
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
1421-
let token = server.acquire()?;
1422-
1479+
let token = tokens.acquire();
14231480
let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;
14241481

14251482
tx.send((cmd, program, KillOnDrop(child), token))

0 commit comments

Comments
 (0)