diff --git a/ci/compat-Cargo.lock b/ci/compat-Cargo.lock index 2e98c07b0..aef1589f1 100644 --- a/ci/compat-Cargo.lock +++ b/ci/compat-Cargo.lock @@ -220,16 +220,6 @@ dependencies = [ "scopeguard 1.1.0 (registry+/~https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "crossbeam-queue" -version = "0.2.3" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.10 (registry+/~https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.7.2 (registry+/~https://github.com/rust-lang/crates.io-index)", - "maybe-uninit 2.0.0 (registry+/~https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -1051,7 +1041,6 @@ version = "1.7.1" dependencies = [ "crossbeam-channel 0.4.2 (registry+/~https://github.com/rust-lang/crates.io-index)", "crossbeam-deque 0.7.3 (registry+/~https://github.com/rust-lang/crates.io-index)", - "crossbeam-queue 0.2.3 (registry+/~https://github.com/rust-lang/crates.io-index)", "crossbeam-utils 0.7.2 (registry+/~https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+/~https://github.com/rust-lang/crates.io-index)", "libc 0.2.74 (registry+/~https://github.com/rust-lang/crates.io-index)", @@ -1457,7 +1446,6 @@ source = "registry+/~https://github.com/rust-lang/crates.io-index" "checksum crossbeam-channel 0.4.2 (registry+/~https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" "checksum crossbeam-deque 0.7.3 (registry+/~https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" "checksum crossbeam-epoch 0.8.2 (registry+/~https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" -"checksum crossbeam-queue 0.2.3 (registry+/~https://github.com/rust-lang/crates.io-index)" = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" "checksum crossbeam-utils 0.7.2 (registry+/~https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" "checksum derivative 2.1.1 (registry+/~https://github.com/rust-lang/crates.io-index)" = "cb582b60359da160a9477ee80f15c8d784c477e69c217ef2cdd4169c24ea380f" "checksum dispatch 0.2.0 (registry+/~https://github.com/rust-lang/crates.io-index)" = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b" diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index fd72d1263..97c0c2527 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -20,7 +20,6 @@ num_cpus = "1.2" lazy_static = "1" crossbeam-channel = "0.4.2" crossbeam-deque = "0.7.2" -crossbeam-queue = "0.2" crossbeam-utils = "0.7" [dev-dependencies] diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 27510cff2..a71f1b0e9 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -1,6 +1,6 @@ use crate::latch::Latch; use crate::unwind; -use crossbeam_queue::SegQueue; +use crossbeam_deque::{Injector, Steal}; use std::any::Any; use std::cell::UnsafeCell; use std::mem; @@ -184,13 +184,13 @@ impl JobResult { /// Indirect queue to provide FIFO job priority. pub(super) struct JobFifo { - inner: SegQueue, + inner: Injector, } impl JobFifo { pub(super) fn new() -> Self { JobFifo { - inner: SegQueue::new(), + inner: Injector::new(), } } @@ -206,6 +206,12 @@ impl JobFifo { impl Job for JobFifo { unsafe fn execute(this: *const Self) { // We "execute" a queue by executing its first job, FIFO. - (*this).inner.pop().expect("job in fifo queue").execute() + loop { + match (*this).inner.steal() { + Steal::Success(job_ref) => break job_ref.execute(), + Steal::Empty => panic!("FIFO is empty"), + Steal::Retry => {} + } + } } } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 53032cca3..46ae10a20 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -8,8 +8,7 @@ use crate::util::leak; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, }; -use crossbeam_deque::{Steal, Stealer, Worker}; -use crossbeam_queue::SegQueue; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use std::any::Any; use std::cell::Cell; use std::collections::hash_map::DefaultHasher; @@ -136,7 +135,7 @@ pub(super) struct Registry { logger: Logger, thread_infos: Vec, sleep: Sleep, - injected_jobs: SegQueue, + injected_jobs: Injector, panic_handler: Option>, start_handler: Option>, exit_handler: Option>, @@ -240,7 +239,7 @@ impl Registry { logger: logger.clone(), thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), sleep: Sleep::new(logger, n_threads), - injected_jobs: SegQueue::new(), + injected_jobs: Injector::new(), terminate_count: AtomicUsize::new(1), panic_handler: builder.take_panic_handler(), start_handler: builder.take_start_handler(), @@ -413,13 +412,18 @@ impl Registry { } fn pop_injected_job(&self, worker_index: usize) -> Option { - let job = self.injected_jobs.pop().ok(); - if job.is_some() { - self.log(|| JobUninjected { - worker: worker_index, - }); + loop { + match self.injected_jobs.steal() { + Steal::Success(job) => { + self.log(|| JobUninjected { + worker: worker_index, + }); + return Some(job); + } + Steal::Empty => return None, + Steal::Retry => {} + } } - job } /// If already in a worker-thread of this registry, just execute `op`. @@ -758,32 +762,39 @@ impl WorkerThread { debug_assert!(self.local_deque_is_empty()); // otherwise, try to steal - let num_threads = self.registry.thread_infos.len(); + let thread_infos = &self.registry.thread_infos.as_slice(); + let num_threads = thread_infos.len(); if num_threads <= 1 { return None; } - let start = self.rng.next_usize(num_threads); - (start..num_threads) - .chain(0..start) - .filter(|&i| i != self.index) - .filter_map(|victim_index| { - let victim = &self.registry.thread_infos[victim_index]; - loop { + loop { + let mut retry = false; + let start = self.rng.next_usize(num_threads); + let job = (start..num_threads) + .chain(0..start) + .filter(move |&i| i != self.index) + .find_map(|victim_index| { + let victim = &thread_infos[victim_index]; match victim.stealer.steal() { - Steal::Empty => return None, - Steal::Success(d) => { + Steal::Success(job) => { self.log(|| JobStolen { worker: self.index, victim: victim_index, }); - return Some(d); + Some(job) + } + Steal::Empty => None, + Steal::Retry => { + retry = true; + None } - Steal::Retry => {} } - } - }) - .next() + }); + if job.is_some() || !retry { + return job; + } + } } } diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 7d274920b..a41d408e1 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -285,7 +285,7 @@ struct ScopeBase<'scope> { /// propagated at that point. pub fn scope<'scope, OP, R>(op: OP) -> R where - OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, + OP: FnOnce(&Scope<'scope>) -> R + Send, R: Send, { in_worker(|owner_thread, _| { @@ -376,7 +376,7 @@ where /// panics are propagated at that point. pub fn scope_fifo<'scope, OP, R>(op: OP) -> R where - OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send, + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, R: Send, { in_worker(|owner_thread, _| { diff --git a/rayon-core/src/scope/test.rs b/rayon-core/src/scope/test.rs index 3d855ecda..8cb82b615 100644 --- a/rayon-core/src/scope/test.rs +++ b/rayon-core/src/scope/test.rs @@ -1,6 +1,6 @@ use crate::unwind; use crate::ThreadPoolBuilder; -use crate::{scope, scope_fifo, Scope}; +use crate::{scope, scope_fifo, Scope, ScopeFifo}; use rand::{Rng, SeedableRng}; use rand_xorshift::XorShiftRng; use std::cmp; @@ -433,3 +433,83 @@ fn mixed_fifo_lifo_order() { let expected = vec![-3, 0, -2, 1, -1, 2, 3]; assert_eq!(vec, expected); } + +#[test] +fn static_scope() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope(|s: &Scope<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn static_scope_fifo() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope_fifo(|s: &ScopeFifo<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn_fifo(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn mixed_lifetime_scope() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope(move |s: &Scope<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn mixed_lifetime_scope_fifo() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope_fifo(move |s: &ScopeFifo<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn_fifo(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index 065d236e0..2209f6304 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -200,7 +200,7 @@ impl ThreadPool { /// [scope]: fn.scope.html pub fn scope<'scope, OP, R>(&self, op: OP) -> R where - OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, + OP: FnOnce(&Scope<'scope>) -> R + Send, R: Send, { self.install(|| scope(op)) @@ -215,7 +215,7 @@ impl ThreadPool { /// [scope_fifo]: fn.scope_fifo.html pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R where - OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send, + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, R: Send, { self.install(|| scope_fifo(op)) diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs index 0d6815e12..8d1c90ca1 100644 --- a/rayon-core/src/thread_pool/test.rs +++ b/rayon-core/src/thread_pool/test.rs @@ -4,12 +4,9 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; -use crate::join; -use crate::thread_pool::ThreadPool; - #[allow(deprecated)] use crate::Configuration; -use crate::ThreadPoolBuilder; +use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; #[test] #[should_panic(expected = "Hello, world!")] @@ -267,3 +264,75 @@ fn spawn_fifo_order() { let expected: Vec = (0..10).collect(); // FIFO -> natural order assert_eq!(vec, expected); } + +#[test] +fn nested_scopes() { + // Create matching scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) + where + OP: FnOnce(&[&Scope<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +fn nested_fifo_scopes() { + // Create matching fifo scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) + where + OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope_fifo(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn_fifo(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +}