From ec80cf90b04eb9fe9019278fb03aef68b7ddf64b Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Thu, 19 Dec 2019 13:40:29 -0500 Subject: [PATCH 01/26] Wire up methods for informing job queue of rustc jobserver state --- src/cargo/core/compiler/context/mod.rs | 19 ++++++++ src/cargo/core/compiler/job_queue.rs | 18 +++++++- src/cargo/core/compiler/mod.rs | 62 ++++++++++++++++++++++---- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/cargo/core/compiler/context/mod.rs b/src/cargo/core/compiler/context/mod.rs index 652acaba869..e57bab18d00 100644 --- a/src/cargo/core/compiler/context/mod.rs +++ b/src/cargo/core/compiler/context/mod.rs @@ -491,4 +491,23 @@ impl<'a, 'cfg> Context<'a, 'cfg> { pub fn rmeta_required(&self, unit: &Unit<'a>) -> bool { self.rmeta_required.contains(unit) || self.bcx.config.cli_unstable().timings.is_some() } + + pub fn new_jobserver(&mut self) -> CargoResult { + let tokens = self.bcx.build_config.jobs as usize; + let client = Client::new(tokens).chain_err(|| "failed to create jobserver")?; + + // Drain the client fully + for i in 0..tokens { + while let Err(e) = client.acquire_raw() { + anyhow::bail!( + "failed to fully drain {}/{} token from jobserver at startup: {:?}", + i, + tokens, + e, + ); + } + } + + Ok(client) + } } diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index bdde1297237..311fa2d7731 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -9,7 +9,7 @@ use std::time::Duration; use anyhow::format_err; use crossbeam_utils::thread::Scope; -use jobserver::{Acquired, HelperThread}; +use jobserver::{Acquired, Client, HelperThread}; use log::{debug, info, trace}; use super::context::OutputFile; @@ -128,6 +128,22 @@ impl<'a> JobState<'a> { .tx .send(Message::Finish(self.id, Artifact::Metadata, Ok(()))); } + + /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) + /// on the passed client. + /// + /// This should arrange for the passed client to eventually get a token via + /// `client.release_raw()`. + pub fn will_acquire(&self, _client: &Client) { + // ... + } + + /// The rustc underlying this Job is informing us that it is done with a jobserver token. + /// + /// Note that it does *not* write that token back anywhere. + pub fn release_token(&self) { + // ... + } } impl<'a, 'cfg> JobQueue<'a, 'cfg> { diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 67c5496e9ba..66f170a6f20 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -48,6 +48,7 @@ use crate::util::errors::{self, CargoResult, CargoResultExt, Internal, ProcessEr use crate::util::machine_message::Message; use crate::util::{self, machine_message, ProcessBuilder}; use crate::util::{internal, join_paths, paths, profile}; +use jobserver::Client; /// A glorified callback for executing calls to rustc. Rather than calling rustc /// directly, we'll use an `Executor`, giving clients an opportunity to intercept @@ -171,7 +172,7 @@ fn rustc<'a, 'cfg>( unit: &Unit<'a>, exec: &Arc, ) -> CargoResult { - let mut rustc = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; + let (rustc_client, mut rustc) = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; let build_plan = cx.bcx.build_config.build_plan; let name = unit.pkg.name().to_string(); @@ -286,7 +287,16 @@ fn rustc<'a, 'cfg>( &target, mode, &mut |line| on_stdout_line(state, line, package_id, &target), - &mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), + &mut |line| { + on_stderr_line( + state, + line, + package_id, + &target, + &mut output_options, + Some(&rustc_client), + ) + }, ) .map_err(internal_if_simple_exit_code) .chain_err(|| format!("could not compile `{}`.", name))?; @@ -534,14 +544,15 @@ fn prepare_rustc<'a, 'cfg>( cx: &mut Context<'a, 'cfg>, crate_types: &[&str], unit: &Unit<'a>, -) -> CargoResult { +) -> CargoResult<(Client, ProcessBuilder)> { let is_primary = cx.is_primary_package(unit); let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?; - base.inherit_jobserver(&cx.jobserver); + let client = cx.new_jobserver()?; + base.inherit_jobserver(&client); build_base_args(cx, &mut base, unit, crate_types)?; build_deps_args(&mut base, cx, unit)?; - Ok(base) + Ok((client, base)) } fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult { @@ -600,7 +611,9 @@ fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult rustdoc .exec_with_streaming( &mut |line| on_stdout_line(state, line, package_id, &target), - &mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), + &mut |line| { + on_stderr_line(state, line, package_id, &target, &mut output_options, None) + }, false, ) .chain_err(|| format!("Could not document `{}`.", name))?; @@ -683,6 +696,7 @@ fn add_error_format_and_color( _ => {} } cmd.arg(json); + cmd.arg("-Zjobserver-token-requests"); Ok(()) } @@ -1081,8 +1095,9 @@ fn on_stderr_line( package_id: PackageId, target: &Target, options: &mut OutputOptions, + rustc_client: Option<&Client>, ) -> CargoResult<()> { - if on_stderr_line_inner(state, line, package_id, target, options)? { + if on_stderr_line_inner(state, line, package_id, target, options, rustc_client)? { // Check if caching is enabled. if let Some((path, cell)) = &mut options.cache_cell { // Cache the output, which will be replayed later when Fresh. @@ -1102,6 +1117,7 @@ fn on_stderr_line_inner( package_id: PackageId, target: &Target, options: &mut OutputOptions, + rustc_client: Option<&Client>, ) -> CargoResult { // We primarily want to use this function to process JSON messages from // rustc. The compiler should always print one JSON message per line, and @@ -1210,6 +1226,36 @@ fn on_stderr_line_inner( } } + #[derive(serde::Deserialize)] + struct JobserverNotification { + jobserver_event: Event, + } + + #[derive(Debug, serde::Deserialize)] + enum Event { + WillAcquire, + Release, + } + + if let Ok(JobserverNotification { jobserver_event }) = + serde_json::from_str::(compiler_message.get()) + { + log::trace!( + "found jobserver directive from rustc: `{:?}`", + jobserver_event + ); + match rustc_client { + Some(client) => match jobserver_event { + Event::WillAcquire => state.will_acquire(client), + Event::Release => state.release_token(), + }, + None => { + panic!("Received jobserver event without a client"); + } + } + return Ok(false); + } + // And failing all that above we should have a legitimate JSON diagnostic // from the compiler, so wrap it in an external Cargo JSON message // indicating which package it came from and then emit it. @@ -1258,7 +1304,7 @@ fn replay_output_cache( break; } let trimmed = line.trim_end_matches(&['\n', '\r'][..]); - on_stderr_line(state, trimmed, package_id, &target, &mut options)?; + on_stderr_line(state, trimmed, package_id, &target, &mut options, None)?; line.clear(); } Ok(()) From 4a8237f6ad57fc959615432f49b8ca1b1e706ad3 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Fri, 20 Dec 2019 20:11:10 -0500 Subject: [PATCH 02/26] Communicate jobserver information with each rustc --- src/cargo/core/compiler/job_queue.rs | 75 +++++++++++++++++++++++++--- src/cargo/core/compiler/mod.rs | 2 +- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 311fa2d7731..86833116107 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -91,6 +91,12 @@ enum Message { FixDiagnostic(diagnostic_server::Message), Token(io::Result), Finish(u32, Artifact, CargoResult<()>), + + // This client should get release_raw called on it with one of our tokens + NeedsToken(u32, Client), + + // A token previously passed to a NeedsToken client is being released. + ReleaseToken(u32), } impl<'a> JobState<'a> { @@ -134,15 +140,15 @@ impl<'a> JobState<'a> { /// /// This should arrange for the passed client to eventually get a token via /// `client.release_raw()`. - pub fn will_acquire(&self, _client: &Client) { - // ... + pub fn will_acquire(&self, client: &Client) { + let _ = self.tx.send(Message::NeedsToken(self.id, client.clone())); } /// The rustc underlying this Job is informing us that it is done with a jobserver token. /// /// Note that it does *not* write that token back anywhere. pub fn release_token(&self) { - // ... + let _ = self.tx.send(Message::ReleaseToken(self.id)); } } @@ -285,6 +291,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { jobserver_helper: &HelperThread, ) -> CargoResult<()> { let mut tokens = Vec::new(); + let mut rustc_tokens = Vec::new(); + let mut to_send_clients: Vec<(u32, Client)> = Vec::new(); let mut queue = Vec::new(); let mut print = DiagnosticPrinter::new(cx.bcx.config); trace!("queue: {:#?}", self.queue); @@ -322,6 +330,13 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { self.run(&unit, job, cx, scope)?; } + info!( + "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}", + tokens.len(), + rustc_tokens.len(), + to_send_clients.len() + ); + // If after all that we're not actually running anything then we're // done! if self.active.is_empty() { @@ -333,6 +348,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // jobserver interface is architected we may acquire a token that we // don't actually use, and if this happens just relinquish it back // to the jobserver itself. + let extra_tokens = tokens.len() - (self.active.len() - 1); + for _ in 0..extra_tokens { + if let Some((id, client)) = to_send_clients.pop() { + let token = tokens.pop().expect("an extra token"); + rustc_tokens.push((id, token)); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; + } + } tokens.truncate(self.active.len() - 1); // Record some timing information if `-Ztimings` is enabled, and @@ -389,6 +414,19 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Artifact::All => { info!("end: {:?}", id); finished += 1; + while let Some(pos) = + rustc_tokens.iter().position(|(i, _)| *i == id) + { + // push all the leftover tokens back into + // the token list + tokens.push(rustc_tokens.remove(pos).1); + } + while let Some(pos) = + to_send_clients.iter().position(|(i, _)| *i == id) + { + // drain all the pending clients + to_send_clients.remove(pos); + } self.active.remove(&id).unwrap() } // ... otherwise if it hasn't finished we leave it @@ -419,9 +457,34 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } } Message::Token(acquired_token) => { - tokens.push( - acquired_token.chain_err(|| "failed to acquire jobserver token")?, - ); + let token = + acquired_token.chain_err(|| "failed to acquire jobserver token")?; + if let Some((id, client)) = to_send_clients.pop() { + rustc_tokens.push((id, token)); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; + } else { + tokens.push(token); + } + } + Message::NeedsToken(id, client) => { + log::info!("queue token request"); + jobserver_helper.request_token(); + to_send_clients.push((id, client)); + } + Message::ReleaseToken(id) => { + // Note that this pops off potentially a completely + // different token, but all tokens of the same job are + // conceptually the same so that's fine. + if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) { + tokens.push(rustc_tokens.remove(pos).1); + } else { + panic!( + "This job (id={}) does not have tokens associated with it", + id + ); + } } } } diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 66f170a6f20..2b9dc64ffd5 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -1240,7 +1240,7 @@ fn on_stderr_line_inner( if let Ok(JobserverNotification { jobserver_event }) = serde_json::from_str::(compiler_message.get()) { - log::trace!( + log::info!( "found jobserver directive from rustc: `{:?}`", jobserver_event ); From 877fe690d9b5bf5579a18d853f7ead55934a9514 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Fri, 20 Dec 2019 21:10:31 -0500 Subject: [PATCH 03/26] Record the amount of rustc internal parallelism --- src/cargo/core/compiler/job_queue.rs | 8 ++++++-- src/cargo/core/compiler/timings.rs | 25 +++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 86833116107..7815fb3afee 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -363,8 +363,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Record some timing information if `-Ztimings` is enabled, and // this'll end up being a noop if we're not recording this // information. - self.timings - .mark_concurrency(self.active.len(), queue.len(), self.queue.len()); + self.timings.mark_concurrency( + self.active.len(), + queue.len(), + self.queue.len(), + rustc_tokens.len(), + ); self.timings.record_cpu(); // Drain all events at once to avoid displaying the progress bar diff --git a/src/cargo/core/compiler/timings.rs b/src/cargo/core/compiler/timings.rs index a205b658642..4cd151b547c 100644 --- a/src/cargo/core/compiler/timings.rs +++ b/src/cargo/core/compiler/timings.rs @@ -84,6 +84,10 @@ struct Concurrency { /// Number of units that are not yet ready, because they are waiting for /// dependencies to finish. inactive: usize, + /// Number of rustc "extra" threads -- i.e., how many tokens have been + /// provided across all current rustc instances that are not the main thread + /// tokens. + rustc_parallelism: usize, } impl<'a, 'cfg> Timings<'a, 'cfg> { @@ -232,7 +236,13 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// This is called periodically to mark the concurrency of internal structures. - pub fn mark_concurrency(&mut self, active: usize, waiting: usize, inactive: usize) { + pub fn mark_concurrency( + &mut self, + active: usize, + waiting: usize, + inactive: usize, + rustc_parallelism: usize, + ) { if !self.enabled { return; } @@ -241,6 +251,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { active, waiting, inactive, + rustc_parallelism, }; self.concurrency.push(c); } @@ -285,7 +296,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { if !self.enabled { return Ok(()); } - self.mark_concurrency(0, 0, 0); + self.mark_concurrency(0, 0, 0, 0); self.unit_times .sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap()); if self.report_html { @@ -361,6 +372,12 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { }; let total_time = format!("{:.1}s{}", duration, time_human); let max_concurrency = self.concurrency.iter().map(|c| c.active).max().unwrap(); + let max_rustc_concurrency = self + .concurrency + .iter() + .map(|c| c.rustc_parallelism) + .max() + .unwrap(); let rustc_info = render_rustc_info(bcx); write!( f, @@ -393,6 +410,9 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { rustc:{} + + Max (global) rustc threads concurrency:{} + "#, @@ -407,6 +427,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { self.start_str, total_time, rustc_info, + max_rustc_concurrency, )?; Ok(()) } From cdcd9ade8d0ad694517ffd25577d0879306fed0c Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Mon, 23 Dec 2019 14:45:33 -0500 Subject: [PATCH 04/26] Add documentation about the job queue and rustc --- src/cargo/core/compiler/job_queue.rs | 63 +++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 7815fb3afee..9c8128241f5 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -1,3 +1,54 @@ +//! This module implements the job queue which determines the ordering in which +//! rustc is spawned off. It also manages the allocation of jobserver tokens to +//! rustc beyond the implicit token each rustc owns (i.e., the ones used for +//! parallel LLVM work and parallel rustc threads). +//! +//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each +//! other, which is due to scaling issues with sharing a single jobserver +//! amongst what is potentially hundreds of threads of work on many-cored +//! systems on (at least) linux, and likely other platforms as well. +//! +//! The details of this algorithm are (also) written out in +//! src/librustc_jobserver/lib.rs. What follows is a description focusing on the +//! Cargo side of things. +//! +//! Cargo wants to complete the build as quickly as possible, fully saturating +//! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn +//! more than N threads of work: the total amount of tokens we have floating +//! around must always be limited to N. +//! +//! It is not really possible to optimally choose which crate should build first +//! or last; nor is it possible to decide whether to give an additional token to +//! rustc first or rather spawn a new crate of work. For now, the algorithm we +//! implement prioritizes spawning as many crates (i.e., rustc processes) as +//! possible, and then filling each rustc with tokens on demand. +//! +//! The primary loop is in `drain_the_queue` below. +//! +//! We integrate with the jobserver, originating from GNU make, to make sure +//! that build scripts which use make to build C code can cooperate with us on +//! the number of used tokens and avoid overfilling the system we're on. +//! +//! The jobserver is unfortunately a very simple protocol, so we enhance it a +//! little when we know that there is a rustc on the other end. Via the stderr +//! pipe we have to rustc, we get messages such as "NeedsToken" and +//! "ReleaseToken" from rustc. +//! +//! "NeedsToken" indicates that a rustc is interested in acquiring a token, but +//! never that it would be impossible to make progress without one (i.e., it +//! would be incorrect for rustc to not terminate due to a unfulfilled +//! NeedsToken request); we do not usually fulfill all NeedsToken requests for a +//! given rustc. +//! +//! "ReleaseToken" indicates that a rustc is done with one of its tokens and is +//! ready for us to re-acquire ownership -- we will either release that token +//! back into the general pool or reuse it ourselves. Note that rustc will +//! inform us that it is releasing a token even if it itself is also requesting +//! tokens; is is up to us whether to return the token to that same rustc. +//! +//! The current scheduling algorithm is relatively primitive and could likely be +//! improved. + use std::cell::Cell; use std::collections::{HashMap, HashSet}; use std::io; @@ -343,11 +394,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { break; } - // And finally, before we block waiting for the next event, drop any - // excess tokens we may have accidentally acquired. Due to how our - // jobserver interface is architected we may acquire a token that we - // don't actually use, and if this happens just relinquish it back - // to the jobserver itself. + // If we managed to acquire some extra tokens, send them off to a waiting rustc. let extra_tokens = tokens.len() - (self.active.len() - 1); for _ in 0..extra_tokens { if let Some((id, client)) = to_send_clients.pop() { @@ -358,6 +405,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { .chain_err(|| "failed to release jobserver token")?; } } + + // And finally, before we block waiting for the next event, drop any + // excess tokens we may have accidentally acquired. Due to how our + // jobserver interface is architected we may acquire a token that we + // don't actually use, and if this happens just relinquish it back + // to the jobserver itself. tokens.truncate(self.active.len() - 1); // Record some timing information if `-Ztimings` is enabled, and From 494b3c0af59fcffe22cccbccae035a5b76139ec1 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 14 Jan 2020 17:13:48 -0500 Subject: [PATCH 05/26] Gate Cargo changes behind -Zjobserver-per-rustc --- src/cargo/core/compiler/mod.rs | 24 +++++++++++++++--------- src/cargo/core/features.rs | 2 ++ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 2b9dc64ffd5..5a41f58c2d6 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -294,7 +294,7 @@ fn rustc<'a, 'cfg>( package_id, &target, &mut output_options, - Some(&rustc_client), + &rustc_client, ) }, ) @@ -544,12 +544,19 @@ fn prepare_rustc<'a, 'cfg>( cx: &mut Context<'a, 'cfg>, crate_types: &[&str], unit: &Unit<'a>, -) -> CargoResult<(Client, ProcessBuilder)> { +) -> CargoResult<(Option, ProcessBuilder)> { let is_primary = cx.is_primary_package(unit); let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?; - let client = cx.new_jobserver()?; - base.inherit_jobserver(&client); + let client = if cx.bcx.config.cli_unstable().jobserver_per_rustc { + let client = cx.new_jobserver()?; + base.inherit_jobserver(&client); + base.arg("-Zjobserver-token-requests"); + Some(client) + } else { + base.inherit_jobserver(&cx.jobserver); + None + }; build_base_args(cx, &mut base, unit, crate_types)?; build_deps_args(&mut base, cx, unit)?; Ok((client, base)) @@ -612,7 +619,7 @@ fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult .exec_with_streaming( &mut |line| on_stdout_line(state, line, package_id, &target), &mut |line| { - on_stderr_line(state, line, package_id, &target, &mut output_options, None) + on_stderr_line(state, line, package_id, &target, &mut output_options, &None) }, false, ) @@ -696,7 +703,6 @@ fn add_error_format_and_color( _ => {} } cmd.arg(json); - cmd.arg("-Zjobserver-token-requests"); Ok(()) } @@ -1095,7 +1101,7 @@ fn on_stderr_line( package_id: PackageId, target: &Target, options: &mut OutputOptions, - rustc_client: Option<&Client>, + rustc_client: &Option, ) -> CargoResult<()> { if on_stderr_line_inner(state, line, package_id, target, options, rustc_client)? { // Check if caching is enabled. @@ -1117,7 +1123,7 @@ fn on_stderr_line_inner( package_id: PackageId, target: &Target, options: &mut OutputOptions, - rustc_client: Option<&Client>, + rustc_client: &Option, ) -> CargoResult { // We primarily want to use this function to process JSON messages from // rustc. The compiler should always print one JSON message per line, and @@ -1304,7 +1310,7 @@ fn replay_output_cache( break; } let trimmed = line.trim_end_matches(&['\n', '\r'][..]); - on_stderr_line(state, trimmed, package_id, &target, &mut options, None)?; + on_stderr_line(state, trimmed, package_id, &target, &mut options, &None)?; line.clear(); } Ok(()) diff --git a/src/cargo/core/features.rs b/src/cargo/core/features.rs index c34fe07880d..c2ea6622fa8 100644 --- a/src/cargo/core/features.rs +++ b/src/cargo/core/features.rs @@ -341,6 +341,7 @@ pub struct CliUnstable { pub timings: Option>, pub doctest_xcompile: bool, pub panic_abort_tests: bool, + pub jobserver_per_rustc: bool, } impl CliUnstable { @@ -409,6 +410,7 @@ impl CliUnstable { "timings" => self.timings = Some(parse_timings(v)), "doctest-xcompile" => self.doctest_xcompile = parse_empty(k, v)?, "panic-abort-tests" => self.panic_abort_tests = parse_empty(k, v)?, + "jobserver-per-rustc" => self.jobserver_per_rustc = parse_empty(k, v)?, _ => bail!("unknown `-Z` flag specified: {}", k), } From f7c6d04b05cc5df192b5e70b7d65e709902ad1db Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 17:22:25 -0500 Subject: [PATCH 06/26] Introduce newtype wrapping JobId --- src/cargo/core/compiler/job_queue.rs | 31 ++++++++++++++++++---------- src/cargo/core/compiler/timings.rs | 9 ++++---- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 9c8128241f5..bd9a049e9b2 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -87,7 +87,7 @@ pub struct JobQueue<'a, 'cfg> { queue: DependencyQueue, Artifact, Job>, tx: Sender, rx: Receiver, - active: HashMap>, + active: HashMap>, compiled: HashSet, documented: HashSet, counts: HashMap, @@ -96,13 +96,22 @@ pub struct JobQueue<'a, 'cfg> { timings: Timings<'a, 'cfg>, } +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct JobId(pub u32); + +impl std::fmt::Display for JobId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + pub struct JobState<'a> { /// Channel back to the main thread to coordinate messages and such. tx: Sender, /// The job id that this state is associated with, used when sending /// messages back to the main thread. - id: u32, + id: JobId, /// Whether or not we're expected to have a call to `rmeta_produced`. Once /// that method is called this is dynamically set to `false` to prevent @@ -135,19 +144,19 @@ enum Artifact { } enum Message { - Run(u32, String), + Run(JobId, String), BuildPlanMsg(String, ProcessBuilder, Arc>), Stdout(String), Stderr(String), FixDiagnostic(diagnostic_server::Message), Token(io::Result), - Finish(u32, Artifact, CargoResult<()>), + Finish(JobId, Artifact, CargoResult<()>), // This client should get release_raw called on it with one of our tokens - NeedsToken(u32, Client), + NeedsToken(JobId, Client), // A token previously passed to a NeedsToken client is being released. - ReleaseToken(u32), + ReleaseToken(JobId), } impl<'a> JobState<'a> { @@ -343,7 +352,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { ) -> CargoResult<()> { let mut tokens = Vec::new(); let mut rustc_tokens = Vec::new(); - let mut to_send_clients: Vec<(u32, Client)> = Vec::new(); + let mut to_send_clients: Vec<(JobId, Client)> = Vec::new(); let mut queue = Vec::new(); let mut print = DiagnosticPrinter::new(cx.bcx.config); trace!("queue: {:#?}", self.queue); @@ -626,8 +635,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { cx: &Context<'a, '_>, scope: &Scope<'a>, ) -> CargoResult<()> { - let id = self.next_id; - self.next_id = id.checked_add(1).unwrap(); + let id = JobId(self.next_id); + self.next_id = self.next_id.checked_add(1).unwrap(); info!("start {}: {:?}", id, unit); @@ -676,7 +685,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // to make sure nothing hangs by accident. struct FinishOnDrop<'a> { tx: &'a Sender, - id: u32, + id: JobId, result: CargoResult<()>, } @@ -737,7 +746,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { fn finish( &mut self, - id: u32, + id: JobId, unit: &Unit<'a>, artifact: Artifact, cx: &mut Context<'a, '_>, diff --git a/src/cargo/core/compiler/timings.rs b/src/cargo/core/compiler/timings.rs index 4cd151b547c..ff24e77b10a 100644 --- a/src/cargo/core/compiler/timings.rs +++ b/src/cargo/core/compiler/timings.rs @@ -3,6 +3,7 @@ //! This module implements some simple tracking information for timing of how //! long it takes for different units to compile. use super::{CompileMode, Unit}; +use crate::core::compiler::job_queue::JobId; use crate::core::compiler::BuildContext; use crate::core::PackageId; use crate::util::cpu::State; @@ -41,7 +42,7 @@ pub struct Timings<'a, 'cfg> { unit_times: Vec>, /// Units that are in the process of being built. /// When they finished, they are moved to `unit_times`. - active: HashMap>, + active: HashMap>, /// Concurrency-tracking information. This is periodically updated while /// compilation progresses. concurrency: Vec, @@ -144,7 +145,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// Mark that a unit has started running. - pub fn unit_start(&mut self, id: u32, unit: Unit<'a>) { + pub fn unit_start(&mut self, id: JobId, unit: Unit<'a>) { if !self.enabled { return; } @@ -178,7 +179,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// Mark that the `.rmeta` file as generated. - pub fn unit_rmeta_finished(&mut self, id: u32, unlocked: Vec<&Unit<'a>>) { + pub fn unit_rmeta_finished(&mut self, id: JobId, unlocked: Vec<&Unit<'a>>) { if !self.enabled { return; } @@ -196,7 +197,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// Mark that a unit has finished running. - pub fn unit_finished(&mut self, id: u32, unlocked: Vec<&Unit<'a>>) { + pub fn unit_finished(&mut self, id: JobId, unlocked: Vec<&Unit<'a>>) { if !self.enabled { return; } From e2f4ce114de75a1643ea790d7eca14b03f8a4083 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 18:17:50 -0500 Subject: [PATCH 07/26] Drop crossbeam scopes from job queue These were unused for a long time (comment added in April 2019) and add some complexity to API design. --- src/cargo/core/compiler/job_queue.rs | 125 ++++++++++++--------------- 1 file changed, 53 insertions(+), 72 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index bd9a049e9b2..355f2257c4b 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -59,7 +59,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::format_err; -use crossbeam_utils::thread::Scope; use jobserver::{Acquired, Client, HelperThread}; use log::{debug, info, trace}; @@ -333,21 +332,13 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { .take() .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg))))); - // Use `crossbeam` to create a scope in which we can execute scoped - // threads. Note that this isn't currently required by Cargo but it was - // historically required. This is left in for now in case we need the - // `'a` ability for child threads in the near future, but if this - // comment has been sitting here for a long time feel free to refactor - // away crossbeam. - crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper)) - .expect("child threads shouldn't panic") + self.drain_the_queue(cx, plan, &helper) } fn drain_the_queue( &mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan, - scope: &Scope<'a>, jobserver_helper: &HelperThread, ) -> CargoResult<()> { let mut tokens = Vec::new(); @@ -387,7 +378,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // we're able to perform some parallel work. while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() { let (unit, job) = queue.remove(0); - self.run(&unit, job, cx, scope)?; + self.run(&unit, job, cx)?; } info!( @@ -626,15 +617,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } } - /// Executes a job in the `scope` given, pushing the spawned thread's - /// handled onto `threads`. - fn run( - &mut self, - unit: &Unit<'a>, - job: Job, - cx: &Context<'a, '_>, - scope: &Scope<'a>, - ) -> CargoResult<()> { + /// Executes a job, pushing the spawned thread's handled onto `threads`. + fn run(&mut self, unit: &Unit<'a>, job: Job, cx: &Context<'a, '_>) -> CargoResult<()> { let id = JobId(self.next_id); self.next_id = self.next_id.checked_add(1).unwrap(); @@ -646,56 +630,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { let my_tx = self.tx.clone(); let fresh = job.freshness(); let rmeta_required = cx.rmeta_required(unit); - let doit = move || { - let state = JobState { - id, - tx: my_tx.clone(), - rmeta_required: Cell::new(rmeta_required), - _marker: marker::PhantomData, - }; - - let mut sender = FinishOnDrop { - tx: &my_tx, - id, - result: Err(format_err!("worker panicked")), - }; - sender.result = job.run(&state); - - // If the `rmeta_required` wasn't consumed but it was set - // previously, then we either have: - // - // 1. The `job` didn't do anything because it was "fresh". - // 2. The `job` returned an error and didn't reach the point where - // it called `rmeta_produced`. - // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo. - // - // Ruling out the third, the other two are pretty common for 2 - // we'll just naturally abort the compilation operation but for 1 - // we need to make sure that the metadata is flagged as produced so - // send a synthetic message here. - if state.rmeta_required.get() && sender.result.is_ok() { - my_tx - .send(Message::Finish(id, Artifact::Metadata, Ok(()))) - .unwrap(); - } - - // Use a helper struct with a `Drop` implementation to guarantee - // that a `Finish` message is sent even if our job panics. We - // shouldn't panic unless there's a bug in Cargo, so we just need - // to make sure nothing hangs by accident. - struct FinishOnDrop<'a> { - tx: &'a Sender, - id: JobId, - result: CargoResult<()>, - } - - impl Drop for FinishOnDrop<'_> { - fn drop(&mut self) { - let msg = mem::replace(&mut self.result, Ok(())); - drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg))); - } - } - }; if !cx.bcx.build_config.build_plan { // Print out some nice progress information. @@ -705,11 +639,58 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { match fresh { Freshness::Fresh => { self.timings.add_fresh(); - doit() } Freshness::Dirty => { self.timings.add_dirty(); - scope.spawn(move |_| doit()); + } + } + + let state = JobState { + id, + tx: my_tx.clone(), + rmeta_required: Cell::new(rmeta_required), + _marker: marker::PhantomData, + }; + + let mut sender = FinishOnDrop { + tx: &my_tx, + id, + result: Err(format_err!("worker panicked")), + }; + sender.result = job.run(&state); + + // If the `rmeta_required` wasn't consumed but it was set + // previously, then we either have: + // + // 1. The `job` didn't do anything because it was "fresh". + // 2. The `job` returned an error and didn't reach the point where + // it called `rmeta_produced`. + // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo. + // + // Ruling out the third, the other two are pretty common for 2 + // we'll just naturally abort the compilation operation but for 1 + // we need to make sure that the metadata is flagged as produced so + // send a synthetic message here. + if state.rmeta_required.get() && sender.result.is_ok() { + my_tx + .send(Message::Finish(id, Artifact::Metadata, Ok(()))) + .unwrap(); + } + + // Use a helper struct with a `Drop` implementation to guarantee + // that a `Finish` message is sent even if our job panics. We + // shouldn't panic unless there's a bug in Cargo, so we just need + // to make sure nothing hangs by accident. + struct FinishOnDrop<'a> { + tx: &'a Sender, + id: JobId, + result: CargoResult<()>, + } + + impl Drop for FinishOnDrop<'_> { + fn drop(&mut self) { + let msg = mem::replace(&mut self.result, Ok(())); + drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg))); } } From 90ef289c707443eecac44f0d3bae2f2caabd9caa Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:03:58 -0500 Subject: [PATCH 08/26] Move local variables to struct fields This will facilitate splitting drain_the_queue into methods --- src/cargo/core/compiler/job_queue.rs | 69 +++++++++++++++------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 355f2257c4b..cf71e6667a1 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -93,6 +93,12 @@ pub struct JobQueue<'a, 'cfg> { progress: Progress<'cfg>, next_id: u32, timings: Timings<'a, 'cfg>, + + tokens: Vec, + rustc_tokens: Vec<(JobId, Acquired)>, + to_send_clients: Vec<(JobId, Client)>, + pending_queue: Vec<(Unit<'a>, Job)>, + print: DiagnosticPrinter<'cfg>, } #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] @@ -227,6 +233,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { progress, next_id: 0, timings, + + tokens: Vec::new(), + rustc_tokens: Vec::new(), + to_send_clients: Vec::new(), + pending_queue: Vec::new(), + print: DiagnosticPrinter::new(bcx.config), } } @@ -341,11 +353,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { plan: &mut BuildPlan, jobserver_helper: &HelperThread, ) -> CargoResult<()> { - let mut tokens = Vec::new(); - let mut rustc_tokens = Vec::new(); - let mut to_send_clients: Vec<(JobId, Client)> = Vec::new(); - let mut queue = Vec::new(); - let mut print = DiagnosticPrinter::new(cx.bcx.config); trace!("queue: {:#?}", self.queue); // Iteratively execute the entire dependency graph. Each turn of the @@ -367,8 +374,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // start requesting job tokens. Each job after the first needs to // request a token. while let Some((unit, job)) = self.queue.dequeue() { - queue.push((unit, job)); - if self.active.len() + queue.len() > 1 { + self.pending_queue.push((unit, job)); + if self.active.len() + self.pending_queue.len() > 1 { jobserver_helper.request_token(); } } @@ -376,16 +383,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Now that we've learned of all possible work that we can execute // try to spawn it so long as we've got a jobserver token which says // we're able to perform some parallel work. - while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() { - let (unit, job) = queue.remove(0); + while error.is_none() && self.active.len() < self.tokens.len() + 1 && !self.pending_queue.is_empty() { + let (unit, job) = self.pending_queue.remove(0); self.run(&unit, job, cx)?; } info!( "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}", - tokens.len(), - rustc_tokens.len(), - to_send_clients.len() + self.tokens.len(), + self.rustc_tokens.len(), + self.to_send_clients.len() ); // If after all that we're not actually running anything then we're @@ -395,11 +402,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } // If we managed to acquire some extra tokens, send them off to a waiting rustc. - let extra_tokens = tokens.len() - (self.active.len() - 1); + let extra_tokens = self.tokens.len() - (self.active.len() - 1); for _ in 0..extra_tokens { - if let Some((id, client)) = to_send_clients.pop() { - let token = tokens.pop().expect("an extra token"); - rustc_tokens.push((id, token)); + if let Some((id, client)) = self.to_send_clients.pop() { + let token = self.tokens.pop().expect("an extra token"); + self.rustc_tokens.push((id, token)); client .release_raw() .chain_err(|| "failed to release jobserver token")?; @@ -411,16 +418,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // jobserver interface is architected we may acquire a token that we // don't actually use, and if this happens just relinquish it back // to the jobserver itself. - tokens.truncate(self.active.len() - 1); + self.tokens.truncate(self.active.len() - 1); // Record some timing information if `-Ztimings` is enabled, and // this'll end up being a noop if we're not recording this // information. self.timings.mark_concurrency( self.active.len(), - queue.len(), + self.pending_queue.len(), self.queue.len(), - rustc_tokens.len(), + self.rustc_tokens.len(), ); self.timings.record_cpu(); @@ -462,7 +469,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { shell.err().write_all(b"\n")?; } Message::FixDiagnostic(msg) => { - print.print(&msg)?; + self.print.print(&msg)?; } Message::Finish(id, artifact, result) => { let unit = match artifact { @@ -472,17 +479,17 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { info!("end: {:?}", id); finished += 1; while let Some(pos) = - rustc_tokens.iter().position(|(i, _)| *i == id) + self.rustc_tokens.iter().position(|(i, _)| *i == id) { // push all the leftover tokens back into // the token list - tokens.push(rustc_tokens.remove(pos).1); + self.tokens.push(self.rustc_tokens.remove(pos).1); } while let Some(pos) = - to_send_clients.iter().position(|(i, _)| *i == id) + self.to_send_clients.iter().position(|(i, _)| *i == id) { // drain all the pending clients - to_send_clients.remove(pos); + self.to_send_clients.remove(pos); } self.active.remove(&id).unwrap() } @@ -516,26 +523,26 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Message::Token(acquired_token) => { let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?; - if let Some((id, client)) = to_send_clients.pop() { - rustc_tokens.push((id, token)); + if let Some((id, client)) = self.to_send_clients.pop() { + self.rustc_tokens.push((id, token)); client .release_raw() .chain_err(|| "failed to release jobserver token")?; } else { - tokens.push(token); + self.tokens.push(token); } } Message::NeedsToken(id, client) => { log::info!("queue token request"); jobserver_helper.request_token(); - to_send_clients.push((id, client)); + self.to_send_clients.push((id, client)); } Message::ReleaseToken(id) => { // Note that this pops off potentially a completely // different token, but all tokens of the same job are // conceptually the same so that's fine. - if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) { - tokens.push(rustc_tokens.remove(pos).1); + if let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) { + self.tokens.push(self.rustc_tokens.remove(pos).1); } else { panic!( "This job (id={}) does not have tokens associated with it", @@ -570,7 +577,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { if let Some(e) = error { Err(e) - } else if self.queue.is_empty() && queue.is_empty() { + } else if self.queue.is_empty() && self.pending_queue.is_empty() { let message = format!( "{} [{}] target(s) in {}", profile_name, opt_type, time_elapsed From f07fbb2f025bbd2578346e9da1ad31b41c7ac076 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:07:17 -0500 Subject: [PATCH 09/26] Split out work spawning --- src/cargo/core/compiler/job_queue.rs | 52 ++++++++++++++++++---------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index cf71e6667a1..a79cede9cf1 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -347,6 +347,39 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { self.drain_the_queue(cx, plan, &helper) } + fn spawn_work_if_possible( + &mut self, + cx: &mut Context<'a, '_>, + jobserver_helper: &HelperThread, + has_errored: bool, + ) -> CargoResult<()> { + // Dequeue as much work as we can, learning about everything + // possible that can run. Note that this is also the point where we + // start requesting job tokens. Each job after the first needs to + // request a token. + while let Some((unit, job)) = self.queue.dequeue() { + self.pending_queue.push((unit, job)); + if self.active.len() + self.pending_queue.len() > 1 { + jobserver_helper.request_token(); + } + } + + // Do not actually spawn the new work if we've errored out + if has_errored { + return Ok(()); + } + + // Now that we've learned of all possible work that we can execute + // try to spawn it so long as we've got a jobserver token which says + // we're able to perform some parallel work. + while self.active.len() < self.tokens.len() + 1 && !self.pending_queue.is_empty() { + let (unit, job) = self.pending_queue.remove(0); + self.run(&unit, job, cx)?; + } + + Ok(()) + } + fn drain_the_queue( &mut self, cx: &mut Context<'a, '_>, @@ -369,24 +402,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { let total = self.queue.len(); let mut finished = 0; loop { - // Dequeue as much work as we can, learning about everything - // possible that can run. Note that this is also the point where we - // start requesting job tokens. Each job after the first needs to - // request a token. - while let Some((unit, job)) = self.queue.dequeue() { - self.pending_queue.push((unit, job)); - if self.active.len() + self.pending_queue.len() > 1 { - jobserver_helper.request_token(); - } - } - - // Now that we've learned of all possible work that we can execute - // try to spawn it so long as we've got a jobserver token which says - // we're able to perform some parallel work. - while error.is_none() && self.active.len() < self.tokens.len() + 1 && !self.pending_queue.is_empty() { - let (unit, job) = self.pending_queue.remove(0); - self.run(&unit, job, cx)?; - } + self.spawn_work_if_possible(cx, jobserver_helper, error.is_some())?; info!( "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}", From f6deb98acecb05b735c16117f4b5014f7fe88704 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:10:46 -0500 Subject: [PATCH 10/26] Split out granting rustc token requests --- src/cargo/core/compiler/job_queue.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index a79cede9cf1..69b523160c9 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -380,6 +380,22 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Ok(()) } + fn grant_rustc_token_requests(&mut self) -> CargoResult<()> { + // If we managed to acquire some extra tokens, send them off to a waiting rustc. + let extra_tokens = self.tokens.len() - (self.active.len() - 1); + for _ in 0..extra_tokens { + if let Some((id, client)) = self.to_send_clients.pop() { + let token = self.tokens.pop().expect("an extra token"); + self.rustc_tokens.push((id, token)); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; + } + } + + Ok(()) + } + fn drain_the_queue( &mut self, cx: &mut Context<'a, '_>, @@ -417,17 +433,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { break; } - // If we managed to acquire some extra tokens, send them off to a waiting rustc. - let extra_tokens = self.tokens.len() - (self.active.len() - 1); - for _ in 0..extra_tokens { - if let Some((id, client)) = self.to_send_clients.pop() { - let token = self.tokens.pop().expect("an extra token"); - self.rustc_tokens.push((id, token)); - client - .release_raw() - .chain_err(|| "failed to release jobserver token")?; - } - } + self.grant_rustc_token_requests()?; // And finally, before we block waiting for the next event, drop any // excess tokens we may have accidentally acquired. Due to how our From 445de0ed4f3825c114c21e467471625b3e5181f4 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:18:41 -0500 Subject: [PATCH 11/26] Split out event handler into separate function --- src/cargo/core/compiler/job_queue.rs | 232 ++++++++++++++------------- 1 file changed, 123 insertions(+), 109 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 69b523160c9..adefd61835e 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -99,6 +99,9 @@ pub struct JobQueue<'a, 'cfg> { to_send_clients: Vec<(JobId, Client)>, pending_queue: Vec<(Unit<'a>, Job)>, print: DiagnosticPrinter<'cfg>, + + // How many jobs we've finished + finished: usize, } #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] @@ -239,6 +242,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { to_send_clients: Vec::new(), pending_queue: Vec::new(), print: DiagnosticPrinter::new(bcx.config), + finished: 0, } } @@ -396,6 +400,116 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Ok(()) } + fn handle_event( + &mut self, + cx: &mut Context<'a, '_>, + jobserver_helper: &HelperThread, + plan: &mut BuildPlan, + event: Message, + ) -> CargoResult> { + match event { + Message::Run(id, cmd) => { + cx.bcx + .config + .shell() + .verbose(|c| c.status("Running", &cmd))?; + self.timings.unit_start(id, self.active[&id]); + } + Message::BuildPlanMsg(module_name, cmd, filenames) => { + plan.update(&module_name, &cmd, &filenames)?; + } + Message::Stdout(out) => { + cx.bcx.config.shell().stdout_println(out); + } + Message::Stderr(err) => { + let mut shell = cx.bcx.config.shell(); + shell.print_ansi(err.as_bytes())?; + shell.err().write_all(b"\n")?; + } + Message::FixDiagnostic(msg) => { + self.print.print(&msg)?; + } + Message::Finish(id, artifact, result) => { + let unit = match artifact { + // If `id` has completely finished we remove it + // from the `active` map ... + Artifact::All => { + info!("end: {:?}", id); + self.finished += 1; + while let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) { + // push all the leftover tokens back into + // the token list + self.tokens.push(self.rustc_tokens.remove(pos).1); + } + while let Some(pos) = + self.to_send_clients.iter().position(|(i, _)| *i == id) + { + // drain all the pending clients + self.to_send_clients.remove(pos); + } + self.active.remove(&id).unwrap() + } + // ... otherwise if it hasn't finished we leave it + // in there as we'll get another `Finish` later on. + Artifact::Metadata => { + info!("end (meta): {:?}", id); + self.active[&id] + } + }; + info!("end ({:?}): {:?}", unit, result); + match result { + Ok(()) => self.finish(id, &unit, artifact, cx)?, + Err(e) => { + let msg = "The following warnings were emitted during compilation:"; + self.emit_warnings(Some(msg), &unit, cx)?; + + if !self.active.is_empty() { + handle_error(&e, &mut *cx.bcx.config.shell()); + cx.bcx.config.shell().warn( + "build failed, waiting for other \ + jobs to finish...", + )?; + return Ok(Some(anyhow::format_err!("build failed"))); + } else { + return Ok(Some(e)); + } + } + } + } + Message::Token(acquired_token) => { + let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?; + if let Some((id, client)) = self.to_send_clients.pop() { + self.rustc_tokens.push((id, token)); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; + } else { + self.tokens.push(token); + } + } + Message::NeedsToken(id, client) => { + log::info!("queue token request"); + jobserver_helper.request_token(); + self.to_send_clients.push((id, client)); + } + Message::ReleaseToken(id) => { + // Note that this pops off potentially a completely + // different token, but all tokens of the same job are + // conceptually the same so that's fine. + if let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) { + self.tokens.push(self.rustc_tokens.remove(pos).1); + } else { + panic!( + "This job (id={}) does not have tokens associated with it", + id + ); + } + } + } + + Ok(None) + } + fn drain_the_queue( &mut self, cx: &mut Context<'a, '_>, @@ -415,8 +529,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // successful and otherwise wait for pending work to finish if it failed // and then immediately return. let mut error = None; - let total = self.queue.len(); - let mut finished = 0; loop { self.spawn_work_if_possible(cx, jobserver_helper, error.is_some())?; @@ -461,7 +573,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // previous parts of the loop again. let events: Vec<_> = self.rx.try_iter().collect(); let events = if events.is_empty() { - self.show_progress(finished, total); + self.show_progress(); match self.rx.recv_timeout(Duration::from_millis(500)) { Ok(message) => vec![message], Err(_) => continue, @@ -471,107 +583,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { }; for event in events { - match event { - Message::Run(id, cmd) => { - cx.bcx - .config - .shell() - .verbose(|c| c.status("Running", &cmd))?; - self.timings.unit_start(id, self.active[&id]); - } - Message::BuildPlanMsg(module_name, cmd, filenames) => { - plan.update(&module_name, &cmd, &filenames)?; - } - Message::Stdout(out) => { - cx.bcx.config.shell().stdout_println(out); - } - Message::Stderr(err) => { - let mut shell = cx.bcx.config.shell(); - shell.print_ansi(err.as_bytes())?; - shell.err().write_all(b"\n")?; - } - Message::FixDiagnostic(msg) => { - self.print.print(&msg)?; - } - Message::Finish(id, artifact, result) => { - let unit = match artifact { - // If `id` has completely finished we remove it - // from the `active` map ... - Artifact::All => { - info!("end: {:?}", id); - finished += 1; - while let Some(pos) = - self.rustc_tokens.iter().position(|(i, _)| *i == id) - { - // push all the leftover tokens back into - // the token list - self.tokens.push(self.rustc_tokens.remove(pos).1); - } - while let Some(pos) = - self.to_send_clients.iter().position(|(i, _)| *i == id) - { - // drain all the pending clients - self.to_send_clients.remove(pos); - } - self.active.remove(&id).unwrap() - } - // ... otherwise if it hasn't finished we leave it - // in there as we'll get another `Finish` later on. - Artifact::Metadata => { - info!("end (meta): {:?}", id); - self.active[&id] - } - }; - info!("end ({:?}): {:?}", unit, result); - match result { - Ok(()) => self.finish(id, &unit, artifact, cx)?, - Err(e) => { - let msg = "The following warnings were emitted during compilation:"; - self.emit_warnings(Some(msg), &unit, cx)?; - - if !self.active.is_empty() { - error = Some(anyhow::format_err!("build failed")); - handle_error(&e, &mut *cx.bcx.config.shell()); - cx.bcx.config.shell().warn( - "build failed, waiting for other \ - jobs to finish...", - )?; - } else { - error = Some(e); - } - } - } - } - Message::Token(acquired_token) => { - let token = - acquired_token.chain_err(|| "failed to acquire jobserver token")?; - if let Some((id, client)) = self.to_send_clients.pop() { - self.rustc_tokens.push((id, token)); - client - .release_raw() - .chain_err(|| "failed to release jobserver token")?; - } else { - self.tokens.push(token); - } - } - Message::NeedsToken(id, client) => { - log::info!("queue token request"); - jobserver_helper.request_token(); - self.to_send_clients.push((id, client)); - } - Message::ReleaseToken(id) => { - // Note that this pops off potentially a completely - // different token, but all tokens of the same job are - // conceptually the same so that's fine. - if let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) { - self.tokens.push(self.rustc_tokens.remove(pos).1); - } else { - panic!( - "This job (id={}) does not have tokens associated with it", - id - ); - } - } + if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? { + error = Some(err); } } } @@ -615,16 +628,17 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } } - fn show_progress(&mut self, count: usize, total: usize) { + fn show_progress(&mut self) { let active_names = self .active .values() .map(|u| self.name_for_progress(u)) .collect::>(); - drop( - self.progress - .tick_now(count, total, &format!(": {}", active_names.join(", "))), - ); + drop(self.progress.tick_now( + self.finished, + self.queue.len(), + &format!(": {}", active_names.join(", ")), + )); } fn name_for_progress(&self, unit: &Unit<'_>) -> String { From 0d722a4d83b43fb1534c487c2bd4867ead5862f2 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:24:05 -0500 Subject: [PATCH 12/26] Split waiting for an event out of the primary drainer This has the slight behavior change where we won't ask for new dependencies and so forth if no events have been received but I believe that there's no activity that can happen if an event hasn't occurred (i.e., no state change has occurred) so there's no need for us to actually do anything in practice. To make sure we still record CPU usage and such sufficiently often that is also moved into the inner "waiting for events" loop. --- src/cargo/core/compiler/job_queue.rs | 69 +++++++++++++++------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index adefd61835e..ff29bfaa382 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -510,6 +510,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Ok(None) } + // This will also tick the progress bar as appropriate + fn wait_for_events(&mut self) -> Vec { + // Drain all events at once to avoid displaying the progress bar + // unnecessarily. If there's no events we actually block waiting for + // an event, but we keep a "heartbeat" going to allow `record_cpu` + // to run above to calculate CPU usage over time. To do this we + // listen for a message with a timeout, and on timeout we run the + // previous parts of the loop again. + let events: Vec<_> = self.rx.try_iter().collect(); + if events.is_empty() { + loop { + self.tick_progress(); + match self.rx.recv_timeout(Duration::from_millis(500)) { + Ok(message) => break vec![message], + Err(_) => continue, + } + } + } else { + events + } + } + fn drain_the_queue( &mut self, cx: &mut Context<'a, '_>, @@ -553,36 +575,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // don't actually use, and if this happens just relinquish it back // to the jobserver itself. self.tokens.truncate(self.active.len() - 1); - - // Record some timing information if `-Ztimings` is enabled, and - // this'll end up being a noop if we're not recording this - // information. - self.timings.mark_concurrency( - self.active.len(), - self.pending_queue.len(), - self.queue.len(), - self.rustc_tokens.len(), - ); - self.timings.record_cpu(); - - // Drain all events at once to avoid displaying the progress bar - // unnecessarily. If there's no events we actually block waiting for - // an event, but we keep a "heartbeat" going to allow `record_cpu` - // to run above to calculate CPU usage over time. To do this we - // listen for a message with a timeout, and on timeout we run the - // previous parts of the loop again. - let events: Vec<_> = self.rx.try_iter().collect(); - let events = if events.is_empty() { - self.show_progress(); - match self.rx.recv_timeout(Duration::from_millis(500)) { - Ok(message) => vec![message], - Err(_) => continue, - } - } else { - events - }; - - for event in events { + for event in self.wait_for_events() { if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? { error = Some(err); } @@ -628,7 +621,21 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } } - fn show_progress(&mut self) { + // This also records CPU usage and marks concurrency; we roughly want to do + // this as often as we spin on the events receiver (at least every 500ms or + // so). + fn tick_progress(&mut self) { + // Record some timing information if `-Ztimings` is enabled, and + // this'll end up being a noop if we're not recording this + // information. + self.timings.mark_concurrency( + self.active.len(), + self.pending_queue.len(), + self.queue.len(), + self.rustc_tokens.len(), + ); + self.timings.record_cpu(); + let active_names = self .active .values() From b448d92399c1bf67b8d135267d052cf93b2164ab Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:35:51 -0500 Subject: [PATCH 13/26] Do not send acquired tokens to waiting rustc threads This removes the ad-hoc token re-send in the message processing; this sort of decision should be left up to the main loop which manages tokens. Notably, the behavior change here is that new tokens will go solely to spawning new rustc *processes* rather than increasing rustc internal parallelism, unless we can't spawn new processes. Otherwise, before this commit, we may be saturating a single rustc with tokens rather than creating lots of rustcs that can work in parallel. In particular in the beginning of a build, it's likely that this is worse (i.e., crates are small and rustc internal parallelism is not at that point all that helpful) since it severely limits the benefits of pipelining and generally makes the build nearly serial. --- src/cargo/core/compiler/job_queue.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index ff29bfaa382..fa8bee9b4a8 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -478,14 +478,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } Message::Token(acquired_token) => { let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?; - if let Some((id, client)) = self.to_send_clients.pop() { - self.rustc_tokens.push((id, token)); - client - .release_raw() - .chain_err(|| "failed to release jobserver token")?; - } else { - self.tokens.push(token); - } + self.tokens.push(token); } Message::NeedsToken(id, client) => { log::info!("queue token request"); From ae7aba2302843c8c43c534194fc2197f925ac063 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:45:43 -0500 Subject: [PATCH 14/26] Refactor rustc_tokens to a HashMap --- src/cargo/core/compiler/job_queue.rs | 31 ++++++++++++++++------------ 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index fa8bee9b4a8..bedbf19f3f4 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -95,7 +95,7 @@ pub struct JobQueue<'a, 'cfg> { timings: Timings<'a, 'cfg>, tokens: Vec, - rustc_tokens: Vec<(JobId, Acquired)>, + rustc_tokens: HashMap>, to_send_clients: Vec<(JobId, Client)>, pending_queue: Vec<(Unit<'a>, Job)>, print: DiagnosticPrinter<'cfg>, @@ -238,7 +238,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { timings, tokens: Vec::new(), - rustc_tokens: Vec::new(), + rustc_tokens: HashMap::new(), to_send_clients: Vec::new(), pending_queue: Vec::new(), print: DiagnosticPrinter::new(bcx.config), @@ -390,7 +390,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { for _ in 0..extra_tokens { if let Some((id, client)) = self.to_send_clients.pop() { let token = self.tokens.pop().expect("an extra token"); - self.rustc_tokens.push((id, token)); + self.rustc_tokens + .entry(id) + .or_insert_with(Vec::new) + .push(token); client .release_raw() .chain_err(|| "failed to release jobserver token")?; @@ -436,10 +439,14 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Artifact::All => { info!("end: {:?}", id); self.finished += 1; - while let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) { - // push all the leftover tokens back into - // the token list - self.tokens.push(self.rustc_tokens.remove(pos).1); + if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) { + // This puts back the tokens that this rustc + // acquired into our primary token list. + // + // FIXME: this represents a rustc bug: it did not + // release all of its thread tokens but finished + // completely. + self.tokens.extend(rustc_tokens); } while let Some(pos) = self.to_send_clients.iter().position(|(i, _)| *i == id) @@ -489,13 +496,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Note that this pops off potentially a completely // different token, but all tokens of the same job are // conceptually the same so that's fine. - if let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) { - self.tokens.push(self.rustc_tokens.remove(pos).1); + if let Some(rustc_tokens) = self.rustc_tokens.get_mut(&id) { + self.tokens + .push(rustc_tokens.pop().expect("rustc releases token it has")); } else { - panic!( - "This job (id={}) does not have tokens associated with it", - id - ); + panic!("This job does not have tokens associated with it"); } } } From c5f4690fd96ffae3f13222641fcaf3e7118579c3 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 21:02:37 -0500 Subject: [PATCH 15/26] Document ordering constraint on providing thread tokens --- src/cargo/core/compiler/job_queue.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index bedbf19f3f4..5582be48020 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -96,6 +96,9 @@ pub struct JobQueue<'a, 'cfg> { tokens: Vec, rustc_tokens: HashMap>, + + // We use a vec here as we don't want to order randomly which rustc we give + // tokens to. to_send_clients: Vec<(JobId, Client)>, pending_queue: Vec<(Unit<'a>, Job)>, print: DiagnosticPrinter<'cfg>, From 78afa068c2f507fa9d105fe9be3d1cc69d3866fd Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Thu, 16 Jan 2020 17:44:50 -0500 Subject: [PATCH 16/26] Take JobQueue by-value in drain_the_queue --- src/cargo/core/compiler/job_queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 5582be48020..0a872629675 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -326,7 +326,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { /// This function will spawn off `config.jobs()` workers to build all of the /// necessary dependencies, in order. Freshness is propagated as far as /// possible along each dependency chain. - pub fn execute(&mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> { + pub fn execute(mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> { let _p = profile::start("executing the job graph"); self.queue.queue_finished(); @@ -534,7 +534,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } fn drain_the_queue( - &mut self, + mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan, jobserver_helper: &HelperThread, From 1d9fdee250152d1f689370170553f5660498e074 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Thu, 16 Jan 2020 18:01:01 -0500 Subject: [PATCH 17/26] Move transient state to separate struct --- src/cargo/core/compiler/job_queue.rs | 66 ++++++++++++++++++---------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 0a872629675..2b4cb351015 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -77,12 +77,23 @@ use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder use crate::util::{Config, DependencyQueue}; use crate::util::{Progress, ProgressStyle}; -/// A management structure of the entire dependency graph to compile. -/// +/// This structure is backed by the `DependencyQueue` type and manages the +/// queueing of compilation steps for each package. Packages enqueue units of +/// work and then later on the entire graph is converted to DrainState and +/// executed. +pub struct JobQueue<'a, 'cfg> { + queue: DependencyQueue, Artifact, Job>, + counts: HashMap, + timings: Timings<'a, 'cfg>, +} + /// This structure is backed by the `DependencyQueue` type and manages the /// actual compilation step of each package. Packages enqueue units of work and /// then later on the entire graph is processed and compiled. -pub struct JobQueue<'a, 'cfg> { +/// +/// It is created from JobQueue when we have fully assembled the crate graph +/// (i.e., all package dependencies are known). +struct DrainState<'a, 'cfg> { queue: DependencyQueue, Artifact, Job>, tx: Sender, rx: Receiver, @@ -225,27 +236,10 @@ impl<'a> JobState<'a> { impl<'a, 'cfg> JobQueue<'a, 'cfg> { pub fn new(bcx: &BuildContext<'a, 'cfg>, root_units: &[Unit<'a>]) -> JobQueue<'a, 'cfg> { - let (tx, rx) = channel(); - let progress = Progress::with_style("Building", ProgressStyle::Ratio, bcx.config); - let timings = Timings::new(bcx, root_units); JobQueue { queue: DependencyQueue::new(), - tx, - rx, - active: HashMap::new(), - compiled: HashSet::new(), - documented: HashSet::new(), counts: HashMap::new(), - progress, - next_id: 0, - timings, - - tokens: Vec::new(), - rustc_tokens: HashMap::new(), - to_send_clients: Vec::new(), - pending_queue: Vec::new(), - print: DiagnosticPrinter::new(bcx.config), - finished: 0, + timings: Timings::new(bcx, root_units), } } @@ -330,8 +324,30 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { let _p = profile::start("executing the job graph"); self.queue.queue_finished(); + let (tx, rx) = channel(); + let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config); + let state = DrainState { + queue: self.queue, + tx, + rx, + active: HashMap::new(), + compiled: HashSet::new(), + documented: HashSet::new(), + counts: self.counts, + progress, + next_id: 0, + timings: self.timings, + + tokens: Vec::new(), + rustc_tokens: HashMap::new(), + to_send_clients: Vec::new(), + pending_queue: Vec::new(), + print: DiagnosticPrinter::new(cx.bcx.config), + finished: 0, + }; + // Create a helper thread for acquiring jobserver tokens - let tx = self.tx.clone(); + let tx = state.tx.clone(); let helper = cx .jobserver .clone() @@ -342,7 +358,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Create a helper thread to manage the diagnostics for rustfix if // necessary. - let tx = self.tx.clone(); + let tx = state.tx.clone(); let _diagnostic_server = cx .bcx .build_config @@ -351,9 +367,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { .take() .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg))))); - self.drain_the_queue(cx, plan, &helper) + state.drain_the_queue(cx, plan, &helper) } +} +impl<'a, 'cfg> DrainState<'a, 'cfg> { fn spawn_work_if_possible( &mut self, cx: &mut Context<'a, '_>, From 5a9af54205a1721379a603245ae9c9daf1446d13 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Thu, 16 Jan 2020 18:25:02 -0500 Subject: [PATCH 18/26] Add some commentary --- src/cargo/core/compiler/job_queue.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 2b4cb351015..4bde576dd2b 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -105,12 +105,28 @@ struct DrainState<'a, 'cfg> { next_id: u32, timings: Timings<'a, 'cfg>, + /// Tokens that are currently owned by this Cargo, and may be "associated" + /// with a rustc process. They may also be unused, though if so will be + /// dropped on the next loop iteration. + /// + /// Note that the length of this may be zero, but we will still spawn work, + /// as we share the implicit token given to this Cargo process with a + /// single rustc process. tokens: Vec, + + /// rustc per-thread tokens, when in jobserver-per-rustc mode. rustc_tokens: HashMap>, + /// This represents the list of rustc jobs (processes) and associated + /// clients that are interested in receiving a token. Note that each process + /// may be present many times (if it has requested multiple tokens). // We use a vec here as we don't want to order randomly which rustc we give // tokens to. to_send_clients: Vec<(JobId, Client)>, + + /// The list of jobs that we have not yet started executing, but have + /// retrieved from the `queue`. We eagerly pull jobs off the main queue to + /// allow us to request jobserver tokens pretty early. pending_queue: Vec<(Unit<'a>, Job)>, print: DiagnosticPrinter<'cfg>, From 1988dd92b8c5e0f04a415973e7d42277ed41bdc1 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Thu, 16 Jan 2020 18:28:03 -0500 Subject: [PATCH 19/26] Pop thread token requests from the front This ensures we have a first come first served ordering for both thread and process tokens. --- src/cargo/core/compiler/job_queue.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 4bde576dd2b..5411579f1b0 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -425,7 +425,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // If we managed to acquire some extra tokens, send them off to a waiting rustc. let extra_tokens = self.tokens.len() - (self.active.len() - 1); for _ in 0..extra_tokens { - if let Some((id, client)) = self.to_send_clients.pop() { + if !self.to_send_clients.is_empty() { + // remove from the front so we grant the token to the oldest + // waiter + let (id, client) = self.to_send_clients.remove(0); let token = self.tokens.pop().expect("an extra token"); self.rustc_tokens .entry(id) From 50a608340436eff052295305d02e5c5aab255903 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Thu, 16 Jan 2020 20:16:36 -0500 Subject: [PATCH 20/26] Reintroduce crossbeam threads Turns out, these are actually necessary -- we will deadlock otherwise, though it's not entirely obvious why. --- src/cargo/core/compiler/job_queue.rs | 120 +++++++++++++++------------ 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 5411579f1b0..06d93180bd4 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -59,6 +59,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::format_err; +use crossbeam_utils::thread::Scope; use jobserver::{Acquired, Client, HelperThread}; use log::{debug, info, trace}; @@ -383,7 +384,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { .take() .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg))))); - state.drain_the_queue(cx, plan, &helper) + crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper)) + .expect("child threads shouldn't panic") } } @@ -392,6 +394,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { &mut self, cx: &mut Context<'a, '_>, jobserver_helper: &HelperThread, + scope: &Scope<'_>, has_errored: bool, ) -> CargoResult<()> { // Dequeue as much work as we can, learning about everything @@ -415,7 +418,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // we're able to perform some parallel work. while self.active.len() < self.tokens.len() + 1 && !self.pending_queue.is_empty() { let (unit, job) = self.pending_queue.remove(0); - self.run(&unit, job, cx)?; + self.run(&unit, job, cx, scope)?; } Ok(()) @@ -574,6 +577,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan, + scope: &Scope<'a>, jobserver_helper: &HelperThread, ) -> CargoResult<()> { trace!("queue: {:#?}", self.queue); @@ -590,7 +594,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // and then immediately return. let mut error = None; loop { - self.spawn_work_if_possible(cx, jobserver_helper, error.is_some())?; + self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?; info!( "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}", @@ -706,7 +710,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { } /// Executes a job, pushing the spawned thread's handled onto `threads`. - fn run(&mut self, unit: &Unit<'a>, job: Job, cx: &Context<'a, '_>) -> CargoResult<()> { + fn run( + &mut self, + unit: &Unit<'a>, + job: Job, + cx: &Context<'a, '_>, + scope: &Scope<'_>, + ) -> CargoResult<()> { let id = JobId(self.next_id); self.next_id = self.next_id.checked_add(1).unwrap(); @@ -724,61 +734,65 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { self.note_working_on(cx.bcx.config, unit, fresh)?; } + let doit = move || { + let state = JobState { + id, + tx: my_tx.clone(), + rmeta_required: Cell::new(rmeta_required), + _marker: marker::PhantomData, + }; + + let mut sender = FinishOnDrop { + tx: &my_tx, + id, + result: Err(format_err!("worker panicked")), + }; + sender.result = job.run(&state); + + // If the `rmeta_required` wasn't consumed but it was set + // previously, then we either have: + // + // 1. The `job` didn't do anything because it was "fresh". + // 2. The `job` returned an error and didn't reach the point where + // it called `rmeta_produced`. + // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo. + // + // Ruling out the third, the other two are pretty common for 2 + // we'll just naturally abort the compilation operation but for 1 + // we need to make sure that the metadata is flagged as produced so + // send a synthetic message here. + if state.rmeta_required.get() && sender.result.is_ok() { + my_tx + .send(Message::Finish(id, Artifact::Metadata, Ok(()))) + .unwrap(); + } + + // Use a helper struct with a `Drop` implementation to guarantee + // that a `Finish` message is sent even if our job panics. We + // shouldn't panic unless there's a bug in Cargo, so we just need + // to make sure nothing hangs by accident. + struct FinishOnDrop<'a> { + tx: &'a Sender, + id: JobId, + result: CargoResult<()>, + } + + impl Drop for FinishOnDrop<'_> { + fn drop(&mut self) { + let msg = mem::replace(&mut self.result, Ok(())); + drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg))); + } + } + }; + match fresh { Freshness::Fresh => { self.timings.add_fresh(); + doit(); } Freshness::Dirty => { self.timings.add_dirty(); - } - } - - let state = JobState { - id, - tx: my_tx.clone(), - rmeta_required: Cell::new(rmeta_required), - _marker: marker::PhantomData, - }; - - let mut sender = FinishOnDrop { - tx: &my_tx, - id, - result: Err(format_err!("worker panicked")), - }; - sender.result = job.run(&state); - - // If the `rmeta_required` wasn't consumed but it was set - // previously, then we either have: - // - // 1. The `job` didn't do anything because it was "fresh". - // 2. The `job` returned an error and didn't reach the point where - // it called `rmeta_produced`. - // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo. - // - // Ruling out the third, the other two are pretty common for 2 - // we'll just naturally abort the compilation operation but for 1 - // we need to make sure that the metadata is flagged as produced so - // send a synthetic message here. - if state.rmeta_required.get() && sender.result.is_ok() { - my_tx - .send(Message::Finish(id, Artifact::Metadata, Ok(()))) - .unwrap(); - } - - // Use a helper struct with a `Drop` implementation to guarantee - // that a `Finish` message is sent even if our job panics. We - // shouldn't panic unless there's a bug in Cargo, so we just need - // to make sure nothing hangs by accident. - struct FinishOnDrop<'a> { - tx: &'a Sender, - id: JobId, - result: CargoResult<()>, - } - - impl Drop for FinishOnDrop<'_> { - fn drop(&mut self) { - let msg = mem::replace(&mut self.result, Ok(())); - drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg))); + scope.spawn(move |_| doit()); } } From ec4cce9f65e14ad2b6adb5e88d4296cf2d60fd65 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Mon, 20 Jan 2020 12:39:18 -0500 Subject: [PATCH 21/26] Refactor rustc thread granting loop --- src/cargo/core/compiler/job_queue.rs | 37 +++++++++++++++------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 06d93180bd4..3b8049dcaf0 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -416,7 +416,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // Now that we've learned of all possible work that we can execute // try to spawn it so long as we've got a jobserver token which says // we're able to perform some parallel work. - while self.active.len() < self.tokens.len() + 1 && !self.pending_queue.is_empty() { + while self.has_extra_tokens() && !self.pending_queue.is_empty() { let (unit, job) = self.pending_queue.remove(0); self.run(&unit, job, cx, scope)?; } @@ -424,23 +424,26 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { Ok(()) } + fn has_extra_tokens(&self) -> bool { + self.active.len() < self.tokens.len() + 1 + } + + // If we managed to acquire some extra tokens, send them off to a waiting rustc. fn grant_rustc_token_requests(&mut self) -> CargoResult<()> { - // If we managed to acquire some extra tokens, send them off to a waiting rustc. - let extra_tokens = self.tokens.len() - (self.active.len() - 1); - for _ in 0..extra_tokens { - if !self.to_send_clients.is_empty() { - // remove from the front so we grant the token to the oldest - // waiter - let (id, client) = self.to_send_clients.remove(0); - let token = self.tokens.pop().expect("an extra token"); - self.rustc_tokens - .entry(id) - .or_insert_with(Vec::new) - .push(token); - client - .release_raw() - .chain_err(|| "failed to release jobserver token")?; - } + while !self.to_send_clients.is_empty() && self.has_extra_tokens() { + // Remove from the front so we grant the token to the oldest waiter + let (id, client) = self.to_send_clients.remove(0); + // This unwrap is guaranteed to succeed. `active` must be at least + // length 1, as otherwise there can't be a client waiting to be sent + // on, so tokens.len() must also be at least one. + let token = self.tokens.pop().unwrap(); + self.rustc_tokens + .entry(id) + .or_insert_with(Vec::new) + .push(token); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; } Ok(()) From 4bd074e7a560813f58c7d936c1b19ac3af0ed37c Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Mon, 20 Jan 2020 12:47:56 -0500 Subject: [PATCH 22/26] Use an expect instead of directly panicking --- src/cargo/core/compiler/job_queue.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 3b8049dcaf0..56f22e9962c 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -542,12 +542,12 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // Note that this pops off potentially a completely // different token, but all tokens of the same job are // conceptually the same so that's fine. - if let Some(rustc_tokens) = self.rustc_tokens.get_mut(&id) { - self.tokens - .push(rustc_tokens.pop().expect("rustc releases token it has")); - } else { - panic!("This job does not have tokens associated with it"); - } + let rustc_tokens = self + .rustc_tokens + .get_mut(&id) + .expect("no tokens associated"); + self.tokens + .push(rustc_tokens.pop().expect("rustc releases token it has")); } } From 25bf99bcc32be2eb1a6f3a9e6a4658dda7caafa1 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Mon, 20 Jan 2020 12:50:04 -0500 Subject: [PATCH 23/26] Refactor to_send_clients to use a BTreeMap This is both a performance optimization (avoiding O(n) shifting from the beginning), and communicates intent in a nicer way overall. It is plausible that we will eventually want to tie this data structure to something like the DependencyQueue, i.e., to get more information on which rustc to give tokens to. An old rustc with a very late dependency edge is less important than one we'll need sooner, probably. --- src/cargo/core/compiler/job_queue.rs | 44 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 56f22e9962c..a5c26e01d8a 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -50,7 +50,7 @@ //! improved. use std::cell::Cell; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::io; use std::marker; use std::mem; @@ -119,11 +119,8 @@ struct DrainState<'a, 'cfg> { rustc_tokens: HashMap>, /// This represents the list of rustc jobs (processes) and associated - /// clients that are interested in receiving a token. Note that each process - /// may be present many times (if it has requested multiple tokens). - // We use a vec here as we don't want to order randomly which rustc we give - // tokens to. - to_send_clients: Vec<(JobId, Client)>, + /// clients that are interested in receiving a token. + to_send_clients: BTreeMap>, /// The list of jobs that we have not yet started executing, but have /// retrieved from the `queue`. We eagerly pull jobs off the main queue to @@ -135,7 +132,7 @@ struct DrainState<'a, 'cfg> { finished: usize, } -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct JobId(pub u32); impl std::fmt::Display for JobId { @@ -357,7 +354,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { tokens: Vec::new(), rustc_tokens: HashMap::new(), - to_send_clients: Vec::new(), + to_send_clients: BTreeMap::new(), pending_queue: Vec::new(), print: DiagnosticPrinter::new(cx.bcx.config), finished: 0, @@ -428,11 +425,26 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { self.active.len() < self.tokens.len() + 1 } + // The oldest job (i.e., least job ID) is the one we grant tokens to first. + fn pop_waiting_client(&mut self) -> (JobId, Client) { + // FIXME: replace this with BTreeMap::first_entry when that stabilizes. + let key = *self + .to_send_clients + .keys() + .next() + .expect("at least one waiter"); + let clients = self.to_send_clients.get_mut(&key).unwrap(); + let client = clients.pop().unwrap(); + if clients.is_empty() { + self.to_send_clients.remove(&key); + } + (key, client) + } + // If we managed to acquire some extra tokens, send them off to a waiting rustc. fn grant_rustc_token_requests(&mut self) -> CargoResult<()> { while !self.to_send_clients.is_empty() && self.has_extra_tokens() { - // Remove from the front so we grant the token to the oldest waiter - let (id, client) = self.to_send_clients.remove(0); + let (id, client) = self.pop_waiting_client(); // This unwrap is guaranteed to succeed. `active` must be at least // length 1, as otherwise there can't be a client waiting to be sent // on, so tokens.len() must also be at least one. @@ -494,12 +506,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // completely. self.tokens.extend(rustc_tokens); } - while let Some(pos) = - self.to_send_clients.iter().position(|(i, _)| *i == id) - { - // drain all the pending clients - self.to_send_clients.remove(pos); - } + self.to_send_clients.remove(&id); self.active.remove(&id).unwrap() } // ... otherwise if it hasn't finished we leave it @@ -536,7 +543,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { Message::NeedsToken(id, client) => { log::info!("queue token request"); jobserver_helper.request_token(); - self.to_send_clients.push((id, client)); + self.to_send_clients + .entry(id) + .or_insert_with(Vec::new) + .push(client); } Message::ReleaseToken(id) => { // Note that this pops off potentially a completely From 1204a527c73204f318ab56aece5531eb6ba05b75 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Mon, 20 Jan 2020 12:53:39 -0500 Subject: [PATCH 24/26] Elaborate on some documentation --- src/cargo/core/compiler/job_queue.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index a5c26e01d8a..16245ae723f 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -501,9 +501,12 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // This puts back the tokens that this rustc // acquired into our primary token list. // - // FIXME: this represents a rustc bug: it did not + // This represents a rustc bug: it did not // release all of its thread tokens but finished - // completely. + // completely. But we want to make Cargo resilient + // to such rustc bugs, as they're generally not + // fatal in nature (i.e., Cargo can make progress + // still, and the build might not even fail). self.tokens.extend(rustc_tokens); } self.to_send_clients.remove(&id); @@ -552,6 +555,11 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // Note that this pops off potentially a completely // different token, but all tokens of the same job are // conceptually the same so that's fine. + // + // self.tokens is a "pool" -- the order doesn't matter -- and + // this transfers ownership of the token into that pool. If we + // end up using it on the next go around, then this token will + // be truncated, same as tokens obtained through Message::Token. let rustc_tokens = self .rustc_tokens .get_mut(&id) From 6117f526c08ce764f22c902e65199dc6bbf4772a Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Mon, 20 Jan 2020 13:03:05 -0500 Subject: [PATCH 25/26] Stop threading Client through into the NeedsToken message --- src/cargo/core/compiler/context/mod.rs | 6 ++++ src/cargo/core/compiler/job_queue.rs | 11 +++--- src/cargo/core/compiler/mod.rs | 46 ++++++++------------------ 3 files changed, 25 insertions(+), 38 deletions(-) diff --git a/src/cargo/core/compiler/context/mod.rs b/src/cargo/core/compiler/context/mod.rs index e57bab18d00..6adccbd4cd4 100644 --- a/src/cargo/core/compiler/context/mod.rs +++ b/src/cargo/core/compiler/context/mod.rs @@ -69,6 +69,11 @@ pub struct Context<'a, 'cfg> { /// metadata files in addition to the rlib itself. This is only filled in /// when `pipelining` above is enabled. rmeta_required: HashSet>, + + /// When we're in jobserver-per-rustc process mode, this keeps those + /// jobserver clients for each Unit (which eventually becomes a rustc + /// process). + pub rustc_clients: HashMap, Client>, } impl<'a, 'cfg> Context<'a, 'cfg> { @@ -112,6 +117,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> { unit_dependencies, files: None, rmeta_required: HashSet::new(), + rustc_clients: HashMap::new(), pipelining, }) } diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 16245ae723f..e27939165f8 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -189,7 +189,7 @@ enum Message { Finish(JobId, Artifact, CargoResult<()>), // This client should get release_raw called on it with one of our tokens - NeedsToken(JobId, Client), + NeedsToken(JobId), // A token previously passed to a NeedsToken client is being released. ReleaseToken(JobId), @@ -234,10 +234,10 @@ impl<'a> JobState<'a> { /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) /// on the passed client. /// - /// This should arrange for the passed client to eventually get a token via + /// This should arrange for the associated client to eventually get a token via /// `client.release_raw()`. - pub fn will_acquire(&self, client: &Client) { - let _ = self.tx.send(Message::NeedsToken(self.id, client.clone())); + pub fn will_acquire(&self) { + let _ = self.tx.send(Message::NeedsToken(self.id)); } /// The rustc underlying this Job is informing us that it is done with a jobserver token. @@ -543,9 +543,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?; self.tokens.push(token); } - Message::NeedsToken(id, client) => { + Message::NeedsToken(id) => { log::info!("queue token request"); jobserver_helper.request_token(); + let client = cx.rustc_clients[&self.active[&id]].clone(); self.to_send_clients .entry(id) .or_insert_with(Vec::new) diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 5a41f58c2d6..29805bb3e0e 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -48,7 +48,6 @@ use crate::util::errors::{self, CargoResult, CargoResultExt, Internal, ProcessEr use crate::util::machine_message::Message; use crate::util::{self, machine_message, ProcessBuilder}; use crate::util::{internal, join_paths, paths, profile}; -use jobserver::Client; /// A glorified callback for executing calls to rustc. Rather than calling rustc /// directly, we'll use an `Executor`, giving clients an opportunity to intercept @@ -172,7 +171,7 @@ fn rustc<'a, 'cfg>( unit: &Unit<'a>, exec: &Arc, ) -> CargoResult { - let (rustc_client, mut rustc) = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; + let mut rustc = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; let build_plan = cx.bcx.build_config.build_plan; let name = unit.pkg.name().to_string(); @@ -287,16 +286,7 @@ fn rustc<'a, 'cfg>( &target, mode, &mut |line| on_stdout_line(state, line, package_id, &target), - &mut |line| { - on_stderr_line( - state, - line, - package_id, - &target, - &mut output_options, - &rustc_client, - ) - }, + &mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), ) .map_err(internal_if_simple_exit_code) .chain_err(|| format!("could not compile `{}`.", name))?; @@ -544,22 +534,21 @@ fn prepare_rustc<'a, 'cfg>( cx: &mut Context<'a, 'cfg>, crate_types: &[&str], unit: &Unit<'a>, -) -> CargoResult<(Option, ProcessBuilder)> { +) -> CargoResult { let is_primary = cx.is_primary_package(unit); let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?; - let client = if cx.bcx.config.cli_unstable().jobserver_per_rustc { + if cx.bcx.config.cli_unstable().jobserver_per_rustc { let client = cx.new_jobserver()?; base.inherit_jobserver(&client); base.arg("-Zjobserver-token-requests"); - Some(client) + assert!(cx.rustc_clients.insert(*unit, client).is_none()); } else { base.inherit_jobserver(&cx.jobserver); - None - }; + } build_base_args(cx, &mut base, unit, crate_types)?; build_deps_args(&mut base, cx, unit)?; - Ok((client, base)) + Ok(base) } fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult { @@ -618,9 +607,7 @@ fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult rustdoc .exec_with_streaming( &mut |line| on_stdout_line(state, line, package_id, &target), - &mut |line| { - on_stderr_line(state, line, package_id, &target, &mut output_options, &None) - }, + &mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), false, ) .chain_err(|| format!("Could not document `{}`.", name))?; @@ -1101,9 +1088,8 @@ fn on_stderr_line( package_id: PackageId, target: &Target, options: &mut OutputOptions, - rustc_client: &Option, ) -> CargoResult<()> { - if on_stderr_line_inner(state, line, package_id, target, options, rustc_client)? { + if on_stderr_line_inner(state, line, package_id, target, options)? { // Check if caching is enabled. if let Some((path, cell)) = &mut options.cache_cell { // Cache the output, which will be replayed later when Fresh. @@ -1123,7 +1109,6 @@ fn on_stderr_line_inner( package_id: PackageId, target: &Target, options: &mut OutputOptions, - rustc_client: &Option, ) -> CargoResult { // We primarily want to use this function to process JSON messages from // rustc. The compiler should always print one JSON message per line, and @@ -1250,14 +1235,9 @@ fn on_stderr_line_inner( "found jobserver directive from rustc: `{:?}`", jobserver_event ); - match rustc_client { - Some(client) => match jobserver_event { - Event::WillAcquire => state.will_acquire(client), - Event::Release => state.release_token(), - }, - None => { - panic!("Received jobserver event without a client"); - } + match jobserver_event { + Event::WillAcquire => state.will_acquire(), + Event::Release => state.release_token(), } return Ok(false); } @@ -1310,7 +1290,7 @@ fn replay_output_cache( break; } let trimmed = line.trim_end_matches(&['\n', '\r'][..]); - on_stderr_line(state, trimmed, package_id, &target, &mut options, &None)?; + on_stderr_line(state, trimmed, package_id, &target, &mut options)?; line.clear(); } Ok(()) From 7e3e1b128c21803e6e29c6eaedd2070c2e92ee2e Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 22 Jan 2020 14:24:43 -0500 Subject: [PATCH 26/26] Move token truncation to just before waiting This also moves and enhances the message logging the state before blocking. --- src/cargo/core/compiler/job_queue.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index e27939165f8..fd86611bcc5 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -582,9 +582,23 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // listen for a message with a timeout, and on timeout we run the // previous parts of the loop again. let events: Vec<_> = self.rx.try_iter().collect(); + info!( + "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})", + self.tokens.len(), + self.rustc_tokens + .iter() + .map(|(k, j)| (k, j.len())) + .collect::>(), + self.to_send_clients + .iter() + .map(|(k, j)| (k, j.len())) + .collect::>(), + events.len(), + ); if events.is_empty() { loop { self.tick_progress(); + self.tokens.truncate(self.active.len() - 1); match self.rx.recv_timeout(Duration::from_millis(500)) { Ok(message) => break vec![message], Err(_) => continue, @@ -618,13 +632,6 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { loop { self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?; - info!( - "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}", - self.tokens.len(), - self.rustc_tokens.len(), - self.to_send_clients.len() - ); - // If after all that we're not actually running anything then we're // done! if self.active.is_empty() { @@ -638,7 +645,6 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // jobserver interface is architected we may acquire a token that we // don't actually use, and if this happens just relinquish it back // to the jobserver itself. - self.tokens.truncate(self.active.len() - 1); for event in self.wait_for_events() { if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? { error = Some(err);