diff --git a/src/cargo/core/compiler/context/mod.rs b/src/cargo/core/compiler/context/mod.rs
index 652acaba869..6adccbd4cd4 100644
--- a/src/cargo/core/compiler/context/mod.rs
+++ b/src/cargo/core/compiler/context/mod.rs
@@ -69,6 +69,11 @@ pub struct Context<'a, 'cfg> {
     /// metadata files in addition to the rlib itself. This is only filled in
     /// when `pipelining` above is enabled.
     rmeta_required: HashSet<Unit<'a>>,
+
+    /// When we're in jobserver-per-rustc process mode, this keeps those
+    /// jobserver clients for each Unit (which eventually becomes a rustc
+    /// process).
+    pub rustc_clients: HashMap<Unit<'a>, Client>,
 }
 
 impl<'a, 'cfg> Context<'a, 'cfg> {
@@ -112,6 +117,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
             unit_dependencies,
             files: None,
             rmeta_required: HashSet::new(),
+            rustc_clients: HashMap::new(),
             pipelining,
         })
     }
@@ -491,4 +497,23 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
     pub fn rmeta_required(&self, unit: &Unit<'a>) -> bool {
         self.rmeta_required.contains(unit) || self.bcx.config.cli_unstable().timings.is_some()
     }
+
+    pub fn new_jobserver(&mut self) -> CargoResult<Client> {
+        let tokens = self.bcx.build_config.jobs as usize;
+        let client = Client::new(tokens).chain_err(|| "failed to create jobserver")?;
+
+        // Drain the client fully
+        for i in 0..tokens {
+            while let Err(e) = client.acquire_raw() {
+                anyhow::bail!(
+                    "failed to fully drain {}/{} token from jobserver at startup: {:?}",
+                    i,
+                    tokens,
+                    e,
+                );
+            }
+        }
+
+        Ok(client)
+    }
 }
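
The drain loop in `new_jobserver` exists because `Client::new(n)` returns a
client whose underlying pipe is pre-seeded with n tokens; Cargo wants to own
every token itself and hand them back one at a time with `release_raw`. A
standalone sketch of the same pattern against the public `jobserver` crate
API (the helper name is illustrative, not Cargo's):

    use jobserver::Client;

    fn drained_client(tokens: usize) -> std::io::Result<Client> {
        // A fresh client starts with `tokens` tokens already in its pipe.
        let client = Client::new(tokens)?;
        // Pull each pre-seeded token back out so the pipe starts empty.
        for _ in 0..tokens {
            client.acquire_raw()?;
        }
        Ok(client)
    }

    fn main() -> std::io::Result<()> {
        let client = drained_client(4)?;
        // Later, make exactly one token available to the child process on
        // the other end of this client.
        client.release_raw()?;
        Ok(())
    }
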
diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs
index bdde1297237..fd86611bcc5 100644
--- a/src/cargo/core/compiler/job_queue.rs
+++ b/src/cargo/core/compiler/job_queue.rs
@@ -1,5 +1,56 @@
+//! This module implements the job queue which determines the ordering in which
+//! rustc is spawned off. It also manages the allocation of jobserver tokens to
+//! rustc beyond the implicit token each rustc owns (i.e., the ones used for
+//! parallel LLVM work and parallel rustc threads).
+//!
+//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
+//! other, which is due to scaling issues with sharing a single jobserver
+//! amongst what is potentially hundreds of threads of work on many-cored
+//! systems on (at least) Linux, and likely other platforms as well.
+//!
+//! The details of this algorithm are (also) written out in
+//! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
+//! Cargo side of things.
+//!
+//! Cargo wants to complete the build as quickly as possible, fully saturating
+//! all cores (as constrained by the `-j N` parameter). Cargo also must not
+//! spawn more than N threads of work: the total amount of tokens we have
+//! floating around must always be limited to N.
+//!
+//! It is not really possible to optimally choose which crate should build
+//! first or last; nor is it possible to decide whether to give an additional
+//! token to rustc first or rather spawn a new crate of work. For now, the
+//! algorithm we implement prioritizes spawning as many crates (i.e., rustc
+//! processes) as possible, and then filling each rustc with tokens on demand.
+//!
+//! The primary loop is in `drain_the_queue` below.
+//!
+//! We integrate with the jobserver, originating from GNU make, to make sure
+//! that build scripts which use make to build C code can cooperate with us on
+//! the number of used tokens and avoid overfilling the system we're on.
+//!
+//! The jobserver is unfortunately a very simple protocol, so we enhance it a
+//! little when we know that there is a rustc on the other end. Via the stderr
+//! pipe we have to rustc, we get messages such as "NeedsToken" and
+//! "ReleaseToken" from rustc.
+//!
+//! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
+//! never that it would be impossible to make progress without one (i.e., it
+//! would be incorrect for rustc to not terminate due to an unfulfilled
+//! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
+//! given rustc.
+//!
+//! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
+//! ready for us to re-acquire ownership -- we will either release that token
+//! back into the general pool or reuse it ourselves. Note that rustc will
+//! inform us that it is releasing a token even if it itself is also requesting
+//! tokens; it is up to us whether to return the token to that same rustc.
+//!
+//! The current scheduling algorithm is relatively primitive and could likely be
+//! improved.
+
 use std::cell::Cell;
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::io;
 use std::marker;
 use std::mem;
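
As a concrete illustration of the GNU make cooperation described in the
module comment above: any child process Cargo spawns (a build script driving
`make` or `cc`, for instance) can join the shared jobserver purely through
the environment. A hedged sketch using the same `jobserver` crate; the
helper name is made up for the example:

    use jobserver::Client;

    fn main() {
        // SAFETY: `from_env` takes over file descriptors advertised via
        // MAKEFLAGS/CARGO_MAKEFLAGS; call it early, before other threads
        // might touch those descriptors.
        let client = match unsafe { Client::from_env() } {
            Some(client) => client,
            None => return, // no jobserver inherited; fall back to serial work
        };
        // Hold one token for each extra unit of parallelism while it runs.
        let token = client.acquire().expect("failed to acquire jobserver token");
        compile_some_c_code();
        drop(token); // the token returns to the shared pool on drop
    }

    fn compile_some_c_code() { /* ... */ }
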
@@ -9,7 +60,7 @@ use std::time::Duration;
 
 use anyhow::format_err;
 use crossbeam_utils::thread::Scope;
-use jobserver::{Acquired, HelperThread};
+use jobserver::{Acquired, Client, HelperThread};
 use log::{debug, info, trace};
 
 use super::context::OutputFile;
@@ -27,22 +78,67 @@ use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder
 use crate::util::{Config, DependencyQueue};
 use crate::util::{Progress, ProgressStyle};
 
-/// A management structure of the entire dependency graph to compile.
-///
+/// This structure is backed by the `DependencyQueue` type and manages the
+/// queueing of compilation steps for each package. Packages enqueue units of
+/// work and then later on the entire graph is converted to DrainState and
+/// executed.
+pub struct JobQueue<'a, 'cfg> {
+    queue: DependencyQueue<Unit<'a>, Artifact, Job>,
+    counts: HashMap<PackageId, usize>,
+    timings: Timings<'a, 'cfg>,
+}
+
 /// This structure is backed by the `DependencyQueue` type and manages the
 /// actual compilation step of each package. Packages enqueue units of work and
 /// then later on the entire graph is processed and compiled.
-pub struct JobQueue<'a, 'cfg> {
+///
+/// It is created from JobQueue when we have fully assembled the crate graph
+/// (i.e., all package dependencies are known).
+struct DrainState<'a, 'cfg> {
     queue: DependencyQueue<Unit<'a>, Artifact, Job>,
     tx: Sender<Message>,
     rx: Receiver<Message>,
-    active: HashMap<u32, Unit<'a>>,
+    active: HashMap<JobId, Unit<'a>>,
     compiled: HashSet<PackageId>,
     documented: HashSet<PackageId>,
     counts: HashMap<PackageId, usize>,
     progress: Progress<'cfg>,
     next_id: u32,
     timings: Timings<'a, 'cfg>,
+
+    /// Tokens that are currently owned by this Cargo, and may be "associated"
+    /// with a rustc process. They may also be unused, though if so will be
+    /// dropped on the next loop iteration.
+    ///
+    /// Note that the length of this may be zero, but we will still spawn work,
+    /// as we share the implicit token given to this Cargo process with a
+    /// single rustc process.
+    tokens: Vec<Acquired>,
+
+    /// rustc per-thread tokens, when in jobserver-per-rustc mode.
+    rustc_tokens: HashMap<JobId, Vec<Acquired>>,
+
+    /// This represents the list of rustc jobs (processes) and associated
+    /// clients that are interested in receiving a token.
+    to_send_clients: BTreeMap<JobId, Vec<Client>>,
+
+    /// The list of jobs that we have not yet started executing, but have
+    /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
+    /// allow us to request jobserver tokens pretty early.
+    pending_queue: Vec<(Unit<'a>, Job)>,
+    print: DiagnosticPrinter<'cfg>,
+
+    // How many jobs we've finished
+    finished: usize,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub struct JobId(pub u32);
+
+impl std::fmt::Display for JobId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
 }
 
 pub struct JobState<'a> {
@@ -51,7 +147,7 @@ pub struct JobState<'a> {
 
     /// The job id that this state is associated with, used when sending
     /// messages back to the main thread.
-    id: u32,
+    id: JobId,
 
     /// Whether or not we're expected to have a call to `rmeta_produced`. Once
     /// that method is called this is dynamically set to `false` to prevent
@@ -84,13 +180,19 @@ enum Artifact {
 }
 
 enum Message {
-    Run(u32, String),
+    Run(JobId, String),
     BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
     Stdout(String),
     Stderr(String),
     FixDiagnostic(diagnostic_server::Message),
     Token(io::Result<Acquired>),
-    Finish(u32, Artifact, CargoResult<()>),
+    Finish(JobId, Artifact, CargoResult<()>),
+
+    // This client should get release_raw called on it with one of our tokens
+    NeedsToken(JobId),
+
+    // A token previously passed to a NeedsToken client is being released.
+    ReleaseToken(JobId),
 }
 
 impl<'a> JobState<'a> {
@@ -128,24 +230,30 @@ impl<'a> JobState<'a> {
             .tx
             .send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
     }
+
+    /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
+    /// on the passed client.
+    ///
+    /// This should arrange for the associated client to eventually get a token via
+    /// `client.release_raw()`.
+    pub fn will_acquire(&self) {
+        let _ = self.tx.send(Message::NeedsToken(self.id));
+    }
+
+    /// The rustc underlying this Job is informing us that it is done with a jobserver token.
+    ///
+    /// Note that it does *not* write that token back anywhere.
+    pub fn release_token(&self) {
+        let _ = self.tx.send(Message::ReleaseToken(self.id));
+    }
 }
 
 impl<'a, 'cfg> JobQueue<'a, 'cfg> {
     pub fn new(bcx: &BuildContext<'a, 'cfg>, root_units: &[Unit<'a>]) -> JobQueue<'a, 'cfg> {
-        let (tx, rx) = channel();
-        let progress = Progress::with_style("Building", ProgressStyle::Ratio, bcx.config);
-        let timings = Timings::new(bcx, root_units);
-
         JobQueue {
             queue: DependencyQueue::new(),
-            tx,
-            rx,
-            active: HashMap::new(),
-            compiled: HashSet::new(),
-            documented: HashSet::new(),
             counts: HashMap::new(),
-            progress,
-            next_id: 0,
-            timings,
+            timings: Timings::new(bcx, root_units),
         }
     }
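
The `JobState::will_acquire`/`release_token` hooks above are fire-and-forget
sends into the same channel that every other scheduling event flows through.
A self-contained toy model (plain types, not Cargo's) of that round trip:

    use std::sync::mpsc::channel;

    #[derive(Debug)]
    enum Message {
        NeedsToken(u32),
        ReleaseToken(u32),
    }

    fn main() {
        let (tx, rx) = channel();
        // A job thread signals interest in a token, and later hands one back.
        tx.send(Message::NeedsToken(0)).unwrap();
        tx.send(Message::ReleaseToken(0)).unwrap();
        drop(tx);
        // The scheduler drains events and reacts; in Cargo this is the
        // `handle_event` loop below.
        for event in rx {
            match event {
                Message::NeedsToken(id) => println!("grant a token to job {}", id),
                Message::ReleaseToken(id) => println!("reclaim a token from job {}", id),
            }
        }
    }
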
@@ -226,12 +334,34 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
     /// This function will spawn off `config.jobs()` workers to build all of the
     /// necessary dependencies, in order. Freshness is propagated as far as
     /// possible along each dependency chain.
-    pub fn execute(&mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
+    pub fn execute(mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
         let _p = profile::start("executing the job graph");
         self.queue.queue_finished();
 
+        let (tx, rx) = channel();
+        let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
+        let state = DrainState {
+            queue: self.queue,
+            tx,
+            rx,
+            active: HashMap::new(),
+            compiled: HashSet::new(),
+            documented: HashSet::new(),
+            counts: self.counts,
+            progress,
+            next_id: 0,
+            timings: self.timings,
+
+            tokens: Vec::new(),
+            rustc_tokens: HashMap::new(),
+            to_send_clients: BTreeMap::new(),
+            pending_queue: Vec::new(),
+            print: DiagnosticPrinter::new(cx.bcx.config),
+            finished: 0,
+        };
+
         // Create a helper thread for acquiring jobserver tokens
-        let tx = self.tx.clone();
+        let tx = state.tx.clone();
         let helper = cx
             .jobserver
             .clone()
@@ -242,7 +372,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
 
         // Create a helper thread to manage the diagnostics for rustfix if
         // necessary.
-        let tx = self.tx.clone();
+        let tx = state.tx.clone();
         let _diagnostic_server = cx
             .bcx
             .build_config
@@ -251,26 +381,241 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             .take()
             .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
 
-        // Use `crossbeam` to create a scope in which we can execute scoped
-        // threads. Note that this isn't currently required by Cargo but it was
-        // historically required. This is left in for now in case we need the
-        // `'a` ability for child threads in the near future, but if this
-        // comment has been sitting here for a long time feel free to refactor
-        // away crossbeam.
-        crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper))
+        crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
             .expect("child threads shouldn't panic")
     }
+}
 
-    fn drain_the_queue(
+impl<'a, 'cfg> DrainState<'a, 'cfg> {
+    fn spawn_work_if_possible(
+        &mut self,
+        cx: &mut Context<'a, '_>,
+        jobserver_helper: &HelperThread,
+        scope: &Scope<'_>,
+        has_errored: bool,
+    ) -> CargoResult<()> {
+        // Dequeue as much work as we can, learning about everything
+        // possible that can run. Note that this is also the point where we
+        // start requesting job tokens. Each job after the first needs to
+        // request a token.
+        while let Some((unit, job)) = self.queue.dequeue() {
+            self.pending_queue.push((unit, job));
+            if self.active.len() + self.pending_queue.len() > 1 {
+                jobserver_helper.request_token();
+            }
+        }
+
+        // Do not actually spawn the new work if we've errored out
+        if has_errored {
+            return Ok(());
+        }
+
+        // Now that we've learned of all possible work that we can execute
+        // try to spawn it so long as we've got a jobserver token which says
+        // we're able to perform some parallel work.
+        while self.has_extra_tokens() && !self.pending_queue.is_empty() {
+            let (unit, job) = self.pending_queue.remove(0);
+            self.run(&unit, job, cx, scope)?;
+        }
+
+        Ok(())
+    }
+
+    fn has_extra_tokens(&self) -> bool {
+        self.active.len() < self.tokens.len() + 1
+    }
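
A quick arithmetic check of the `has_extra_tokens` invariant, as a
standalone sketch (plain integers rather than Cargo's fields): with T
acquired tokens, up to T + 1 units may be active, because the first unit
rides on the implicit token owned by the Cargo process itself.

    fn has_extra_tokens(active: usize, tokens: usize) -> bool {
        active < tokens + 1
    }

    fn main() {
        assert!(has_extra_tokens(0, 0)); // the first unit needs no acquired token
        assert!(!has_extra_tokens(1, 0)); // a second concurrent unit does
        assert!(has_extra_tokens(1, 1)); // one acquired token: two units may run
    }
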
+
+    // The oldest job (i.e., least job ID) is the one we grant tokens to first.
+    fn pop_waiting_client(&mut self) -> (JobId, Client) {
+        // FIXME: replace this with BTreeMap::first_entry when that stabilizes.
+        let key = *self
+            .to_send_clients
+            .keys()
+            .next()
+            .expect("at least one waiter");
+        let clients = self.to_send_clients.get_mut(&key).unwrap();
+        let client = clients.pop().unwrap();
+        if clients.is_empty() {
+            self.to_send_clients.remove(&key);
+        }
+        (key, client)
+    }
+
+    // If we managed to acquire some extra tokens, send them off to a waiting rustc.
+    fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
+        while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
+            let (id, client) = self.pop_waiting_client();
+            // This unwrap is guaranteed to succeed. `active` must be at least
+            // length 1, as otherwise there can't be a client waiting to be sent
+            // on, so tokens.len() must also be at least one.
+            let token = self.tokens.pop().unwrap();
+            self.rustc_tokens
+                .entry(id)
+                .or_insert_with(Vec::new)
+                .push(token);
+            client
+                .release_raw()
+                .chain_err(|| "failed to release jobserver token")?;
+        }
+
+        Ok(())
+    }
+
+    fn handle_event(
         &mut self,
         cx: &mut Context<'a, '_>,
+        jobserver_helper: &HelperThread,
+        plan: &mut BuildPlan,
+        event: Message,
+    ) -> CargoResult<Option<anyhow::Error>> {
+        match event {
+            Message::Run(id, cmd) => {
+                cx.bcx
+                    .config
+                    .shell()
+                    .verbose(|c| c.status("Running", &cmd))?;
+                self.timings.unit_start(id, self.active[&id]);
+            }
+            Message::BuildPlanMsg(module_name, cmd, filenames) => {
+                plan.update(&module_name, &cmd, &filenames)?;
+            }
+            Message::Stdout(out) => {
+                cx.bcx.config.shell().stdout_println(out);
+            }
+            Message::Stderr(err) => {
+                let mut shell = cx.bcx.config.shell();
+                shell.print_ansi(err.as_bytes())?;
+                shell.err().write_all(b"\n")?;
+            }
+            Message::FixDiagnostic(msg) => {
+                self.print.print(&msg)?;
+            }
+            Message::Finish(id, artifact, result) => {
+                let unit = match artifact {
+                    // If `id` has completely finished we remove it
+                    // from the `active` map ...
+                    Artifact::All => {
+                        info!("end: {:?}", id);
+                        self.finished += 1;
+                        if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
+                            // This puts back the tokens that this rustc
+                            // acquired into our primary token list.
+                            //
+                            // This represents a rustc bug: it did not
+                            // release all of its thread tokens but finished
+                            // completely. But we want to make Cargo resilient
+                            // to such rustc bugs, as they're generally not
+                            // fatal in nature (i.e., Cargo can make progress
+                            // still, and the build might not even fail).
+                            self.tokens.extend(rustc_tokens);
+                        }
+                        self.to_send_clients.remove(&id);
+                        self.active.remove(&id).unwrap()
+                    }
+                    // ... otherwise if it hasn't finished we leave it
+                    // in there as we'll get another `Finish` later on.
+                    Artifact::Metadata => {
+                        info!("end (meta): {:?}", id);
+                        self.active[&id]
+                    }
+                };
+                info!("end ({:?}): {:?}", unit, result);
+                match result {
+                    Ok(()) => self.finish(id, &unit, artifact, cx)?,
+                    Err(e) => {
+                        let msg = "The following warnings were emitted during compilation:";
+                        self.emit_warnings(Some(msg), &unit, cx)?;
+
+                        if !self.active.is_empty() {
+                            handle_error(&e, &mut *cx.bcx.config.shell());
+                            cx.bcx.config.shell().warn(
+                                "build failed, waiting for other \
+                                 jobs to finish...",
+                            )?;
+                            return Ok(Some(anyhow::format_err!("build failed")));
+                        } else {
+                            return Ok(Some(e));
+                        }
+                    }
+                }
+            }
+            Message::Token(acquired_token) => {
+                let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?;
+                self.tokens.push(token);
+            }
+            Message::NeedsToken(id) => {
+                log::info!("queue token request");
+                jobserver_helper.request_token();
+                let client = cx.rustc_clients[&self.active[&id]].clone();
+                self.to_send_clients
+                    .entry(id)
+                    .or_insert_with(Vec::new)
+                    .push(client);
+            }
+            Message::ReleaseToken(id) => {
+                // Note that this pops off potentially a completely
+                // different token, but all tokens of the same job are
+                // conceptually the same so that's fine.
+                //
+                // self.tokens is a "pool" -- the order doesn't matter -- and
+                // this transfers ownership of the token into that pool. If we
+                // don't end up using it on the next go around, it will be
+                // dropped by the `truncate` call below, same as tokens
+                // obtained through Message::Token.
+                let rustc_tokens = self
+                    .rustc_tokens
+                    .get_mut(&id)
+                    .expect("no tokens associated");
+                self.tokens
+                    .push(rustc_tokens.pop().expect("rustc releases token it has"));
+            }
+        }
+
+        Ok(None)
+    }
+
+    // This will also tick the progress bar as appropriate
+    fn wait_for_events(&mut self) -> Vec<Message> {
+        // Drain all events at once to avoid displaying the progress bar
+        // unnecessarily. If there are no events we block waiting for an
+        // event, but we keep a "heartbeat" going to allow `record_cpu`
+        // to run and calculate CPU usage over time. To do this we listen
+        // for a message with a timeout, and on timeout we run the previous
+        // parts of the loop again.
+        let events: Vec<_> = self.rx.try_iter().collect();
+        info!(
+            "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
+            self.tokens.len(),
+            self.rustc_tokens
+                .iter()
+                .map(|(k, j)| (k, j.len()))
+                .collect::<Vec<_>>(),
+            self.to_send_clients
+                .iter()
+                .map(|(k, j)| (k, j.len()))
+                .collect::<Vec<_>>(),
+            events.len(),
+        );
+        if events.is_empty() {
+            loop {
+                self.tick_progress();
+                self.tokens.truncate(self.active.len() - 1);
+                match self.rx.recv_timeout(Duration::from_millis(500)) {
+                    Ok(message) => break vec![message],
+                    Err(_) => continue,
+                }
+            }
+        } else {
+            events
+        }
+    }
+
+    fn drain_the_queue(
+        mut self,
+        cx: &mut Context<'a, '_>,
         plan: &mut BuildPlan,
         scope: &Scope<'a>,
         jobserver_helper: &HelperThread,
     ) -> CargoResult<()> {
-        let mut tokens = Vec::new();
-        let mut queue = Vec::new();
-        let mut print = DiagnosticPrinter::new(cx.bcx.config);
         trace!("queue: {:#?}", self.queue);
 
         // Iteratively execute the entire dependency graph. Each turn of the
@@ -284,27 +629,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         // successful and otherwise wait for pending work to finish if it failed
        // and then immediately return.
         let mut error = None;
-        let total = self.queue.len();
-        let mut finished = 0;
         loop {
-            // Dequeue as much work as we can, learning about everything
-            // possible that can run. Note that this is also the point where we
-            // start requesting job tokens. Each job after the first needs to
-            // request a token.
-            while let Some((unit, job)) = self.queue.dequeue() {
-                queue.push((unit, job));
-                if self.active.len() + queue.len() > 1 {
-                    jobserver_helper.request_token();
-                }
-            }
-
-            // Now that we've learned of all possible work that we can execute
-            // try to spawn it so long as we've got a jobserver token which says
-            // we're able to perform some parallel work.
-            while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
-                let (unit, job) = queue.remove(0);
-                self.run(&unit, job, cx, scope)?;
-            }
+            self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?;
 
             // If after all that we're not actually running anything then we're
             // done!
@@ -312,101 +638,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
                 break;
             }
 
+            self.grant_rustc_token_requests()?;
+
             // And finally, before we block waiting for the next event, drop any
             // excess tokens we may have accidentally acquired. Due to how our
             // jobserver interface is architected we may acquire a token that we
             // don't actually use, and if this happens just relinquish it back
             // to the jobserver itself.
-            tokens.truncate(self.active.len() - 1);
-
-            // Record some timing information if `-Ztimings` is enabled, and
-            // this'll end up being a noop if we're not recording this
-            // information.
-            self.timings
-                .mark_concurrency(self.active.len(), queue.len(), self.queue.len());
-            self.timings.record_cpu();
-
-            // Drain all events at once to avoid displaying the progress bar
-            // unnecessarily. If there's no events we actually block waiting for
-            // an event, but we keep a "heartbeat" going to allow `record_cpu`
-            // to run above to calculate CPU usage over time. To do this we
-            // listen for a message with a timeout, and on timeout we run the
-            // previous parts of the loop again.
-            let events: Vec<_> = self.rx.try_iter().collect();
-            let events = if events.is_empty() {
-                self.show_progress(finished, total);
-                match self.rx.recv_timeout(Duration::from_millis(500)) {
-                    Ok(message) => vec![message],
-                    Err(_) => continue,
-                }
-            } else {
-                events
-            };
-
-            for event in events {
-                match event {
-                    Message::Run(id, cmd) => {
-                        cx.bcx
-                            .config
-                            .shell()
-                            .verbose(|c| c.status("Running", &cmd))?;
-                        self.timings.unit_start(id, self.active[&id]);
-                    }
-                    Message::BuildPlanMsg(module_name, cmd, filenames) => {
-                        plan.update(&module_name, &cmd, &filenames)?;
-                    }
-                    Message::Stdout(out) => {
-                        cx.bcx.config.shell().stdout_println(out);
-                    }
-                    Message::Stderr(err) => {
-                        let mut shell = cx.bcx.config.shell();
-                        shell.print_ansi(err.as_bytes())?;
-                        shell.err().write_all(b"\n")?;
-                    }
-                    Message::FixDiagnostic(msg) => {
-                        print.print(&msg)?;
-                    }
-                    Message::Finish(id, artifact, result) => {
-                        let unit = match artifact {
-                            // If `id` has completely finished we remove it
-                            // from the `active` map ...
-                            Artifact::All => {
-                                info!("end: {:?}", id);
-                                finished += 1;
-                                self.active.remove(&id).unwrap()
-                            }
-                            // ... otherwise if it hasn't finished we leave it
-                            // in there as we'll get another `Finish` later on.
-                            Artifact::Metadata => {
-                                info!("end (meta): {:?}", id);
-                                self.active[&id]
-                            }
-                        };
-                        info!("end ({:?}): {:?}", unit, result);
-                        match result {
-                            Ok(()) => self.finish(id, &unit, artifact, cx)?,
-                            Err(e) => {
-                                let msg = "The following warnings were emitted during compilation:";
-                                self.emit_warnings(Some(msg), &unit, cx)?;
-
-                                if !self.active.is_empty() {
-                                    error = Some(anyhow::format_err!("build failed"));
-                                    handle_error(&e, &mut *cx.bcx.config.shell());
-                                    cx.bcx.config.shell().warn(
-                                        "build failed, waiting for other \
-                                         jobs to finish...",
-                                    )?;
-                                } else {
-                                    error = Some(e);
-                                }
-                            }
-                        }
-                    }
-                    Message::Token(acquired_token) => {
-                        tokens.push(
-                            acquired_token.chain_err(|| "failed to acquire jobserver token")?,
-                        );
-                    }
+            for event in self.wait_for_events() {
+                if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? {
+                    error = Some(err);
                 }
             }
         }
@@ -434,7 +675,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
 
         if let Some(e) = error {
             Err(e)
-        } else if self.queue.is_empty() && queue.is_empty() {
+        } else if self.queue.is_empty() && self.pending_queue.is_empty() {
             let message = format!(
                 "{} [{}] target(s) in {}",
                 profile_name, opt_type, time_elapsed
@@ -450,16 +691,31 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         }
     }
 
-    fn show_progress(&mut self, count: usize, total: usize) {
+    // This also records CPU usage and marks concurrency; we roughly want to do
+    // this as often as we spin on the events receiver (at least every 500ms or
+    // so).
+    fn tick_progress(&mut self) {
+        // Record some timing information if `-Ztimings` is enabled, and
+        // this'll end up being a noop if we're not recording this
+        // information.
+        self.timings.mark_concurrency(
+            self.active.len(),
+            self.pending_queue.len(),
+            self.queue.len(),
+            self.rustc_tokens.len(),
+        );
+        self.timings.record_cpu();
+
         let active_names = self
             .active
             .values()
             .map(|u| self.name_for_progress(u))
             .collect::<Vec<_>>();
-        drop(
-            self.progress
-                .tick_now(count, total, &format!(": {}", active_names.join(", "))),
-        );
+        drop(self.progress.tick_now(
+            self.finished,
+            self.queue.len(),
+            &format!(": {}", active_names.join(", ")),
+        ));
     }
 
     fn name_for_progress(&self, unit: &Unit<'_>) -> String {
@@ -481,17 +737,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         }
     }
 
-    /// Executes a job in the `scope` given, pushing the spawned thread's
-    /// handled onto `threads`.
+    /// Executes a job, pushing the spawned thread's handle onto `threads`.
     fn run(
         &mut self,
         unit: &Unit<'a>,
         job: Job,
         cx: &Context<'a, '_>,
-        scope: &Scope<'a>,
+        scope: &Scope<'_>,
     ) -> CargoResult<()> {
-        let id = self.next_id;
-        self.next_id = id.checked_add(1).unwrap();
+        let id = JobId(self.next_id);
+        self.next_id = self.next_id.checked_add(1).unwrap();
 
         info!("start {}: {:?}", id, unit);
 
@@ -501,6 +756,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let my_tx = self.tx.clone();
         let fresh = job.freshness();
         let rmeta_required = cx.rmeta_required(unit);
+
+        if !cx.bcx.build_config.build_plan {
+            // Print out some nice progress information.
+            self.note_working_on(cx.bcx.config, unit, fresh)?;
+        }
+
         let doit = move || {
             let state = JobState {
                 id,
@@ -540,7 +801,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         // to make sure nothing hangs by accident.
         struct FinishOnDrop<'a> {
             tx: &'a Sender<Message>,
-            id: u32,
+            id: JobId,
             result: CargoResult<()>,
         }
 
@@ -552,15 +813,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             }
         };
 
-        if !cx.bcx.build_config.build_plan {
-            // Print out some nice progress information.
-            self.note_working_on(cx.bcx.config, unit, fresh)?;
-        }
-
         match fresh {
             Freshness::Fresh => {
                 self.timings.add_fresh();
-                doit()
+                doit();
             }
             Freshness::Dirty => {
                 self.timings.add_dirty();
@@ -601,7 +857,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
 
     fn finish(
         &mut self,
-        id: u32,
+        id: JobId,
         unit: &Unit<'a>,
         artifact: Artifact,
         cx: &mut Context<'a, '_>,
diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs
index 67c5496e9ba..29805bb3e0e 100644
--- a/src/cargo/core/compiler/mod.rs
+++ b/src/cargo/core/compiler/mod.rs
@@ -538,7 +538,14 @@ fn prepare_rustc<'a, 'cfg>(
     let is_primary = cx.is_primary_package(unit);
 
     let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?;
-    base.inherit_jobserver(&cx.jobserver);
+    if cx.bcx.config.cli_unstable().jobserver_per_rustc {
+        let client = cx.new_jobserver()?;
+        base.inherit_jobserver(&client);
+        base.arg("-Zjobserver-token-requests");
+        assert!(cx.rustc_clients.insert(*unit, client).is_none());
+    } else {
+        base.inherit_jobserver(&cx.jobserver);
+    }
     build_base_args(cx, &mut base, unit, crate_types)?;
     build_deps_args(&mut base, cx, unit)?;
     Ok(base)
@@ -1210,6 +1217,31 @@ fn on_stderr_line_inner(
         }
     }
 
+    #[derive(serde::Deserialize)]
+    struct JobserverNotification {
+        jobserver_event: Event,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    enum Event {
+        WillAcquire,
+        Release,
+    }
+
+    if let Ok(JobserverNotification { jobserver_event }) =
+        serde_json::from_str::<JobserverNotification>(compiler_message.get())
+    {
+        log::info!(
+            "found jobserver directive from rustc: `{:?}`",
+            jobserver_event
+        );
+        match jobserver_event {
+            Event::WillAcquire => state.will_acquire(),
+            Event::Release => state.release_token(),
+        }
+        return Ok(false);
+    }
+
     // And failing all that above we should have a legitimate JSON diagnostic
     // from the compiler, so wrap it in an external Cargo JSON message
     // indicating which package it came from and then emit it.
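
For reference, the notification lines rustc emits on stderr (when launched
with `-Zjobserver-token-requests`) look roughly like the JSON below. This
standalone sketch copies the private structs from the hunk above so it runs
outside Cargo; the definitions are mirrors for illustration, not an import:

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct JobserverNotification {
        jobserver_event: Event,
    }

    #[derive(Debug, Deserialize)]
    enum Event {
        WillAcquire,
        Release,
    }

    fn main() {
        // A line as it might appear on the rustc stderr pipe.
        let line = r#"{"jobserver_event": "WillAcquire"}"#;
        let note: JobserverNotification = serde_json::from_str(line).unwrap();
        // Cargo maps WillAcquire -> Message::NeedsToken and
        // Release -> Message::ReleaseToken in the job queue.
        println!("{:?}", note.jobserver_event);
    }
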
diff --git a/src/cargo/core/compiler/timings.rs b/src/cargo/core/compiler/timings.rs
index a205b658642..ff24e77b10a 100644
--- a/src/cargo/core/compiler/timings.rs
+++ b/src/cargo/core/compiler/timings.rs
@@ -3,6 +3,7 @@
 //! This module implements some simple tracking information for timing of how
 //! long it takes for different units to compile.
 use super::{CompileMode, Unit};
+use crate::core::compiler::job_queue::JobId;
 use crate::core::compiler::BuildContext;
 use crate::core::PackageId;
 use crate::util::cpu::State;
@@ -41,7 +42,7 @@ pub struct Timings<'a, 'cfg> {
     unit_times: Vec<UnitTime<'a>>,
     /// Units that are in the process of being built.
     /// When they finished, they are moved to `unit_times`.
-    active: HashMap<u32, UnitTime<'a>>,
+    active: HashMap<JobId, UnitTime<'a>>,
     /// Concurrency-tracking information. This is periodically updated while
     /// compilation progresses.
     concurrency: Vec<Concurrency>,
@@ -84,6 +85,10 @@ struct Concurrency {
     /// Number of units that are not yet ready, because they are waiting for
     /// dependencies to finish.
     inactive: usize,
+    /// Number of rustc "extra" threads -- i.e., how many tokens have been
+    /// provided across all current rustc instances that are not the main
+    /// thread tokens.
+    rustc_parallelism: usize,
 }
 
 impl<'a, 'cfg> Timings<'a, 'cfg> {
@@ -140,7 +145,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
     }
 
     /// Mark that a unit has started running.
-    pub fn unit_start(&mut self, id: u32, unit: Unit<'a>) {
+    pub fn unit_start(&mut self, id: JobId, unit: Unit<'a>) {
         if !self.enabled {
             return;
         }
@@ -174,7 +179,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
     }
 
     /// Mark that the `.rmeta` file was generated.
-    pub fn unit_rmeta_finished(&mut self, id: u32, unlocked: Vec<&Unit<'a>>) {
+    pub fn unit_rmeta_finished(&mut self, id: JobId, unlocked: Vec<&Unit<'a>>) {
         if !self.enabled {
             return;
         }
@@ -192,7 +197,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
     }
 
     /// Mark that a unit has finished running.
-    pub fn unit_finished(&mut self, id: u32, unlocked: Vec<&Unit<'a>>) {
+    pub fn unit_finished(&mut self, id: JobId, unlocked: Vec<&Unit<'a>>) {
         if !self.enabled {
             return;
         }
@@ -232,7 +237,13 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
     }
 
     /// This is called periodically to mark the concurrency of internal structures.
-    pub fn mark_concurrency(&mut self, active: usize, waiting: usize, inactive: usize) {
+    pub fn mark_concurrency(
+        &mut self,
+        active: usize,
+        waiting: usize,
+        inactive: usize,
+        rustc_parallelism: usize,
+    ) {
         if !self.enabled {
             return;
         }
@@ -241,6 +252,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
             active,
             waiting,
             inactive,
+            rustc_parallelism,
         };
         self.concurrency.push(c);
     }
@@ -285,7 +297,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
         if !self.enabled {
             return Ok(());
         }
-        self.mark_concurrency(0, 0, 0);
+        self.mark_concurrency(0, 0, 0, 0);
         self.unit_times
             .sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
         if self.report_html {
@@ -361,6 +373,12 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
         };
         let total_time = format!("{:.1}s{}", duration, time_human);
         let max_concurrency = self.concurrency.iter().map(|c| c.active).max().unwrap();
+        let max_rustc_concurrency = self
+            .concurrency
+            .iter()
+            .map(|c| c.rustc_parallelism)
+            .max()
+            .unwrap();
         let rustc_info = render_rustc_info(bcx);
         write!(
             f,
@@ -393,6 +411,9 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
   <tr>
     <td>rustc:</td><td>{}</td>
   </tr>
+  <tr>
+    <td>Max (global) rustc threads concurrency:</td><td>{}</td>
+  </tr>
 </table>
 "#,
@@ -407,6 +428,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> {
             self.start_str,
             total_time,
             rustc_info,
+            max_rustc_concurrency,
         )?;
         Ok(())
     }
diff --git a/src/cargo/core/features.rs b/src/cargo/core/features.rs
index c34fe07880d..c2ea6622fa8 100644
--- a/src/cargo/core/features.rs
+++ b/src/cargo/core/features.rs
@@ -341,6 +341,7 @@ pub struct CliUnstable {
     pub timings: Option<Vec<String>>,
     pub doctest_xcompile: bool,
     pub panic_abort_tests: bool,
+    pub jobserver_per_rustc: bool,
 }
 
 impl CliUnstable {
@@ -409,6 +410,7 @@ impl CliUnstable {
             "timings" => self.timings = Some(parse_timings(v)),
             "doctest-xcompile" => self.doctest_xcompile = parse_empty(k, v)?,
             "panic-abort-tests" => self.panic_abort_tests = parse_empty(k, v)?,
+            "jobserver-per-rustc" => self.jobserver_per_rustc = parse_empty(k, v)?,
             _ => bail!("unknown `-Z` flag specified: {}", k),
         }
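
A usage note: with a nightly toolchain whose rustc understands
`-Zjobserver-token-requests` (which `prepare_rustc` above passes
automatically), the new mode would be exercised with something like the
following; the exact invocation is illustrative:

    cargo +nightly build -Zjobserver-per-rustc -Ztimings

The `-Ztimings` report then includes the new "Max (global) rustc threads
concurrency" row populated from `rustc_parallelism`.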