diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 3f57bef15..f7a3d1306 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "rustc-rayon-core" -version = "0.5.0" +version = "0.5.1" authors = ["Niko Matsakis ", "Josh Stone "] description = "Core APIs for Rayon - fork for rustc" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/rustc-rayon" documentation = "https://docs.rs/rustc-rayon-core/" -rust-version = "1.59" +rust-version = "1.63" edition = "2021" readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] @@ -18,14 +18,12 @@ name = "rayon_core" # Some dependencies may not be their latest version, in order to support older rustc. [dependencies] -num_cpus = "1.2" -crossbeam-channel = "0.5.0" crossbeam-deque = "0.8.1" crossbeam-utils = "0.8.0" [dev-dependencies] -rand = "0.8" -rand_xorshift = "0.3" +rand = "0.9" +rand_xorshift = "0.4" scoped-tls = "1.0" [target.'cfg(unix)'.dev-dependencies] diff --git a/rayon-core/README.md b/rayon-core/README.md index 13b8a451e..5b8714f5d 100644 --- a/rayon-core/README.md +++ b/rayon-core/README.md @@ -2,7 +2,7 @@ Note: This is an unstable fork made for use in rustc Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool. -Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirror in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join. +Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirrored in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join. rayon-core aims to never, or almost never, have a breaking change to its API, because each revision of rayon-core also houses the global thread-pool (and hence if you have two simultaneous versions of rayon-core, you have two thread-pools). @@ -10,4 +10,4 @@ Please see [Rayon Docs] for details about using Rayon. [Rayon Docs]: https://docs.rs/rayon/ -Rayon-core currently requires `rustc 1.59.0` or greater. +Rayon-core currently requires `rustc 1.63.0` or greater. diff --git a/rayon-core/src/broadcast/mod.rs b/rayon-core/src/broadcast/mod.rs index f9cfc47ac..442891f2d 100644 --- a/rayon-core/src/broadcast/mod.rs +++ b/rayon-core/src/broadcast/mod.rs @@ -1,7 +1,6 @@ use crate::job::{ArcJob, StackJob}; -use crate::latch::LatchRef; +use crate::latch::{CountLatch, LatchRef}; use crate::registry::{Registry, WorkerThread}; -use crate::scope::ScopeLatch; use std::fmt; use std::marker::PhantomData; use std::sync::Arc; @@ -108,7 +107,7 @@ where let n_threads = registry.num_threads(); let current_thread = WorkerThread::current().as_ref(); let tlv = crate::tlv::get(); - let latch = ScopeLatch::with_count(n_threads, current_thread); + let latch = CountLatch::with_count(n_threads, current_thread); let jobs: Vec<_> = (0..n_threads) .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch))) .collect(); diff --git a/rayon-core/src/broadcast/test.rs b/rayon-core/src/broadcast/test.rs index 3ae11f7f6..00ab4ad7f 100644 --- a/rayon-core/src/broadcast/test.rs +++ b/rayon-core/src/broadcast/test.rs @@ -2,6 +2,7 @@ use crate::ThreadPoolBuilder; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; use std::sync::Arc; use std::{thread, time}; @@ -14,7 +15,7 @@ fn broadcast_global() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_global() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); let mut v: Vec<_> = rx.into_iter().collect(); @@ -33,7 +34,7 @@ fn broadcast_pool() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_pool() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -53,7 +54,7 @@ fn broadcast_self() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_self() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap())); @@ -81,7 +82,7 @@ fn broadcast_mutual() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool1.spawn({ @@ -118,7 +119,7 @@ fn broadcast_mutual_sleepy() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual_sleepy() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool1.spawn({ @@ -158,8 +159,8 @@ fn broadcast_panic_one() { #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_one() { - let (tx, rx) = crossbeam_channel::unbounded(); - let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); let pool = ThreadPoolBuilder::new() .num_threads(7) .panic_handler(move |e| panic_tx.send(e).unwrap()) @@ -196,8 +197,8 @@ fn broadcast_panic_many() { #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_many() { - let (tx, rx) = crossbeam_channel::unbounded(); - let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); let pool = ThreadPoolBuilder::new() .num_threads(7) .panic_handler(move |e| panic_tx.send(e).unwrap()) @@ -231,7 +232,7 @@ fn broadcast_sleep_race() { #[test] fn broadcast_after_spawn_broadcast() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); // Queue a non-blocking spawn_broadcast. crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -247,7 +248,7 @@ fn broadcast_after_spawn_broadcast() { #[test] fn broadcast_after_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn on a thread-local deque. crate::registry::in_worker(move |_, _| { diff --git a/rayon-core/src/join/test.rs b/rayon-core/src/join/test.rs index b303dbc81..03f4ab447 100644 --- a/rayon-core/src/join/test.rs +++ b/rayon-core/src/join/test.rs @@ -1,9 +1,8 @@ //! Tests for the join code. -use crate::join::*; -use crate::unwind; +use super::*; use crate::ThreadPoolBuilder; -use rand::distributions::Standard; +use rand::distr::StandardUniform; use rand::{Rng, SeedableRng}; use rand_xorshift::XorShiftRng; @@ -39,7 +38,7 @@ fn seeded_rng() -> XorShiftRng { #[test] fn sort() { let rng = seeded_rng(); - let mut data: Vec = rng.sample_iter(&Standard).take(6 * 1024).collect(); + let mut data: Vec = rng.sample_iter(&StandardUniform).take(6 * 1024).collect(); let mut sorted_data = data.clone(); sorted_data.sort(); quick_sort(&mut data); @@ -50,7 +49,7 @@ fn sort() { #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn sort_in_pool() { let rng = seeded_rng(); - let mut data: Vec = rng.sample_iter(&Standard).take(12 * 1024).collect(); + let mut data: Vec = rng.sample_iter(&StandardUniform).take(12 * 1024).collect(); let pool = ThreadPoolBuilder::new().build().unwrap(); let mut sorted_data = data.clone(); diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs index de4327234..8903942a8 100644 --- a/rayon-core/src/latch.rs +++ b/rayon-core/src/latch.rs @@ -2,7 +2,6 @@ use std::marker::PhantomData; use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; -use std::usize; use crate::registry::{Registry, WorkerThread}; @@ -84,13 +83,6 @@ impl CoreLatch { } } - /// Returns the address of this core latch as an integer. Used - /// for logging. - #[inline] - pub(super) fn addr(&self) -> usize { - self as *const CoreLatch as usize - } - /// Invoked by owning thread as it prepares to sleep. Returns true /// if the owning thread may proceed to fall asleep, false if the /// latch was set in the meantime. @@ -142,6 +134,13 @@ impl CoreLatch { } } +impl AsCoreLatch for CoreLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + self + } +} + /// Spin latches are the simplest, most efficient kind, but they do /// not support a `wait()` operation. They just have a boolean flag /// that becomes true when `set()` is called. @@ -269,62 +268,32 @@ impl Latch for LockLatch { } } -/// Counting latches are used to implement scopes. They track a -/// counter. Unlike other latches, calling `set()` does not -/// necessarily make the latch be considered `set()`; instead, it just -/// decrements the counter. The latch is only "set" (in the sense that -/// `probe()` returns true) once the counter reaches zero. +/// Once latches are used to implement one-time blocking, primarily +/// for the termination flag of the threads in the pool. /// -/// Note: like a `SpinLatch`, count laches are always associated with +/// Note: like a `SpinLatch`, once-latches are always associated with /// some registry that is probing them, which must be tickled when /// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a /// reference to that registry. This is because in some cases the -/// registry owns the count-latch, and that would create a cycle. So a -/// `CountLatch` must be given a reference to its owning registry when +/// registry owns the once-latch, and that would create a cycle. So a +/// `OnceLatch` must be given a reference to its owning registry when /// it is set. For this reason, it does not implement the `Latch` /// trait (but it doesn't have to, as it is not used in those generic /// contexts). #[derive(Debug)] -pub(super) struct CountLatch { +pub(super) struct OnceLatch { core_latch: CoreLatch, - counter: AtomicUsize, } -impl CountLatch { - #[inline] - pub(super) fn new() -> CountLatch { - Self::with_count(1) - } - +impl OnceLatch { #[inline] - pub(super) fn with_count(n: usize) -> CountLatch { - CountLatch { + pub(super) fn new() -> OnceLatch { + Self { core_latch: CoreLatch::new(), - counter: AtomicUsize::new(n), - } - } - - #[inline] - pub(super) fn increment(&self) { - debug_assert!(!self.core_latch.probe()); - self.counter.fetch_add(1, Ordering::Relaxed); - } - - /// Decrements the latch counter by one. If this is the final - /// count, then the latch is **set**, and calls to `probe()` will - /// return true. Returns whether the latch was set. - #[inline] - pub(super) unsafe fn set(this: *const Self) -> bool { - if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - CoreLatch::set(&(*this).core_latch); - true - } else { - false } } - /// Decrements the latch counter by one and possibly set it. If - /// the latch is set, then the specific worker thread is tickled, + /// Set the latch, then tickle the specific worker thread, /// which should be the one that owns this latch. #[inline] pub(super) unsafe fn set_and_tickle_one( @@ -332,31 +301,81 @@ impl CountLatch { registry: &Registry, target_worker_index: usize, ) { - if Self::set(this) { + if CoreLatch::set(&(*this).core_latch) { registry.notify_worker_latch_is_set(target_worker_index); } } } -impl AsCoreLatch for CountLatch { +impl AsCoreLatch for OnceLatch { #[inline] fn as_core_latch(&self) -> &CoreLatch { &self.core_latch } } +/// Counting latches are used to implement scopes. They track a +/// counter. Unlike other latches, calling `set()` does not +/// necessarily make the latch be considered `set()`; instead, it just +/// decrements the counter. The latch is only "set" (in the sense that +/// `probe()` returns true) once the counter reaches zero. #[derive(Debug)] -pub(super) struct CountLockLatch { - lock_latch: LockLatch, +pub(super) struct CountLatch { counter: AtomicUsize, + kind: CountLatchKind, } -impl CountLockLatch { - #[inline] - pub(super) fn with_count(n: usize) -> CountLockLatch { - CountLockLatch { - lock_latch: LockLatch::new(), - counter: AtomicUsize::new(n), +enum CountLatchKind { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CoreLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: LockLatch }, +} + +impl std::fmt::Debug for CountLatchKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CountLatchKind::Stealing { latch, .. } => { + f.debug_tuple("Stealing").field(latch).finish() + } + CountLatchKind::Blocking { latch, .. } => { + f.debug_tuple("Blocking").field(latch).finish() + } + } + } +} + +impl CountLatch { + pub(super) fn new(owner: Option<&WorkerThread>) -> Self { + Self::with_count(1, owner) + } + + pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { + Self { + counter: AtomicUsize::new(count), + kind: match owner { + Some(owner) => CountLatchKind::Stealing { + latch: CoreLatch::new(), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => CountLatchKind::Blocking { + latch: LockLatch::new(), + }, + }, } } @@ -366,16 +385,42 @@ impl CountLockLatch { debug_assert!(old_counter != 0); } - pub(super) fn wait(&self) { - self.lock_latch.wait(); + pub(super) fn wait(&self, owner: Option<&WorkerThread>) { + match &self.kind { + CountLatchKind::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + CountLatchKind::Blocking { latch } => latch.wait(), + } } } -impl Latch for CountLockLatch { +impl Latch for CountLatch { #[inline] unsafe fn set(this: *const Self) { if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - LockLatch::set(&(*this).lock_latch); + // NOTE: Once we call `set` on the internal `latch`, + // the target may proceed and invalidate `this`! + match (*this).kind { + CountLatchKind::Stealing { + ref latch, + ref registry, + worker_index, + } => { + let registry = Arc::clone(registry); + if CoreLatch::set(latch) { + registry.notify_worker_latch_is_set(worker_index); + } + } + CountLatchKind::Blocking { ref latch } => LockLatch::set(latch), + } } } } diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 354bd8f40..72064547e 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -70,9 +70,8 @@ use std::fmt; use std::io; use std::marker::PhantomData; use std::str::FromStr; +use std::thread; -#[macro_use] -mod log; #[macro_use] mod private; @@ -233,7 +232,7 @@ type DeadlockHandler = dyn Fn() + Send + Sync; type StartHandler = dyn Fn(usize) + Send + Sync; /// The type for a closure that gets invoked when a thread exits. The -/// closure is passed the index of the thread on which is is invoked. +/// closure is passed the index of the thread on which it is invoked. /// Note that this same closure may be invoked multiple times in parallel. type ExitHandler = dyn Fn(usize) + Send + Sync; @@ -309,12 +308,12 @@ where impl ThreadPoolBuilder { /// Creates a scoped `ThreadPool` initialized using this configuration. /// - /// This is a convenience function for building a pool using [`crossbeam::scope`] + /// This is a convenience function for building a pool using [`std::thread::scope`] /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). /// The threads in this pool will start by calling `wrapper`, which should /// do initialization and continue by calling `ThreadBuilder::run()`. /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// # Examples /// @@ -349,18 +348,17 @@ impl ThreadPoolBuilder { W: Fn(ThreadBuilder) + Sync, // expected to call `run()` F: FnOnce(&ThreadPool) -> R, { - let result = crossbeam_utils::thread::scope(|scope| { - let wrapper = &wrapper; + std::thread::scope(|scope| { let pool = self .spawn_handler(|thread| { - let mut builder = scope.builder(); + let mut builder = std::thread::Builder::new(); if let Some(name) = thread.name() { builder = builder.name(name.to_string()); } if let Some(size) = thread.stack_size() { builder = builder.stack_size(size); } - builder.spawn(move |_| wrapper(thread))?; + builder.spawn_scoped(scope, || wrapper(thread))?; Ok(()) }) .build()?; @@ -370,12 +368,7 @@ impl ThreadPoolBuilder { Ok(result) => Ok(result), Err(err) => unwind::resume_unwinding(err), } - }); - - match result { - Ok(result) => result, - Err(err) => unwind::resume_unwinding(err), - } + }) } } @@ -384,13 +377,11 @@ impl ThreadPoolBuilder { /// /// Note that the threads will not exit until after the pool is dropped. It /// is up to the caller to wait for thread termination if that is important - /// for any invariants. For instance, threads created in [`crossbeam::scope`] + /// for any invariants. For instance, threads created in [`std::thread::scope`] /// will be joined before that scope returns, and this will block indefinitely /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate /// until the entire process exits! /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html - /// /// # Examples /// /// A minimal spawn handler just needs to call `run()` from an independent thread. @@ -439,6 +430,7 @@ impl ThreadPoolBuilder { /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in /// [`build_scoped`](#method.build_scoped). /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// ``` @@ -498,12 +490,18 @@ impl ThreadPoolBuilder { if self.num_threads > 0 { self.num_threads } else { + let default = || { + thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + }; + match env::var("RAYON_NUM_THREADS") .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => return x, - Some(x) if x == 0 => return num_cpus::get(), + Some(x @ 1..) => return x, + Some(0) => return default(), _ => {} } @@ -512,8 +510,8 @@ impl ThreadPoolBuilder { .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => x, - _ => num_cpus::get(), + Some(x @ 1..) => x, + _ => default(), } } } @@ -552,9 +550,8 @@ impl ThreadPoolBuilder { /// may change in the future, if you wish to rely on a fixed /// number of threads, you should use this function to specify /// that number. To reproduce the current default behavior, you - /// may wish to use the [`num_cpus` - /// crate](https://crates.io/crates/num_cpus) to query the number - /// of CPUs dynamically. + /// may wish to use [`std::thread::available_parallelism`] + /// to query the number of CPUs dynamically. /// /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment diff --git a/rayon-core/src/log.rs b/rayon-core/src/log.rs deleted file mode 100644 index 7b6daf0ab..000000000 --- a/rayon-core/src/log.rs +++ /dev/null @@ -1,413 +0,0 @@ -//! Debug Logging -//! -//! To use in a debug build, set the env var `RAYON_LOG` as -//! described below. In a release build, logs are compiled out by -//! default unless Rayon is built with `--cfg rayon_rs_log` (try -//! `RUSTFLAGS="--cfg rayon_rs_log"`). -//! -//! Note that logs are an internally debugging tool and their format -//! is considered unstable, as are the details of how to enable them. -//! -//! # Valid values for RAYON_LOG -//! -//! The `RAYON_LOG` variable can take on the following values: -//! -//! * `tail:` -- dumps the last 10,000 events into the given file; -//! useful for tracking down deadlocks -//! * `profile:` -- dumps only those events needed to reconstruct how -//! many workers are active at a given time -//! * `all:` -- dumps every event to the file; useful for debugging - -use crossbeam_channel::{self, Receiver, Sender}; -use std::collections::VecDeque; -use std::env; -use std::fs::File; -use std::io::{self, BufWriter, Write}; - -/// True if logs are compiled in. -pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions)); - -#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] -pub(super) enum Event { - /// Flushes events to disk, used to terminate benchmarking. - Flush, - - /// Indicates that a worker thread started execution. - ThreadStart { - worker: usize, - terminate_addr: usize, - }, - - /// Indicates that a worker thread started execution. - ThreadTerminate { worker: usize }, - - /// Indicates that a worker thread became idle, blocked on `latch_addr`. - ThreadIdle { worker: usize, latch_addr: usize }, - - /// Indicates that an idle worker thread found work to do, after - /// yield rounds. It should no longer be considered idle. - ThreadFoundWork { worker: usize, yields: u32 }, - - /// Indicates that a worker blocked on a latch observed that it was set. - /// - /// Internal debugging event that does not affect the state - /// machine. - ThreadSawLatchSet { worker: usize, latch_addr: usize }, - - /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal - /// sleep state that we saw at the time. - ThreadSleepy { worker: usize, jobs_counter: usize }, - - /// Indicates that the thread's attempt to fall asleep was - /// interrupted because the latch was set. (This is not, in and of - /// itself, a change to the thread state.) - ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize }, - - /// Indicates that the thread's attempt to fall asleep was - /// interrupted because a job was posted. (This is not, in and of - /// itself, a change to the thread state.) - ThreadSleepInterruptedByJob { worker: usize }, - - /// Indicates that an idle worker has gone to sleep. - ThreadSleeping { worker: usize, latch_addr: usize }, - - /// Indicates that a sleeping worker has awoken. - ThreadAwoken { worker: usize, latch_addr: usize }, - - /// Indicates that the given worker thread was notified it should - /// awaken. - ThreadNotify { worker: usize }, - - /// The given worker has pushed a job to its local deque. - JobPushed { worker: usize }, - - /// The given worker has popped a job from its local deque. - JobPopped { worker: usize }, - - /// The given worker has stolen a job from the deque of another. - JobStolen { worker: usize, victim: usize }, - - /// N jobs were injected into the global queue. - JobsInjected { count: usize }, - - /// A job was removed from the global queue. - JobUninjected { worker: usize }, - - /// A job was broadcasted to N threads. - JobBroadcast { count: usize }, - - /// When announcing a job, this was the value of the counters we observed. - /// - /// No effect on thread state, just a debugging event. - JobThreadCounts { - worker: usize, - num_idle: u16, - num_sleepers: u16, - }, -} - -/// Handle to the logging thread, if any. You can use this to deliver -/// logs. You can also clone it freely. -#[derive(Clone)] -pub(super) struct Logger { - sender: Option>, -} - -impl Logger { - pub(super) fn new(num_workers: usize) -> Logger { - if !LOG_ENABLED { - return Self::disabled(); - } - - // see the doc comment for the format - let env_log = match env::var("RAYON_LOG") { - Ok(s) => s, - Err(_) => return Self::disabled(), - }; - - let (sender, receiver) = crossbeam_channel::unbounded(); - - if let Some(filename) = env_log.strip_prefix("tail:") { - let filename = filename.to_string(); - ::std::thread::spawn(move || { - Self::tail_logger_thread(num_workers, filename, 10_000, receiver) - }); - } else if env_log == "all" { - ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); - } else if let Some(filename) = env_log.strip_prefix("profile:") { - let filename = filename.to_string(); - ::std::thread::spawn(move || { - Self::profile_logger_thread(num_workers, filename, 10_000, receiver) - }); - } else { - panic!("RAYON_LOG should be 'tail:' or 'profile:'"); - } - - Logger { - sender: Some(sender), - } - } - - fn disabled() -> Logger { - Logger { sender: None } - } - - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> Event) { - if !LOG_ENABLED { - return; - } - - if let Some(sender) = &self.sender { - sender.send(event()).unwrap(); - } - } - - fn profile_logger_thread( - num_workers: usize, - log_filename: String, - capacity: usize, - receiver: Receiver, - ) { - let file = File::create(&log_filename) - .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); - - let mut writer = BufWriter::new(file); - let mut events = Vec::with_capacity(capacity); - let mut state = SimulatorState::new(num_workers); - let timeout = std::time::Duration::from_secs(30); - - loop { - while let Ok(event) = receiver.recv_timeout(timeout) { - if let Event::Flush = event { - break; - } - - events.push(event); - if events.len() == capacity { - break; - } - } - - for event in events.drain(..) { - if state.simulate(&event) { - state.dump(&mut writer, &event).unwrap(); - } - } - - writer.flush().unwrap(); - } - } - - fn tail_logger_thread( - num_workers: usize, - log_filename: String, - capacity: usize, - receiver: Receiver, - ) { - let file = File::create(&log_filename) - .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); - - let mut writer = BufWriter::new(file); - let mut events: VecDeque = VecDeque::with_capacity(capacity); - let mut state = SimulatorState::new(num_workers); - let timeout = std::time::Duration::from_secs(30); - let mut skipped = false; - - loop { - while let Ok(event) = receiver.recv_timeout(timeout) { - if let Event::Flush = event { - // We ignore Flush events in tail mode -- - // we're really just looking for - // deadlocks. - continue; - } else { - if events.len() == capacity { - let event = events.pop_front().unwrap(); - state.simulate(&event); - skipped = true; - } - - events.push_back(event); - } - } - - if skipped { - writeln!(writer, "...").unwrap(); - skipped = false; - } - - for event in events.drain(..) { - // In tail mode, we dump *all* events out, whether or - // not they were 'interesting' to the state machine. - state.simulate(&event); - state.dump(&mut writer, &event).unwrap(); - } - - writer.flush().unwrap(); - } - } - - fn all_logger_thread(num_workers: usize, receiver: Receiver) { - let stderr = std::io::stderr(); - let mut state = SimulatorState::new(num_workers); - - for event in receiver { - let mut writer = BufWriter::new(stderr.lock()); - state.simulate(&event); - state.dump(&mut writer, &event).unwrap(); - writer.flush().unwrap(); - } - } -} - -#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] -enum State { - Working, - Idle, - Notified, - Sleeping, - Terminated, -} - -impl State { - fn letter(&self) -> char { - match self { - State::Working => 'W', - State::Idle => 'I', - State::Notified => 'N', - State::Sleeping => 'S', - State::Terminated => 'T', - } - } -} - -struct SimulatorState { - local_queue_size: Vec, - thread_states: Vec, - injector_size: usize, -} - -impl SimulatorState { - fn new(num_workers: usize) -> Self { - Self { - local_queue_size: (0..num_workers).map(|_| 0).collect(), - thread_states: (0..num_workers).map(|_| State::Working).collect(), - injector_size: 0, - } - } - - fn simulate(&mut self, event: &Event) -> bool { - match *event { - Event::ThreadIdle { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Working); - self.thread_states[worker] = State::Idle; - true - } - - Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => { - self.thread_states[worker] = State::Working; - true - } - - Event::ThreadTerminate { worker, .. } => { - self.thread_states[worker] = State::Terminated; - true - } - - Event::ThreadSleeping { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Idle); - self.thread_states[worker] = State::Sleeping; - true - } - - Event::ThreadAwoken { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Notified); - self.thread_states[worker] = State::Idle; - true - } - - Event::JobPushed { worker } => { - self.local_queue_size[worker] += 1; - true - } - - Event::JobPopped { worker } => { - self.local_queue_size[worker] -= 1; - true - } - - Event::JobStolen { victim, .. } => { - self.local_queue_size[victim] -= 1; - true - } - - Event::JobsInjected { count } => { - self.injector_size += count; - true - } - - Event::JobUninjected { .. } => { - self.injector_size -= 1; - true - } - - Event::ThreadNotify { worker } => { - // Currently, this log event occurs while holding the - // thread lock, so we should *always* see it before - // the worker awakens. - assert_eq!(self.thread_states[worker], State::Sleeping); - self.thread_states[worker] = State::Notified; - true - } - - // remaining events are no-ops from pov of simulating the - // thread state - _ => false, - } - } - - fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> { - let num_idle_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Idle) - .count(); - - let num_sleeping_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Sleeping) - .count(); - - let num_notified_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Notified) - .count(); - - let num_pending_jobs: usize = self.local_queue_size.iter().sum(); - - write!(w, "{:2},", num_idle_threads)?; - write!(w, "{:2},", num_sleeping_threads)?; - write!(w, "{:2},", num_notified_threads)?; - write!(w, "{:4},", num_pending_jobs)?; - write!(w, "{:4},", self.injector_size)?; - - let event_str = format!("{:?}", event); - write!(w, r#""{:60}","#, event_str)?; - - for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) { - write!(w, " T{:02},{}", i, state.letter(),)?; - - if *queue_size > 0 { - write!(w, ",{:03},", queue_size)?; - } else { - write!(w, ", ,")?; - } - } - - writeln!(w)?; - Ok(()) - } -} diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 15ceb6b0c..781b6827b 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -1,7 +1,5 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; -use crate::log::Event::*; -use crate::log::Logger; +use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; use crate::sleep::Sleep; use crate::tlv::Tlv; use crate::unwind; @@ -20,7 +18,6 @@ use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Once}; use std::thread; -use std::usize; /// Thread builder used for customization via /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). @@ -131,7 +128,6 @@ where } pub struct Registry { - logger: Logger, thread_infos: Vec, sleep: Sleep, injected_jobs: Injector, @@ -170,7 +166,13 @@ static THE_REGISTRY_SET: Once = Once::new(); /// configuration. pub(super) fn global_registry() -> &'static Arc { set_global_registry(default_global_registry) - .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) + .or_else(|err| { + // SAFETY: we only create a shared reference to `THE_REGISTRY` after the `call_once` + // that initializes it, and there will be no more mutable accesses at all. + debug_assert!(THE_REGISTRY_SET.is_completed()); + let the_registry = unsafe { &*ptr::addr_of!(THE_REGISTRY) }; + the_registry.as_ref().ok_or(err) + }) .expect("The global thread pool has not been initialized.") } @@ -196,8 +198,14 @@ where )); THE_REGISTRY_SET.call_once(|| { - result = registry() - .map(|registry: Arc| unsafe { &*THE_REGISTRY.get_or_insert(registry) }) + result = registry().map(|registry: Arc| { + // SAFETY: this is the only mutable access to `THE_REGISTRY`, thanks to `Once`, and + // `global_registry()` only takes a shared reference **after** this `call_once`. + unsafe { + ptr::addr_of_mut!(THE_REGISTRY).write(Some(registry)); + (*ptr::addr_of!(THE_REGISTRY)).as_ref().unwrap_unchecked() + } + }) }); result @@ -284,11 +292,9 @@ impl Registry { }) .unzip(); - let logger = Logger::new(n_threads); let registry = Arc::new(Registry { - logger: logger.clone(), thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), - sleep: Sleep::new(logger, n_threads), + sleep: Sleep::new(n_threads), injected_jobs: Injector::new(), broadcasts: Mutex::new(broadcasts), terminate_count: AtomicUsize::new(1), @@ -370,11 +376,6 @@ impl Registry { } } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.logger.log(event) - } - pub(super) fn num_threads(&self) -> usize { self.thread_infos.len() } @@ -446,8 +447,6 @@ impl Registry { /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. pub(super) fn inject(&self, injected_job: JobRef) { - self.log(|| JobsInjected { count: 1 }); - // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and // drops) a `ThreadPool`; and, in that case, they cannot be @@ -462,22 +461,17 @@ impl Registry { let queue_was_empty = self.injected_jobs.is_empty(); self.injected_jobs.push(injected_job); - self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty); + self.sleep.new_injected_jobs(1, queue_was_empty); } pub(crate) fn has_injected_job(&self) -> bool { !self.injected_jobs.is_empty() } - fn pop_injected_job(&self, worker_index: usize) -> Option { + fn pop_injected_job(&self) -> Option { loop { match self.injected_jobs.steal() { - Steal::Success(job) => { - self.log(|| JobUninjected { - worker: worker_index, - }); - return Some(job); - } + Steal::Success(job) => return Some(job), Steal::Empty => return None, Steal::Retry => {} } @@ -491,9 +485,6 @@ impl Registry { /// **Panics** if not given exactly as many jobs as there are threads. pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator) { assert_eq!(self.num_threads(), injected_jobs.len()); - self.log(|| JobBroadcast { - count: self.num_threads(), - }); { let broadcasts = self.broadcasts.lock().unwrap(); @@ -568,9 +559,6 @@ impl Registry { job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. self.acquire_thread(); - // flush accumulated logs as we exit the thread - self.logger.log(|| Flush); - job.into_result() }) } @@ -622,10 +610,7 @@ impl Registry { pub(super) fn increment_terminate_count(&self) { let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); debug_assert!(previous != 0, "registry ref count incremented from zero"); - assert!( - previous != std::usize::MAX, - "overflow in registry ref count" - ); + assert!(previous != usize::MAX, "overflow in registry ref count"); } /// Signals that the thread-pool which owns this registry has been @@ -634,7 +619,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; + unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -682,10 +667,7 @@ struct ThreadInfo { /// This latch is *set* by the `terminate` method on the /// `Registry`, once the registry's main "terminate" counter /// reaches zero. - /// - /// NB. We use a `CountLatch` here because it has no lifetimes and is - /// meant for async use, but the count never gets higher than one. - terminate: CountLatch, + terminate: OnceLatch, /// the "stealer" half of the worker's deque stealer: Stealer, @@ -696,7 +678,7 @@ impl ThreadInfo { ThreadInfo { primed: LockLatch::new(), stopped: LockLatch::new(), - terminate: CountLatch::new(), + terminate: OnceLatch::new(), stealer, } } @@ -779,11 +761,6 @@ impl WorkerThread { &self.registry } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.registry.logger.log(event) - } - /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). #[inline] pub(super) fn index(&self) -> usize { @@ -792,12 +769,9 @@ impl WorkerThread { #[inline] pub(super) unsafe fn push(&self, job: JobRef) { - self.log(|| JobPushed { worker: self.index }); let queue_was_empty = self.worker.is_empty(); self.worker.push(job); - self.registry - .sleep - .new_internal_jobs(self.index, 1, queue_was_empty); + self.registry.sleep.new_internal_jobs(1, queue_was_empty); } #[inline] @@ -819,7 +793,6 @@ impl WorkerThread { let popped_job = self.worker.pop(); if popped_job.is_some() { - self.log(|| JobPopped { worker: self.index }); return popped_job; } @@ -855,31 +828,52 @@ impl WorkerThread { // accesses, which would be *very bad* let abort_guard = unwind::AbortIfPanic; - let mut idle_state = self.registry.sleep.start_looking(self.index, latch); - while !latch.probe() { - if let Some(job) = self.find_work() { - self.registry.sleep.work_found(idle_state); + 'outer: while !latch.probe() { + // Check for local work *before* we start marking ourself idle, + // especially to avoid modifying shared sleep state. + if let Some(job) = self.take_local_job() { self.execute(job); - idle_state = self.registry.sleep.start_looking(self.index, latch); - } else { - self.registry - .sleep - .no_work_found(&mut idle_state, latch, &self) + continue; } - } - // If we were sleepy, we are not anymore. We "found work" -- - // whatever the surrounding thread was doing before it had to - // wait. - self.registry.sleep.work_found(idle_state); + let mut idle_state = self.registry.sleep.start_looking(self.index); + while !latch.probe() { + if let Some(job) = self.find_work() { + self.registry.sleep.work_found(); + self.execute(job); + // The job might have injected local work, so go back to the outer loop. + continue 'outer; + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, &self) + } + } + + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(); + break; + } - self.log(|| ThreadSawLatchSet { - worker: self.index, - latch_addr: latch.addr(), - }); mem::forget(abort_guard); // successful execution, do not abort } + unsafe fn wait_until_out_of_work(&self) { + debug_assert_eq!(self as *const _, WorkerThread::current()); + let registry = &*self.registry; + let index = self.index; + + registry.acquire_thread(); + self.wait_until(®istry.thread_infos[index].terminate); + + // Should not be any work left in our queue. + debug_assert!(self.take_local_job().is_none()); + + // Let registry know we are done + Latch::set(®istry.thread_infos[index].stopped); + } + fn find_work(&self) -> Option { // Try to find some work to do. We give preference first // to things in our local deque, then in other workers @@ -888,7 +882,7 @@ impl WorkerThread { // we take on something new. self.take_local_job() .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job(self.index)) + .or_else(|| self.registry.pop_injected_job()) } pub(super) fn yield_now(&self) -> Yield { @@ -940,13 +934,7 @@ impl WorkerThread { .find_map(|victim_index| { let victim = &thread_infos[victim_index]; match victim.stealer.steal() { - Steal::Success(job) => { - self.log(|| JobStolen { - worker: self.index, - victim: victim_index, - }); - Some(job) - } + Steal::Success(job) => Some(job), Steal::Empty => None, Steal::Retry => { retry = true; @@ -982,25 +970,11 @@ unsafe fn main_loop(thread: ThreadBuilder) { registry.catch_unwind(|| handler(index)); } - let my_terminate_latch = ®istry.thread_infos[index].terminate; - worker_thread.log(|| ThreadStart { - worker: index, - terminate_addr: my_terminate_latch.as_core_latch().addr(), - }); - registry.acquire_thread(); - worker_thread.wait_until(my_terminate_latch); - - // Should not be any work left in our queue. - debug_assert!(worker_thread.take_local_job().is_none()); - - // let registry know we are done - Latch::set(®istry.thread_infos[index].stopped); + worker_thread.wait_until_out_of_work(); // Normal termination, do not abort. mem::forget(abort_guard); - worker_thread.log(|| ThreadTerminate { worker: index }); - // Inform a user callback that we exited a thread. if let Some(ref handler) = registry.exit_handler { registry.catch_unwind(|| handler(index)); diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 1b74f274d..364b322ba 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -7,7 +7,7 @@ use crate::broadcast::BroadcastContext; use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; -use crate::latch::{CountLatch, CountLockLatch, Latch}; +use crate::latch::{CountLatch, Latch}; use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::tlv::{self, Tlv}; use crate::unwind; @@ -40,26 +40,6 @@ pub struct ScopeFifo<'scope> { fifos: Vec, } -pub(super) enum ScopeLatch { - /// A latch for scopes created on a rayon thread which will participate in work- - /// stealing while it waits for completion. This thread is not necessarily part - /// of the same registry as the scope itself! - Stealing { - latch: CountLatch, - /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool - /// with registry B, when a job completes in a thread of registry B, we may - /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A. - /// That means we need a reference to registry A (since at that point we will - /// only have a reference to registry B), so we stash it here. - registry: Arc, - /// The index of the worker to wake in `registry` - worker_index: usize, - }, - - /// A latch for scopes created on a non-rayon thread which will block to wait. - Blocking { latch: CountLockLatch }, -} - struct ScopeBase<'scope> { /// thread registry where `scope()` was executed or where `in_place_scope()` /// should spawn jobs. @@ -70,12 +50,13 @@ struct ScopeBase<'scope> { panic: AtomicPtr>, /// latch to track job counts - job_completed_latch: ScopeLatch, + job_completed_latch: CountLatch, /// You can think of a scope as containing a list of closures to execute, /// all of which outlive `'scope`. They're not actually required to be /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because /// the closures are only *moved* across threads to be executed. + #[allow(clippy::type_complexity)] marker: PhantomData) + Send + Sync + 'scope>>, /// The TLV at the scope's creation. Used to set the TLV for spawned jobs. @@ -654,22 +635,18 @@ impl<'scope> ScopeBase<'scope> { ScopeBase { registry: Arc::clone(registry), panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: ScopeLatch::new(owner), + job_completed_latch: CountLatch::new(owner), marker: PhantomData, tlv: tlv::get(), } } - fn increment(&self) { - self.job_completed_latch.increment(); - } - fn heap_job_ref(&self, job: Box>) -> JobRef where FUNC: FnOnce() + Send + 'scope, { unsafe { - self.increment(); + self.job_completed_latch.increment(); job.into_job_ref() } } @@ -680,7 +657,7 @@ impl<'scope> ScopeBase<'scope> { { let n_threads = self.registry.num_threads(); let job_refs = (0..n_threads).map(|_| unsafe { - self.increment(); + self.job_completed_latch.increment(); ArcJob::as_job_ref(&job) }); @@ -719,17 +696,15 @@ impl<'scope> ScopeBase<'scope> { where FUNC: FnOnce() -> R, { - match unwind::halt_unwinding(func) { - Ok(r) => { - Latch::set(&(*this).job_completed_latch); - Some(r) - } + let result = match unwind::halt_unwinding(func) { + Ok(r) => Some(r), Err(err) => { (*this).job_panicked(err); - Latch::set(&(*this).job_completed_latch); None } - } + }; + Latch::set(&(*this).job_completed_latch); + result } fn job_panicked(&self, err: Box) { @@ -767,61 +742,6 @@ impl<'scope> ScopeBase<'scope> { } } -impl ScopeLatch { - fn new(owner: Option<&WorkerThread>) -> Self { - Self::with_count(1, owner) - } - - pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { - match owner { - Some(owner) => ScopeLatch::Stealing { - latch: CountLatch::with_count(count), - registry: Arc::clone(owner.registry()), - worker_index: owner.index(), - }, - None => ScopeLatch::Blocking { - latch: CountLockLatch::with_count(count), - }, - } - } - - fn increment(&self) { - match self { - ScopeLatch::Stealing { latch, .. } => latch.increment(), - ScopeLatch::Blocking { latch } => latch.increment(), - } - } - - pub(super) fn wait(&self, owner: Option<&WorkerThread>) { - match self { - ScopeLatch::Stealing { - latch, - registry, - worker_index, - } => unsafe { - let owner = owner.expect("owner thread"); - debug_assert_eq!(registry.id(), owner.registry().id()); - debug_assert_eq!(*worker_index, owner.index()); - owner.wait_until(latch); - }, - ScopeLatch::Blocking { latch } => latch.wait(), - } - } -} - -impl Latch for ScopeLatch { - unsafe fn set(this: *const Self) { - match &*this { - ScopeLatch::Stealing { - latch, - registry, - worker_index, - } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), - ScopeLatch::Blocking { latch } => Latch::set(latch), - } - } -} - impl<'scope> fmt::Debug for Scope<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Scope") @@ -843,21 +763,6 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> { } } -impl fmt::Debug for ScopeLatch { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ScopeLatch::Stealing { latch, .. } => fmt - .debug_tuple("ScopeLatch::Stealing") - .field(latch) - .finish(), - ScopeLatch::Blocking { latch } => fmt - .debug_tuple("ScopeLatch::Blocking") - .field(latch) - .finish(), - } - } -} - /// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. /// /// Unsafe code is still required to dereference the pointer, but that's fine in diff --git a/rayon-core/src/scope/test.rs b/rayon-core/src/scope/test.rs index ad8c4af0b..4505ba7c4 100644 --- a/rayon-core/src/scope/test.rs +++ b/rayon-core/src/scope/test.rs @@ -3,7 +3,6 @@ use crate::ThreadPoolBuilder; use crate::{scope, scope_fifo, Scope, ScopeFifo}; use rand::{Rng, SeedableRng}; use rand_xorshift::XorShiftRng; -use std::cmp; use std::iter::once; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Barrier, Mutex}; @@ -120,13 +119,13 @@ fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree { let children = if depth == 0 { vec![] } else { - (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level + (0..rng.random_range(0..4)) // somewhere between 0 and 3 children at each level .map(|_| random_tree1(depth - 1, rng)) .collect() }; Tree { - value: rng.gen_range(0..1_000_000), + value: rng.random_range(0..1_000_000), children, } } @@ -182,7 +181,7 @@ fn the_final_countdown<'scope>( let diff = if p > q { p - q } else { q - p }; let mut data = max.lock().unwrap(); - *data = cmp::max(diff, *data); + *data = Ord::max(diff, *data); if n > 0 { s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1)); @@ -217,12 +216,13 @@ fn panic_propagate_nested_scope_spawn() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_1() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| panic!("Hello, world!")); // job A s.spawn(|_| x = true); // job B, should still execute even though A panics }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "job b failed to execute"), } @@ -232,12 +232,13 @@ fn panic_propagate_still_execute_1() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_2() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| x = true); // job B, should still execute even though A panics s.spawn(|_| panic!("Hello, world!")); // job A }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "job b failed to execute"), } @@ -247,12 +248,13 @@ fn panic_propagate_still_execute_2() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_3() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| x = true); // spawned job should still execute despite later panic panic!("Hello, world!"); }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "panic after spawn, spawn failed to execute"), } @@ -262,12 +264,13 @@ fn panic_propagate_still_execute_3() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_4() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| panic!("Hello, world!")); x = true; }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "panic in spawn tainted scope"), } diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md index 6f27df51d..e79efd15c 100644 --- a/rayon-core/src/sleep/README.md +++ b/rayon-core/src/sleep/README.md @@ -139,7 +139,7 @@ The full protocol for a thread to fall asleep is as follows: * Checks the JEC to see that it has not changed from `final_value`. * If it has, then the thread goes back to searching for work. We reset to just before we got sleepy, so that we will do one more search - before attending to sleep again (rather than searching for many rounds). + before attempting to sleep again (rather than searching for many rounds). * Increments the number of sleeping threads by 1. * The thread then executes a seq-cst fence operation (see below). * The thread then does one final check for injected jobs (see below). If any diff --git a/rayon-core/src/sleep/counters.rs b/rayon-core/src/sleep/counters.rs index f2a3de3e1..05941becd 100644 --- a/rayon-core/src/sleep/counters.rs +++ b/rayon-core/src/sleep/counters.rs @@ -27,7 +27,7 @@ pub(super) struct Counters { pub(super) struct JobsEventCounter(usize); impl JobsEventCounter { - pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX); + pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(usize::MAX); #[inline] pub(super) fn as_usize(self) -> usize { @@ -166,7 +166,7 @@ impl AtomicCounters { // Current heuristic: whenever an inactive thread goes away, if // there are any sleeping threads, wake 'em up. let sleeping_threads = old_value.sleeping_threads(); - std::cmp::min(sleeping_threads, 2) + Ord::min(sleeping_threads, 2) } /// Subtracts a sleeping thread. This cannot fail, but it is only @@ -212,12 +212,12 @@ impl AtomicCounters { #[inline] fn select_thread(word: usize, shift: usize) -> usize { - ((word >> shift) as usize) & THREADS_MAX + (word >> shift) & THREADS_MAX } #[inline] fn select_jec(word: usize) -> usize { - (word >> JEC_SHIFT) as usize + word >> JEC_SHIFT } impl Counters { diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 48c92802e..7d88ece21 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -2,15 +2,12 @@ //! for an overview. use crate::latch::CoreLatch; -use crate::log::Event::*; -use crate::log::Logger; use crate::registry::WorkerThread; use crate::DeadlockHandler; use crossbeam_utils::CachePadded; use std::sync::atomic::Ordering; use std::sync::{Condvar, Mutex}; use std::thread; -use std::usize; mod counters; pub(crate) use self::counters::THREADS_MAX; @@ -47,8 +44,6 @@ impl SleepData { /// /// [`README.md`] README.md pub(super) struct Sleep { - logger: Logger, - /// One "sleep state" per worker. Used to track if a worker is sleeping and to have /// them block. worker_sleep_states: Vec>, @@ -89,10 +84,9 @@ const ROUNDS_UNTIL_SLEEPY: u32 = 32; const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; impl Sleep { - pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep { + pub(super) fn new(n_threads: usize) -> Sleep { assert!(n_threads <= THREADS_MAX); Sleep { - logger, worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), counters: AtomicCounters::new(), data: Mutex::new(SleepData { @@ -128,12 +122,7 @@ impl Sleep { } #[inline] - pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState { - self.logger.log(|| ThreadIdle { - worker: worker_index, - latch_addr: latch.addr(), - }); - + pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { self.counters.add_inactive_thread(); IdleState { @@ -144,12 +133,7 @@ impl Sleep { } #[inline] - pub(super) fn work_found(&self, idle_state: IdleState) { - self.logger.log(|| ThreadFoundWork { - worker: idle_state.worker_index, - yields: idle_state.rounds, - }); - + pub(super) fn work_found(&self) { // If we were the last idle thread and other threads are still sleeping, // then we should wake up another thread. let threads_to_wake = self.counters.sub_inactive_thread(); @@ -167,7 +151,7 @@ impl Sleep { thread::yield_now(); idle_state.rounds += 1; } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { - idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index); + idle_state.jobs_counter = self.announce_sleepy(); idle_state.rounds += 1; thread::yield_now(); } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { @@ -180,16 +164,10 @@ impl Sleep { } #[cold] - fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter { - let counters = self - .counters - .increment_jobs_event_counter_if(JobsEventCounter::is_active); - let jobs_counter = counters.jobs_counter(); - self.logger.log(|| ThreadSleepy { - worker: worker_index, - jobs_counter: jobs_counter.as_usize(), - }); - jobs_counter + fn announce_sleepy(&self) -> JobsEventCounter { + self.counters + .increment_jobs_event_counter_if(JobsEventCounter::is_active) + .jobs_counter() } #[cold] @@ -197,11 +175,6 @@ impl Sleep { let worker_index = idle_state.worker_index; if !latch.get_sleepy() { - self.logger.log(|| ThreadSleepInterruptedByLatch { - worker: worker_index, - latch_addr: latch.addr(), - }); - return; } @@ -212,11 +185,6 @@ impl Sleep { // Our latch was signalled. We should wake back up fully as we // will have some stuff to do. if !latch.fall_asleep() { - self.logger.log(|| ThreadSleepInterruptedByLatch { - worker: worker_index, - latch_addr: latch.addr(), - }); - idle_state.wake_fully(); return; } @@ -231,10 +199,6 @@ impl Sleep { // we didn't see it. We should return to just before the SLEEPY // state so we can do another search and (if we fail to find // work) go back to sleep. - self.logger.log(|| ThreadSleepInterruptedByJob { - worker: worker_index, - }); - idle_state.wake_partly(); latch.wake_up(); return; @@ -248,11 +212,6 @@ impl Sleep { // Successfully registered as asleep. - self.logger.log(|| ThreadSleeping { - worker: worker_index, - latch_addr: latch.addr(), - }); - // We have one last check for injected jobs to do. This protects against // deadlock in the very unlikely event that // @@ -296,11 +255,6 @@ impl Sleep { // Update other state: idle_state.wake_fully(); latch.wake_up(); - - self.logger.log(|| ThreadAwoken { - worker: worker_index, - latch_addr: latch.addr(), - }); } /// Notify the given thread that it should wake up (if it is @@ -318,24 +272,16 @@ impl Sleep { /// /// # Parameters /// - /// - `source_worker_index` -- index of the thread that did the - /// push, or `usize::MAX` if this came from outside the thread - /// pool -- it is used only for logging. /// - `num_jobs` -- lower bound on number of jobs available for stealing. /// We'll try to get at least one thread per job. #[inline] - pub(super) fn new_injected_jobs( - &self, - source_worker_index: usize, - num_jobs: u32, - queue_was_empty: bool, - ) { + pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) { // This fence is needed to guarantee that threads // as they are about to fall asleep, observe any // new jobs that may have been injected. std::sync::atomic::fence(Ordering::SeqCst); - self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + self.new_jobs(num_jobs, queue_was_empty) } /// Signals that `num_jobs` new jobs were pushed onto a thread's @@ -348,24 +294,16 @@ impl Sleep { /// /// # Parameters /// - /// - `source_worker_index` -- index of the thread that did the - /// push, or `usize::MAX` if this came from outside the thread - /// pool -- it is used only for logging. /// - `num_jobs` -- lower bound on number of jobs available for stealing. /// We'll try to get at least one thread per job. #[inline] - pub(super) fn new_internal_jobs( - &self, - source_worker_index: usize, - num_jobs: u32, - queue_was_empty: bool, - ) { - self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) { + self.new_jobs(num_jobs, queue_was_empty) } /// Common helper for `new_injected_jobs` and `new_internal_jobs`. #[inline] - fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) { + fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) { // Read the counters and -- if sleepy workers have announced themselves // -- announce that there is now work available. The final value of `counters` // with which we exit the loop thus corresponds to a state when @@ -375,12 +313,6 @@ impl Sleep { let num_awake_but_idle = counters.awake_but_idle_threads(); let num_sleepers = counters.sleeping_threads(); - self.logger.log(|| JobThreadCounts { - worker: source_worker_index, - num_idle: num_awake_but_idle as u16, - num_sleepers: num_sleepers as u16, - }); - if num_sleepers == 0 { // nobody to wake return; @@ -395,10 +327,10 @@ impl Sleep { // -- clearly the existing idle jobs aren't enough. Otherwise, // check to see if we have enough idle workers. if !queue_was_empty { - let num_to_wake = std::cmp::min(num_jobs, num_sleepers); + let num_to_wake = Ord::min(num_jobs, num_sleepers); self.wake_any_threads(num_to_wake); } else if num_awake_but_idle < num_jobs { - let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers); + let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers); self.wake_any_threads(num_to_wake); } } @@ -440,8 +372,6 @@ impl Sleep { // do. self.counters.sub_sleeping_thread(); - self.logger.log(|| ThreadNotify { worker: index }); - true } else { false diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs index c2a8b3b53..034df30dc 100644 --- a/rayon-core/src/spawn/mod.rs +++ b/rayon-core/src/spawn/mod.rs @@ -5,8 +5,8 @@ use crate::unwind; use std::mem; use std::sync::Arc; -/// Fires off a task into the Rayon threadpool in the "static" or -/// "global" scope. Just like a standard thread, this task is not +/// Puts the task into the Rayon threadpool's job queue in the "static" +/// or "global" scope. Just like a standard thread, this task is not /// tied to the current stack frame, and hence it cannot hold any /// references other than those with `'static` lifetime. If you want /// to spawn a task that references stack data, use [the `scope()` diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index fd890c9fd..65af6d710 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -80,6 +80,43 @@ impl ThreadPool { /// thread-local data from the current thread will not be /// accessible. /// + /// # Warning: execution order + /// + /// If the current thread is part of a different thread pool, it will try to + /// keep busy while the `op` completes in its target pool, similar to + /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may + /// potentially schedule other tasks to run on the current thread in the + /// meantime. For example + /// + /// ```rust + /// # use rayon_core as rayon; + /// fn main() { + /// rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap(); + /// let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap(); + /// let do_it = || { + /// print!("one "); + /// pool.install(||{}); + /// print!("two "); + /// }; + /// rayon::join(|| do_it(), || do_it()); + /// } + /// ``` + /// + /// Since we configured just one thread in the global pool, one might + /// expect `do_it()` to run sequentially, producing: + /// + /// ```ascii + /// one two one two + /// ``` + /// + /// However each call to `install()` yields implicitly, allowing rayon to + /// run multiple instances of `do_it()` concurrently on the single, global + /// thread. The following output would be equally valid: + /// + /// ```ascii + /// one one two two + /// ``` + /// /// # Panics /// /// If `op` should panic, that panic will be propagated. diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs index 6143e5799..88b36282d 100644 --- a/rayon-core/src/thread_pool/test.rs +++ b/rayon-core/src/thread_pool/test.rs @@ -383,7 +383,7 @@ fn in_place_scope_fifo_no_deadlock() { #[test] fn yield_now_to_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn. crate::spawn(move || tx.send(22).unwrap()); @@ -401,7 +401,7 @@ fn yield_now_to_spawn() { #[test] fn yield_local_to_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn. crate::spawn(move || tx.send(22).unwrap()); diff --git a/rayon-core/tests/scoped_threadpool.rs b/rayon-core/tests/scoped_threadpool.rs index 534e8bbf4..932147179 100644 --- a/rayon-core/tests/scoped_threadpool.rs +++ b/rayon-core/tests/scoped_threadpool.rs @@ -93,7 +93,7 @@ fn build_scoped_tls_threadpool() { }, ) .expect("thread pool created"); - // Internally, `crossbeam::scope` will wait for the threads to exit before returning. + // Internally, `std::thread::scope` will wait for the threads to exit before returning. }); }); } diff --git a/rayon-core/tests/stack_overflow_crash.rs b/rayon-core/tests/stack_overflow_crash.rs index 7dcde43c4..a64940692 100644 --- a/rayon-core/tests/stack_overflow_crash.rs +++ b/rayon-core/tests/stack_overflow_crash.rs @@ -8,6 +8,7 @@ use std::os::unix::process::ExitStatusExt; fn force_stack_overflow(depth: u32) { let mut buffer = [0u8; 1024 * 1024]; + #[allow(clippy::incompatible_msrv)] std::hint::black_box(&mut buffer); if depth > 0 { force_stack_overflow(depth - 1);