Skip to content

Commit 63f833a

Browse files
Implement jobserver event communication
1 parent 14530d0 commit 63f833a

File tree

5 files changed

+214
-48
lines changed

5 files changed

+214
-48
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3667,6 +3667,7 @@ dependencies = [
36673667
"rustc_feature",
36683668
"rustc_hir",
36693669
"rustc_interface",
3670+
"rustc_jobserver",
36703671
"rustc_lint",
36713672
"rustc_metadata",
36723673
"rustc_mir",
@@ -3825,6 +3826,7 @@ dependencies = [
38253826
"jobserver",
38263827
"lazy_static 1.4.0",
38273828
"log",
3829+
"serialize",
38283830
]
38293831

38303832
[[package]]

src/librustc_codegen_ssa/back/write.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -994,9 +994,12 @@ fn start_executing_work<B: ExtraBackendMethods>(
994994
// get tokens on `coordinator_receive` which will
995995
// get managed in the main loop below.
996996
let coordinator_send2 = coordinator_send.clone();
997-
let helper = rustc_jobserver::helper_thread(move |token| {
998-
drop(coordinator_send2.send(Box::new(Message::Token::<B>(token))));
999-
});
997+
let request_token = move || {
998+
let coordinator_send2 = coordinator_send2.clone();
999+
rustc_jobserver::request_token(move |token| {
1000+
drop(coordinator_send2.send(Box::new(Message::Token::<B>(token))));
1001+
});
1002+
};
10001003

10011004
let mut each_linked_rlib_for_lto = Vec::new();
10021005
drop(link::each_linked_rlib(crate_info, &mut |cnum, path| {
@@ -1303,7 +1306,7 @@ fn start_executing_work<B: ExtraBackendMethods>(
13031306
.unwrap_or_else(|e| e);
13041307
work_items.insert(insertion_index, (work, cost));
13051308
if !cgcx.opts.debugging_opts.no_parallel_llvm {
1306-
helper.request_token();
1309+
request_token();
13071310
}
13081311
}
13091312
}
@@ -1413,7 +1416,7 @@ fn start_executing_work<B: ExtraBackendMethods>(
14131416
work_items.insert(insertion_index, (llvm_work_item, cost));
14141417

14151418
if !cgcx.opts.debugging_opts.no_parallel_llvm {
1416-
helper.request_token();
1419+
request_token();
14171420
}
14181421
assert!(!codegen_aborted);
14191422
assert_eq!(main_thread_worker_state, MainThreadWorkerState::Codegenning);

src/librustc_driver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ rustc_codegen_utils = { path = "../librustc_codegen_utils" }
3131
rustc_error_codes = { path = "../librustc_error_codes" }
3232
rustc_interface = { path = "../librustc_interface" }
3333
rustc_serialize = { path = "../libserialize", package = "serialize" }
34+
rustc_jobserver = { path = "../librustc_jobserver" }
3435
syntax = { path = "../libsyntax" }
3536
rustc_span = { path = "../librustc_span" }
3637

src/librustc_jobserver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ doctest = false
1313
jobserver = "0.1.21"
1414
log = "0.4"
1515
lazy_static = "1"
16+
rustc_serialize = { path = "../libserialize", package = "serialize" }

src/librustc_jobserver/lib.rs

Lines changed: 202 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,69 @@
1+
//! rustc wants to manage its jobserver pool such that it never keeps a token
2+
//! around for too long if it's not being used (i.e., eagerly return tokens), so
3+
//! that Cargo can spawn more rustcs to go around.
4+
//!
5+
//! rustc also has a process-global implicit token when it starts, which we keep
6+
//! track of -- we cannot release it to Cargo, and we want to make sure that if
7+
//! it is released we *must* unblock a thread of execution onto it (otherwise we
8+
//! will deadlock on it almost for sure).
9+
//!
10+
//! So, when we start up, we have an implicit token and no acquired tokens from
11+
//! Cargo.
12+
//!
13+
//! We immediately on startup spawn a thread into the background to manage
14+
//! communication with the jobserver (otherwise it's too hard to work with the
15+
//! jobserver API). This is non-ideal, and it would be good to avoid, but
16+
//! currently that cost is pretty much required for correct functionality, as we
17+
//! must be able to atomically wait on both a Condvar (for other threads
18+
//! releasing the implicit token) and the jobserver itself. That's not possible
19+
//! with the jobserver API today unless we spawn up an additional thread.
20+
//!
21+
//! There are 3 primary APIs this crate exposes:
22+
//! * acquire()
23+
//! * release()
24+
//! * acquire_from_request()
25+
//! * request_token()
26+
//!
27+
//! The first two, acquire and release, are blocking functions which acquire
28+
//! and release a jobserver token.
29+
//!
30+
//! The latter two help manage async requesting of tokens: specifically,
31+
//! acquire_from_request() will block on acquiring token but will not request it
32+
//! from the jobserver itself, whereas the last one just requests a token (and
33+
//! should return pretty quickly, i.e., it does not block on some event).
34+
//!
35+
//! -------------------------------------
36+
//!
37+
//! We also have two modes to manage here. In the primary (default) mode we
38+
//! communicate directly with the underlying jobserver (i.e., all
39+
//! acquire/release requests fall through to the jobserver crate's
40+
//! acquire/release functions).
41+
//!
42+
//! This can be quite poor for scalability, as at least on current Linux
43+
//! kernels, each release on the jobserver will trigger the kernel to wake up
44+
//! *all* waiters instead of just one, which, if you have lots of threads
45+
//! waiting, is quite bad.
46+
//!
47+
//! For that reason, we have a second mode which utilizes Cargo to improve
48+
//! scaling here. In that mode, we have slightly altered communication with the
49+
//! jobserver. Instead of just blocking on the jobserver, we will instead first
50+
//! print to stderr a JSON message indicating that we're interested in receiving
51+
//! a jobserver token, and only then block on actually receiving said token. On
52+
//! release, we don't write into the jobserver at all, instead merely printing
53+
//! out that we've released a token.
54+
//!
55+
//! Note that the second mode allows Cargo to hook up each rustc with its own
56+
//! jobserver (i.e., one per rustc process) and then fairly easily make sure to
57+
//! fulfill the requests from rustc and such. Ultimately, that means that we
58+
//! have just one rustc thread waiting on the jobserver: a solution that is
59+
//! nearly optimal for scalability.
60+
161
use jobserver::Client;
262
use lazy_static::lazy_static;
63+
use rustc_serialize::json::as_json;
64+
use std::collections::VecDeque;
365
use std::sync::atomic::{AtomicUsize, Ordering};
66+
use std::sync::{Arc, Condvar, Mutex};
467

568
lazy_static! {
669
// We can only call `from_env` once per process
@@ -22,23 +85,15 @@ lazy_static! {
2285
// per-process.
2386
static ref GLOBAL_CLIENT: Client = unsafe {
2487
Client::from_env().unwrap_or_else(|| {
88+
log::trace!("initializing fresh jobserver (not from env)");
2589
let client = Client::new(32).expect("failed to create jobserver");
2690
// Acquire a token for the main thread which we can release later
2791
client.acquire_raw().ok();
2892
client
2993
})
3094
};
31-
}
3295

33-
// Unlike releasing tokens, there's not really a "one size fits all" approach, as we have two
34-
// primary ways of acquiring a token: via the helper thread, and via the acquire_thread function.
35-
//
36-
// That makes this function necessary unlike in the release case where everything is piped through
37-
// `release_thread`.
38-
fn notify_acquiring_token() {
39-
if should_notify() {
40-
// FIXME: tell Cargo of our interest
41-
}
96+
static ref HELPER: Mutex<Helper> = Mutex::new(Helper::new());
4297
}
4398

4499
// These are the values for TOKEN_REQUESTS, which is an enum between these
@@ -60,71 +115,175 @@ fn should_notify() -> bool {
60115
value == CARGO_REQUESTED
61116
}
62117

63-
/// This changes a global value to the new value of token_requests, which means
64-
/// that you probably don't want to be calling this more than once per process.
65-
/// Unfortunately the jobserver is inherently a global resource (we can't have
66-
/// more than one) so the token requesting strategy must likewise be global.
118+
/// This will only adjust the global value to the new value of token_requests
119+
/// the first time it's called, which means that you want to make sure that the
120+
/// first call you make has the right value for `token_requests`. We try to help
121+
/// out a bit by making sure that this is called before any interaction with the
122+
/// jobserver (which usually happens almost immediately as soon as rustc does
123+
/// anything due to spawning up the Rayon threadpool).
124+
///
125+
/// Unfortunately the jobserver is inherently a global resource (we can't
126+
/// have more than one) so the token requesting strategy must likewise be global.
67127
///
68128
/// Usually this doesn't matter too much, as you're not wanting to set the token
69129
/// requests unless you're in the one-rustc-per-process model, and we help out
70130
/// here a bit by not resetting it once it's set (i.e., only the first init will
71131
/// change the value).
72132
pub fn initialize(token_requests: bool) {
73-
TOKEN_REQUESTS.compare_and_swap(
133+
lazy_static::initialize(&GLOBAL_CLIENT);
134+
lazy_static::initialize(&HELPER);
135+
let previous = TOKEN_REQUESTS.compare_and_swap(
74136
EMPTY,
75137
if token_requests { CARGO_REQUESTED } else { MAKE_REQUESTED },
76138
Ordering::SeqCst,
77139
);
78-
lazy_static::initialize(&GLOBAL_CLIENT)
140+
if previous == EMPTY {
141+
log::info!("initialized rustc jobserver, set token_requests={:?}", token_requests);
142+
}
79143
}
80144

81-
pub struct HelperThread {
145+
pub struct Helper {
82146
helper: jobserver::HelperThread,
147+
tokens: usize,
148+
requests: Arc<Mutex<VecDeque<Box<dyn FnOnce(Acquired) + Send>>>>,
83149
}
84150

85-
impl HelperThread {
86-
// This API does not block, but is shimmed so that we can inform Cargo of our interest here.
151+
impl Helper {
152+
fn new() -> Self {
153+
let requests: Arc<Mutex<VecDeque<Box<dyn FnOnce(Acquired) + Send>>>> =
154+
Arc::new(Mutex::new(VecDeque::new()));
155+
let requests2 = requests.clone();
156+
let helper = GLOBAL_CLIENT
157+
.clone()
158+
.into_helper_thread(move |token| {
159+
log::trace!("Helper thread token sending into channel");
160+
// We've acquired a token, but we need to not use it as we have our own
161+
// custom release-on-drop struct since we'll want different logic than
162+
// just normally releasing the token in this case.
163+
//
164+
// On unix this unfortunately means that we lose the specific byte that
165+
// was in the pipe (i.e., we just write back the same byte all the time)
166+
// but that's not expected to be a problem.
167+
token.expect("acquire token").drop_without_releasing();
168+
if let Some(sender) = requests2.lock().unwrap().pop_front() {
169+
sender(Acquired::new());
170+
}
171+
})
172+
.expect("spawned helper");
173+
Helper { helper, tokens: 1, requests }
174+
}
175+
176+
// This blocks on acquiring a token (that must have been previously
177+
// requested).
178+
fn acquire_token_from_prior_request(&mut self) -> Acquired {
179+
if self.tokens == 0 {
180+
self.tokens += 1;
181+
return Acquired::new();
182+
}
183+
184+
let receiver = Arc::new((Mutex::new(None), Condvar::new()));
185+
let receiver2 = receiver.clone();
186+
187+
self.requests.lock().unwrap().push_back(Box::new(move |token| {
188+
let mut slot = receiver.0.lock().unwrap();
189+
*slot = Some(token);
190+
receiver.1.notify_one();
191+
}));
192+
193+
let (lock, cvar) = &*receiver2;
194+
let mut guard = cvar.wait_while(lock.lock().unwrap(), |slot| slot.is_none()).unwrap();
195+
196+
self.tokens += 1;
197+
guard.take().unwrap()
198+
}
199+
200+
fn release_token(&mut self) {
201+
let mut requests = self.requests.lock().unwrap();
202+
203+
self.tokens -= 1;
204+
205+
if self.tokens == 0 {
206+
// If there is a sender, then it needs to be given this token.
207+
if let Some(sender) = requests.pop_front() {
208+
sender(Acquired::new());
209+
return;
210+
}
211+
212+
return;
213+
}
214+
215+
if should_notify() {
216+
eprintln!("{}", as_json(&JobserverNotification { jobserver_event: Event::Release }));
217+
} else {
218+
GLOBAL_CLIENT.release_raw().unwrap();
219+
}
220+
}
221+
87222
pub fn request_token(&self) {
223+
log::trace!("{:?} requesting token", std::thread::current().id());
224+
// Just notify, don't actually acquire here.
88225
notify_acquiring_token();
89226
self.helper.request_token();
90227
}
91228
}
92229

93-
pub struct Acquired(());
230+
#[must_use]
231+
pub struct Acquired {
232+
armed: bool,
233+
}
94234

95235
impl Drop for Acquired {
96236
fn drop(&mut self) {
97-
release_thread();
237+
if self.armed {
238+
release_thread();
239+
}
98240
}
99241
}
100242

101-
pub fn helper_thread<F>(mut cb: F) -> HelperThread
102-
where
103-
F: FnMut(Acquired) + Send + 'static,
104-
{
105-
let thread = GLOBAL_CLIENT
106-
.clone()
107-
.into_helper_thread(move |token| {
108-
// we've acquired a token, but we need to not use it as we have our own
109-
// custom release-on-drop struct since we'll want different logic than
110-
// just normally releasing the token in this case.
111-
//
112-
// On unix this unfortunately means that we lose the specific byte that
113-
// was in the pipe (i.e., we just write back the same byte all the time)
114-
// but that's not expected to be a problem.
115-
std::mem::forget(token.expect("acquire token"));
116-
cb(Acquired(()))
117-
})
118-
.expect("failed to spawn helper thread");
243+
impl Acquired {
244+
fn new() -> Self {
245+
Self { armed: true }
246+
}
247+
248+
fn disarm(mut self) {
249+
self.armed = false;
250+
}
251+
}
252+
253+
#[derive(RustcEncodable)]
254+
enum Event {
255+
WillAcquire,
256+
Release,
257+
}
258+
259+
#[derive(RustcEncodable)]
260+
struct JobserverNotification {
261+
jobserver_event: Event,
262+
}
263+
264+
// Unlike releasing tokens, there's not really a "one size fits all" approach, as we have two
265+
// primary ways of acquiring a token: via the helper thread, and via the acquire_thread function.
266+
fn notify_acquiring_token() {
267+
if should_notify() {
268+
eprintln!("{}", as_json(&JobserverNotification { jobserver_event: Event::WillAcquire }));
269+
}
270+
}
271+
272+
pub fn request_token(f: impl FnOnce(Acquired) + Send + 'static) {
273+
HELPER.lock().unwrap().requests.lock().unwrap().push_back(Box::new(move |token| {
274+
f(token);
275+
}));
276+
}
119277

120-
HelperThread { helper: thread }
278+
pub fn acquire_from_request() -> Acquired {
279+
HELPER.lock().unwrap().acquire_token_from_prior_request()
121280
}
122281

123282
pub fn acquire_thread() {
124-
notify_acquiring_token();
125-
GLOBAL_CLIENT.acquire_raw().ok();
283+
HELPER.lock().unwrap().request_token();
284+
HELPER.lock().unwrap().acquire_token_from_prior_request().disarm();
126285
}
127286

128287
pub fn release_thread() {
129-
GLOBAL_CLIENT.release_raw().ok();
288+
HELPER.lock().unwrap().release_token();
130289
}

0 commit comments

Comments
 (0)