Skip to content

Commit

Permalink
Add jobserver support to sccache
Browse files Browse the repository at this point in the history
This commit alters the main sccache server to operate and orchestrate its own
GNU make style jobserver. This is primarily intended for interoperation with
rustc itself.

The Rust compiler currently has a multithreaded mode where it will execute code
generation and optimization on the LLVM side of things in parallel. This
parallelism, however, can overload a machine quickly if not properly accounted
for (e.g. if 10 rustcs all spawn 10 threads...). The usage of a GNU make style
jobserver is intended to arbitrate and rate limit all these rustc instances to
ensure that one build's maximal parallelism never exceeds a particular amount.

Currently for Rust Cargo is the primary driver for setting up a jobserver. Cargo
will create this and manage this per compilation, ensuring that any one `cargo
build` invocation never exceeds a maximal parallelism. When sccache enters the
picture, however, the story gets slightly more odd.

The jobserver implementation on Unix relies on inheritance of file descriptors
in spawned processes. With sccache, however, there's no inheritance as the
actual rustc invocation is spawned by the server, not the client. In this case
the env vars used to configure the jobsever are usually incorrect.

To handle this problem this commit bakes a jobserver directly into sccache
itself. The jobserver then overrides whatever jobserver the client has
configured in its own env vars to ensure correct operation. The settings of each
jobserver may be misconfigured (there's no way to configure sccache's jobserver
right now), but hopefully that's not too much of a problem for the forseeable
future.

The implementation here was to provide a thin wrapper around the `jobserver`
crate with a futures-based interface. This interface was then hooked into the
mock command infrastructure to automatically acquire a jobserver token when
spawning a process and automatically drop the token when the process exits.
Additionally, all spawned processes will now automatically receive a configured
jobserver.

cc rust-lang/rust#42867, the original motivation for this commit
  • Loading branch information
alexcrichton authored and jrobsonchase committed Oct 10, 2017
1 parent aa5f93f commit a38da80
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 111 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ futures = "0.1.11"
futures-cpupool = "0.1"
hyper = { version = "0.11", optional = true }
hyper-tls = { version = "0.1", optional = true }
jobserver = "0.1"
jsonwebtoken = { version = "2.0", optional = true }
libc = "0.2.10"
local-encoding = "0.2.0"
log = "0.3.6"
lru-disk-cache = { path = "lru-disk-cache", version = "0.1.0" }
native-tls = "0.1"
num_cpus = "1.0"
number_prefix = "0.2.5"
openssl = { version = "0.9", optional = true }
redis = { version = "0.8.0", optional = true }
Expand Down
9 changes: 4 additions & 5 deletions src/cache/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use cache::{
CacheWrite,
Storage,
};
use futures::Future;
use futures_cpupool::CpuPool;
use lru_disk_cache::LruDiskCache;
use lru_disk_cache::Error as LruError;
Expand Down Expand Up @@ -62,7 +61,7 @@ impl Storage for DiskCache {
let path = make_key_path(key);
let lru = self.lru.clone();
let key = key.to_owned();
self.pool.spawn_fn(move || {
Box::new(self.pool.spawn_fn(move || {
let mut lru = lru.lock().unwrap();
let f = match lru.get(&path) {
Ok(f) => f,
Expand All @@ -78,7 +77,7 @@ impl Storage for DiskCache {
};
let hit = CacheRead::from(f)?;
Ok(Cache::Hit(hit))
}).boxed()
}))
}

fn put(&self, key: &str, entry: CacheWrite) -> SFuture<Duration> {
Expand All @@ -87,12 +86,12 @@ impl Storage for DiskCache {
trace!("DiskCache::finish_put({})", key);
let lru = self.lru.clone();
let key = make_key_path(key);
self.pool.spawn_fn(move || {
Box::new(self.pool.spawn_fn(move || {
let start = Instant::now();
let v = entry.finish()?;
lru.lock().unwrap().insert_bytes(key, &v)?;
Ok(start.elapsed())
}).boxed()
}))
}

fn location(&self) -> String {
Expand Down
4 changes: 3 additions & 1 deletion src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use client::{
ServerConnection,
};
use cmdline::{Command, StatsFormat};
use jobserver::Client;
use log::LogLevel::Trace;
use mock_command::{
CommandCreatorSync,
Expand Down Expand Up @@ -601,9 +602,10 @@ pub fn run_command(cmd: Command) -> Result<i32> {
}
Command::Compile { exe, cmdline, cwd, env_vars } => {
trace!("Command::Compile {{ {:?}, {:?}, {:?} }}", exe, cmdline, cwd);
let jobserver = unsafe { Client::new() };
let conn = connect_or_start_server(get_port())?;
let mut core = Core::new()?;
let res = do_compile(ProcessCommandCreator::new(&core.handle()),
let res = do_compile(ProcessCommandCreator::new(&core.handle(), &jobserver),
&mut core,
conn,
exe.as_ref(),
Expand Down
44 changes: 16 additions & 28 deletions src/compiler/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,13 @@ fn detect_compiler<T>(creator: &T, executable: &Path, pool: &CpuPool)
};
let is_rustc = if filename.to_string_lossy().to_lowercase() == "rustc" {
// Sanity check that it's really rustc.
let executable = executable.to_path_buf();
let child = creator.clone().new_command_sync(&executable)
.stdout(Stdio::piped())
.stderr(Stdio::null())
.args(&["--version"])
.spawn().chain_err(|| {
format!("failed to execute {:?}", executable)
});
let output = child.into_future().and_then(move |child| {
.spawn();
let output = child.and_then(move |child| {
child.wait_with_output()
.chain_err(|| "failed to read child output")
});
Expand Down Expand Up @@ -530,10 +529,7 @@ gcc
let output = write.and_then(move |(tempdir, src)| {
cmd.arg("-E").arg(src);
trace!("compiler {:?}", cmd);
let child = cmd.spawn().chain_err(|| {
format!("failed to execute {:?}", cmd)
});
child.into_future().and_then(|child| {
cmd.spawn().and_then(|child| {
child.wait_with_output().chain_err(|| "failed to read child output")
}).map(|e| {
drop(tempdir);
Expand Down Expand Up @@ -724,11 +720,9 @@ mod test {
let o = obj.clone();
next_command_calls(&creator, move |_| {
// Pretend to compile something.
match File::create(&o)
.and_then(|mut f| f.write_all(b"file contents")) {
Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)),
Err(e) => Err(e),
}
let mut f = File::create(&o)?;
f.write_all(b"file contents")?;
Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR))
});
let cwd = f.tempdir.path();
let arguments = ovec!["-c", "foo.c", "-o", "foo.o"];
Expand Down Expand Up @@ -805,11 +799,9 @@ mod test {
let o = obj.clone();
next_command_calls(&creator, move |_| {
// Pretend to compile something.
match File::create(&o)
.and_then(|mut f| f.write_all(b"file contents")) {
Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)),
Err(e) => Err(e),
}
let mut f = File::create(&o)?;
f.write_all(b"file contents")?;
Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR))
});
let cwd = f.tempdir.path();
let arguments = ovec!["-c", "foo.c", "-o", "foo.o"];
Expand Down Expand Up @@ -887,11 +879,9 @@ mod test {
let o = obj.clone();
next_command_calls(&creator, move |_| {
// Pretend to compile something.
match File::create(&o)
.and_then(|mut f| f.write_all(b"file contents")) {
Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)),
Err(e) => Err(e),
}
let mut f = File::create(&o)?;
f.write_all(b"file contents")?;
Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR))
});
let cwd = f.tempdir.path();
let arguments = ovec!["-c", "foo.c", "-o", "foo.o"];
Expand Down Expand Up @@ -954,11 +944,9 @@ mod test {
let o = obj.clone();
next_command_calls(&creator, move |_| {
// Pretend to compile something.
match File::create(&o)
.and_then(|mut f| f.write_all(b"file contents")) {
Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)),
Err(e) => Err(e),
}
let mut f = File::create(&o)?;
f.write_all(b"file contents")?;
Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR))
});
}
let cwd = f.tempdir.path();
Expand Down
71 changes: 71 additions & 0 deletions src/jobserver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
extern crate jobserver;

use std::io;
use std::process::Command;
use std::sync::Arc;

use futures::prelude::*;
use futures::sync::mpsc;
use futures::sync::oneshot;
use num_cpus;

use errors::*;

pub use self::jobserver::Acquired;

#[derive(Clone)]
pub struct Client {
helper: Arc<jobserver::HelperThread>,
inner: jobserver::Client,
tx: mpsc::UnboundedSender<oneshot::Sender<io::Result<Acquired>>>
}

impl Client {
// unsafe because `from_env` is unsafe (can use the wrong fds)
pub unsafe fn new() -> Client {
match jobserver::Client::from_env() {
Some(c) => Client::_new(c),
None => Client::new_num(num_cpus::get()),
}
}

pub fn new_num(num: usize) -> Client {
let inner = jobserver::Client::new(num)
.expect("failed to create jobserver");
Client::_new(inner)
}

fn _new(inner: jobserver::Client) -> Client {
let (tx, rx) = mpsc::unbounded::<oneshot::Sender<_>>();
let mut rx = rx.wait();
let helper = inner.clone().into_helper_thread(move |token| {
if let Some(Ok(sender)) = rx.next() {
drop(sender.send(token));
}
}).expect("failed to spawn helper thread");

Client {
inner: inner,
helper: Arc::new(helper),
tx: tx,
}
}

/// Configures this jobserver to be inherited by the specified command
pub fn configure(&self, cmd: &mut Command) {
self.inner.configure(cmd)
}

/// Returns a future that represents an acquired jobserver token.
///
/// This should be invoked before any "work" is spawend (for whatever the
/// defnition of "work" is) to ensure that the system is properly
/// rate-limiting itself.
pub fn acquire(&self) -> SFuture<Acquired> {
let (tx, rx) = oneshot::channel();
self.helper.request_token();
self.tx.unbounded_send(tx).unwrap();
Box::new(rx.chain_err(|| "jobserver helper panicked")
.and_then(|t| t.chain_err(|| "failed to acquire jobserver token")))
}
}
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ extern crate libc;
#[cfg(windows)]
extern crate mio_named_pipes;
extern crate native_tls;
extern crate num_cpus;
extern crate number_prefix;
#[cfg(feature = "openssl")]
extern crate openssl;
Expand Down Expand Up @@ -93,6 +94,7 @@ mod client;
mod cmdline;
mod commands;
mod compiler;
mod jobserver;
mod mock_command;
mod protocol;
mod server;
Expand Down
Loading

0 comments on commit a38da80

Please sign in to comment.