From 65574c37664fce8d2ab0c1ac7d4d346acb2027e4 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Tue, 20 Jun 2023 13:53:19 -0700 Subject: [PATCH 01/27] Improve inlining of scope latch counters The `increment` and `set` methods now have `#[inline]` hints, and the `counter` fields in `CountLatch` and `CountLockLatch` are now listed first to increase the chance that layout puts them at the same offset. (That layout is not critical to ensure, but works out nicely.) (cherry picked from commit f98eb5754aef15617bca78eb9d025ddccb00a70f) --- rayon-core/src/latch.rs | 6 ++++-- rayon-core/src/scope/mod.rs | 15 ++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs index de4327234..40a9f14fa 100644 --- a/rayon-core/src/latch.rs +++ b/rayon-core/src/latch.rs @@ -286,8 +286,9 @@ impl Latch for LockLatch { /// contexts). #[derive(Debug)] pub(super) struct CountLatch { - core_latch: CoreLatch, + // counter is first to nudge layout like CountLockLatch counter: AtomicUsize, + core_latch: CoreLatch, } impl CountLatch { @@ -347,8 +348,9 @@ impl AsCoreLatch for CountLatch { #[derive(Debug)] pub(super) struct CountLockLatch { - lock_latch: LockLatch, + // counter is first to nudge layout like CountLatch counter: AtomicUsize, + lock_latch: LockLatch, } impl CountLockLatch { diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 1b74f274d..06d599992 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -660,6 +660,7 @@ impl<'scope> ScopeBase<'scope> { } } + #[inline] fn increment(&self) { self.job_completed_latch.increment(); } @@ -719,17 +720,15 @@ impl<'scope> ScopeBase<'scope> { where FUNC: FnOnce() -> R, { - match unwind::halt_unwinding(func) { - Ok(r) => { - Latch::set(&(*this).job_completed_latch); - Some(r) - } + let result = match unwind::halt_unwinding(func) { + Ok(r) => Some(r), Err(err) => { (*this).job_panicked(err); - Latch::set(&(*this).job_completed_latch); None } - } + }; + Latch::set(&(*this).job_completed_latch); + result } fn job_panicked(&self, err: Box) { @@ -785,6 +784,7 @@ impl ScopeLatch { } } + #[inline] fn increment(&self) { match self { ScopeLatch::Stealing { latch, .. } => latch.increment(), @@ -810,6 +810,7 @@ impl ScopeLatch { } impl Latch for ScopeLatch { + #[inline] unsafe fn set(this: *const Self) { match &*this { ScopeLatch::Stealing { From f46f1d48957833149343be878ea372e4b87d9181 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Tue, 20 Jun 2023 14:12:55 -0700 Subject: [PATCH 02/27] Try local jobs first in `wait_until_cold` If a worker thread has jobs in its own queue, then it's not really headed for an inactive / sleepy state yet, so we can avoid modifying the registry-wide sleep state. 
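To make the intent concrete, here is a standalone sketch of the new loop shape (the `Sleep` and queue types below are illustrative stand-ins, not rayon's real `WorkerThread` internals):

```rust
// Drain the thread-local queue before touching any shared sleep state,
// and fall back to the shared injector only once local work is exhausted.
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Sleep {
    // Counts how often we touched "registry-wide" state; in rayon this is
    // the bookkeeping that the patch avoids while local jobs remain.
    idle_transitions: AtomicUsize,
}

impl Sleep {
    fn start_looking(&self) {
        self.idle_transitions.fetch_add(1, Ordering::Relaxed);
    }
}

fn run_worker(local: &mut VecDeque<u32>, injected: &Mutex<Vec<u32>>, sleep: &Sleep) -> u32 {
    let mut done = 0;
    loop {
        // Local jobs first: no shared sleep-state traffic at all.
        while let Some(job) = local.pop_front() {
            done += job;
        }
        // Only now do we announce that this worker is looking for work.
        sleep.start_looking();
        match injected.lock().unwrap().pop() {
            Some(job) => done += job, // found shared work; check locals again
            None => return done,      // nothing anywhere: this is where we'd sleep
        }
    }
}

fn main() {
    let sleep = Sleep { idle_transitions: AtomicUsize::new(0) };
    let mut local = VecDeque::from([1, 2, 3]);
    let injected = Mutex::new(vec![10]);
    assert_eq!(run_worker(&mut local, &injected, &sleep), 16);
    // Three local jobs ran, but shared state was only touched twice
    // (once per probe of the shared injector).
    assert_eq!(sleep.idle_transitions.load(Ordering::Relaxed), 2);
}
```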
(cherry picked from commit 99e6fc1232237b39f933107e4bd4ccba8839e39a) --- rayon-core/src/registry.rs | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 15ceb6b0c..9deabde2a 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -855,23 +855,33 @@ impl WorkerThread { // accesses, which would be *very bad* let abort_guard = unwind::AbortIfPanic; - let mut idle_state = self.registry.sleep.start_looking(self.index, latch); - while !latch.probe() { - if let Some(job) = self.find_work() { - self.registry.sleep.work_found(idle_state); + 'outer: while !latch.probe() { + // Check for local work *before* we start marking ourself idle, + // especially to avoid modifying shared sleep state. + if let Some(job) = self.take_local_job() { self.execute(job); - idle_state = self.registry.sleep.start_looking(self.index, latch); - } else { - self.registry - .sleep - .no_work_found(&mut idle_state, latch, &self) + continue; } - } - // If we were sleepy, we are not anymore. We "found work" -- - // whatever the surrounding thread was doing before it had to - // wait. - self.registry.sleep.work_found(idle_state); + let mut idle_state = self.registry.sleep.start_looking(self.index, latch); + while !latch.probe() { + if let Some(job) = self.find_work() { + self.registry.sleep.work_found(idle_state); + self.execute(job); + // The job might have injected local work, so go back to the outer loop. + continue 'outer; + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, &self) + } + } + + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(idle_state); + break; + } self.log(|| ThreadSawLatchSet { worker: self.index, From b56511f6749a560b4908fb6a138b090bb8cc5fb0 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 21 Jun 2023 14:28:16 -0700 Subject: [PATCH 03/27] Refactor scope latches to reduce matching The former `enum ScopeLatch` forced a `match` during both `increment` and `set` (decrement), even though both variants only need to update an `AtomicUsize` most of the time. #1057 helped hide that for `increment`, but `set` branching still showed up in perf profiles. Now this is refactored to a unified `CountLatch` that has a direct field for its `counter` used in the frequent case, and then an internal enum for the one-time notification variants. Therefore, most of its updates will have no `match` reached at all. The only other use of the former `CountLatch` was the one-shot termination latch in `WorkerThread`, so that's now renamed to `OnceLatch`. 
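The resulting shape, as a standalone sketch (the `Kind` enum and the empty match arms stand in for the real `CountLatchKind` notification machinery; this is illustration, not the actual code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[allow(dead_code)]
enum Kind {
    Stealing, // in rayon: a CoreLatch plus the registry/worker to tickle
    Blocking, // in rayon: a LockLatch for non-worker threads
}

struct CountLatch {
    counter: AtomicUsize, // hot path: touched on every increment/decrement
    kind: Kind,           // cold path: only matched on the final decrement
}

impl CountLatch {
    fn increment(&self) {
        // No branch on `kind` here, unlike the old enum-based ScopeLatch.
        self.counter.fetch_add(1, Ordering::Relaxed);
    }

    fn set(&self) {
        if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // One-time notification: the only place `kind` is inspected.
            match self.kind {
                Kind::Stealing => { /* set CoreLatch, wake the owning worker */ }
                Kind::Blocking => { /* set LockLatch, unblock the waiter */ }
            }
        }
    }
}

fn main() {
    let latch = CountLatch {
        counter: AtomicUsize::new(1),
        kind: Kind::Blocking,
    };
    latch.increment(); // counter: 2, no match executed
    latch.set();       // counter: 1, no match executed
    latch.set();       // counter: 0, match runs exactly once
}
```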
(cherry picked from commit 32d3774a8e25eb336c6da1c4a018f86bd6bc17af) --- rayon-core/src/broadcast/mod.rs | 5 +- rayon-core/src/latch.rs | 163 +++++++++++++++++++++----------- rayon-core/src/registry.rs | 11 +-- rayon-core/src/scope/mod.rs | 107 +-------------------- 4 files changed, 118 insertions(+), 168 deletions(-) diff --git a/rayon-core/src/broadcast/mod.rs b/rayon-core/src/broadcast/mod.rs index f9cfc47ac..442891f2d 100644 --- a/rayon-core/src/broadcast/mod.rs +++ b/rayon-core/src/broadcast/mod.rs @@ -1,7 +1,6 @@ use crate::job::{ArcJob, StackJob}; -use crate::latch::LatchRef; +use crate::latch::{CountLatch, LatchRef}; use crate::registry::{Registry, WorkerThread}; -use crate::scope::ScopeLatch; use std::fmt; use std::marker::PhantomData; use std::sync::Arc; @@ -108,7 +107,7 @@ where let n_threads = registry.num_threads(); let current_thread = WorkerThread::current().as_ref(); let tlv = crate::tlv::get(); - let latch = ScopeLatch::with_count(n_threads, current_thread); + let latch = CountLatch::with_count(n_threads, current_thread); let jobs: Vec<_> = (0..n_threads) .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch))) .collect(); diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs index 40a9f14fa..f06829ca3 100644 --- a/rayon-core/src/latch.rs +++ b/rayon-core/src/latch.rs @@ -142,6 +142,13 @@ impl CoreLatch { } } +impl AsCoreLatch for CoreLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + self + } +} + /// Spin latches are the simplest, most efficient kind, but they do /// not support a `wait()` operation. They just have a boolean flag /// that becomes true when `set()` is called. @@ -269,63 +276,32 @@ impl Latch for LockLatch { } } -/// Counting latches are used to implement scopes. They track a -/// counter. Unlike other latches, calling `set()` does not -/// necessarily make the latch be considered `set()`; instead, it just -/// decrements the counter. The latch is only "set" (in the sense that -/// `probe()` returns true) once the counter reaches zero. +/// Once latches are used to implement one-time blocking, primarily +/// for the termination flag of the threads in the pool. /// -/// Note: like a `SpinLatch`, count laches are always associated with +/// Note: like a `SpinLatch`, once-latches are always associated with /// some registry that is probing them, which must be tickled when /// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a /// reference to that registry. This is because in some cases the -/// registry owns the count-latch, and that would create a cycle. So a -/// `CountLatch` must be given a reference to its owning registry when +/// registry owns the once-latch, and that would create a cycle. So a +/// `OnceLatch` must be given a reference to its owning registry when /// it is set. For this reason, it does not implement the `Latch` /// trait (but it doesn't have to, as it is not used in those generic /// contexts). 
#[derive(Debug)] -pub(super) struct CountLatch { - // counter is first to nudge layout like CountLockLatch - counter: AtomicUsize, +pub(super) struct OnceLatch { core_latch: CoreLatch, } -impl CountLatch { - #[inline] - pub(super) fn new() -> CountLatch { - Self::with_count(1) - } - +impl OnceLatch { #[inline] - pub(super) fn with_count(n: usize) -> CountLatch { - CountLatch { + pub(super) fn new() -> OnceLatch { + Self { core_latch: CoreLatch::new(), - counter: AtomicUsize::new(n), } } - #[inline] - pub(super) fn increment(&self) { - debug_assert!(!self.core_latch.probe()); - self.counter.fetch_add(1, Ordering::Relaxed); - } - - /// Decrements the latch counter by one. If this is the final - /// count, then the latch is **set**, and calls to `probe()` will - /// return true. Returns whether the latch was set. - #[inline] - pub(super) unsafe fn set(this: *const Self) -> bool { - if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - CoreLatch::set(&(*this).core_latch); - true - } else { - false - } - } - - /// Decrements the latch counter by one and possibly set it. If - /// the latch is set, then the specific worker thread is tickled, + /// Set the latch, then tickle the specific worker thread, /// which should be the one that owns this latch. #[inline] pub(super) unsafe fn set_and_tickle_one( @@ -333,32 +309,81 @@ impl CountLatch { registry: &Registry, target_worker_index: usize, ) { - if Self::set(this) { + if CoreLatch::set(&(*this).core_latch) { registry.notify_worker_latch_is_set(target_worker_index); } } } -impl AsCoreLatch for CountLatch { +impl AsCoreLatch for OnceLatch { #[inline] fn as_core_latch(&self) -> &CoreLatch { &self.core_latch } } +/// Counting latches are used to implement scopes. They track a +/// counter. Unlike other latches, calling `set()` does not +/// necessarily make the latch be considered `set()`; instead, it just +/// decrements the counter. The latch is only "set" (in the sense that +/// `probe()` returns true) once the counter reaches zero. #[derive(Debug)] -pub(super) struct CountLockLatch { - // counter is first to nudge layout like CountLatch +pub(super) struct CountLatch { counter: AtomicUsize, - lock_latch: LockLatch, + kind: CountLatchKind, } -impl CountLockLatch { - #[inline] - pub(super) fn with_count(n: usize) -> CountLockLatch { - CountLockLatch { - lock_latch: LockLatch::new(), - counter: AtomicUsize::new(n), +enum CountLatchKind { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CoreLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: LockLatch }, +} + +impl std::fmt::Debug for CountLatchKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CountLatchKind::Stealing { latch, .. } => { + f.debug_tuple("Stealing").field(latch).finish() + } + CountLatchKind::Blocking { latch, .. 
} => { + f.debug_tuple("Blocking").field(latch).finish() + } + } + } +} + +impl CountLatch { + pub(super) fn new(owner: Option<&WorkerThread>) -> Self { + Self::with_count(1, owner) + } + + pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { + Self { + counter: AtomicUsize::new(count), + kind: match owner { + Some(owner) => CountLatchKind::Stealing { + latch: CoreLatch::new(), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => CountLatchKind::Blocking { + latch: LockLatch::new(), + }, + }, } } @@ -368,16 +393,42 @@ impl CountLockLatch { debug_assert!(old_counter != 0); } - pub(super) fn wait(&self) { - self.lock_latch.wait(); + pub(super) fn wait(&self, owner: Option<&WorkerThread>) { + match &self.kind { + CountLatchKind::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + CountLatchKind::Blocking { latch } => latch.wait(), + } } } -impl Latch for CountLockLatch { +impl Latch for CountLatch { #[inline] unsafe fn set(this: *const Self) { if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - LockLatch::set(&(*this).lock_latch); + // NOTE: Once we call `set` on the internal `latch`, + // the target may proceed and invalidate `this`! + match (*this).kind { + CountLatchKind::Stealing { + ref latch, + ref registry, + worker_index, + } => { + let registry = Arc::clone(registry); + if CoreLatch::set(latch) { + registry.notify_worker_latch_is_set(worker_index); + } + } + CountLatchKind::Blocking { ref latch } => LockLatch::set(latch), + } } } } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 9deabde2a..97f9182fa 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -1,5 +1,5 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; +use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; use crate::log::Event::*; use crate::log::Logger; use crate::sleep::Sleep; @@ -634,7 +634,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; + unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -682,10 +682,7 @@ struct ThreadInfo { /// This latch is *set* by the `terminate` method on the /// `Registry`, once the registry's main "terminate" counter /// reaches zero. - /// - /// NB. We use a `CountLatch` here because it has no lifetimes and is - /// meant for async use, but the count never gets higher than one. 
- terminate: CountLatch, + terminate: OnceLatch, /// the "stealer" half of the worker's deque stealer: Stealer, @@ -696,7 +693,7 @@ impl ThreadInfo { ThreadInfo { primed: LockLatch::new(), stopped: LockLatch::new(), - terminate: CountLatch::new(), + terminate: OnceLatch::new(), stealer, } } diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 06d599992..f9fb8d559 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -7,7 +7,7 @@ use crate::broadcast::BroadcastContext; use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; -use crate::latch::{CountLatch, CountLockLatch, Latch}; +use crate::latch::{CountLatch, Latch}; use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::tlv::{self, Tlv}; use crate::unwind; @@ -40,26 +40,6 @@ pub struct ScopeFifo<'scope> { fifos: Vec, } -pub(super) enum ScopeLatch { - /// A latch for scopes created on a rayon thread which will participate in work- - /// stealing while it waits for completion. This thread is not necessarily part - /// of the same registry as the scope itself! - Stealing { - latch: CountLatch, - /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool - /// with registry B, when a job completes in a thread of registry B, we may - /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A. - /// That means we need a reference to registry A (since at that point we will - /// only have a reference to registry B), so we stash it here. - registry: Arc, - /// The index of the worker to wake in `registry` - worker_index: usize, - }, - - /// A latch for scopes created on a non-rayon thread which will block to wait. - Blocking { latch: CountLockLatch }, -} - struct ScopeBase<'scope> { /// thread registry where `scope()` was executed or where `in_place_scope()` /// should spawn jobs. @@ -70,7 +50,7 @@ struct ScopeBase<'scope> { panic: AtomicPtr>, /// latch to track job counts - job_completed_latch: ScopeLatch, + job_completed_latch: CountLatch, /// You can think of a scope as containing a list of closures to execute, /// all of which outlive `'scope`. They're not actually required to be @@ -654,23 +634,18 @@ impl<'scope> ScopeBase<'scope> { ScopeBase { registry: Arc::clone(registry), panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: ScopeLatch::new(owner), + job_completed_latch: CountLatch::new(owner), marker: PhantomData, tlv: tlv::get(), } } - #[inline] - fn increment(&self) { - self.job_completed_latch.increment(); - } - fn heap_job_ref(&self, job: Box>) -> JobRef where FUNC: FnOnce() + Send + 'scope, { unsafe { - self.increment(); + self.job_completed_latch.increment(); job.into_job_ref() } } @@ -681,7 +656,7 @@ impl<'scope> ScopeBase<'scope> { { let n_threads = self.registry.num_threads(); let job_refs = (0..n_threads).map(|_| unsafe { - self.increment(); + self.job_completed_latch.increment(); ArcJob::as_job_ref(&job) }); @@ -766,63 +741,6 @@ impl<'scope> ScopeBase<'scope> { } } -impl ScopeLatch { - fn new(owner: Option<&WorkerThread>) -> Self { - Self::with_count(1, owner) - } - - pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { - match owner { - Some(owner) => ScopeLatch::Stealing { - latch: CountLatch::with_count(count), - registry: Arc::clone(owner.registry()), - worker_index: owner.index(), - }, - None => ScopeLatch::Blocking { - latch: CountLockLatch::with_count(count), - }, - } - } - - #[inline] - fn increment(&self) { - match self { - ScopeLatch::Stealing { latch, .. 
} => latch.increment(), - ScopeLatch::Blocking { latch } => latch.increment(), - } - } - - pub(super) fn wait(&self, owner: Option<&WorkerThread>) { - match self { - ScopeLatch::Stealing { - latch, - registry, - worker_index, - } => unsafe { - let owner = owner.expect("owner thread"); - debug_assert_eq!(registry.id(), owner.registry().id()); - debug_assert_eq!(*worker_index, owner.index()); - owner.wait_until(latch); - }, - ScopeLatch::Blocking { latch } => latch.wait(), - } - } -} - -impl Latch for ScopeLatch { - #[inline] - unsafe fn set(this: *const Self) { - match &*this { - ScopeLatch::Stealing { - latch, - registry, - worker_index, - } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), - ScopeLatch::Blocking { latch } => Latch::set(latch), - } - } -} - impl<'scope> fmt::Debug for Scope<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Scope") @@ -844,21 +762,6 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> { } } -impl fmt::Debug for ScopeLatch { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ScopeLatch::Stealing { latch, .. } => fmt - .debug_tuple("ScopeLatch::Stealing") - .field(latch) - .finish(), - ScopeLatch::Blocking { latch } => fmt - .debug_tuple("ScopeLatch::Blocking") - .field(latch) - .finish(), - } - } -} - /// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. /// /// Unsafe code is still required to dereference the pointer, but that's fine in From 3d5d03ed890402d72fab007ee2e799fc6ef13589 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 22 Jun 2022 13:41:49 -0700 Subject: [PATCH 04/27] Switch build_scoped to std::thread::scope (Rust 1.63) (cherry picked from commit e39b730ae995ea491a7c39521af7201d65bca8ff) --- rayon-core/src/lib.rs | 23 ++++++++--------------- rayon-core/tests/scoped_threadpool.rs | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 354bd8f40..1fa20c299 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -309,12 +309,12 @@ where impl ThreadPoolBuilder { /// Creates a scoped `ThreadPool` initialized using this configuration. /// - /// This is a convenience function for building a pool using [`crossbeam::scope`] + /// This is a convenience function for building a pool using [`std::thread::scope`] /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). /// The threads in this pool will start by calling `wrapper`, which should /// do initialization and continue by calling `ThreadBuilder::run()`. 
/// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// # Examples /// @@ -349,18 +349,17 @@ impl ThreadPoolBuilder { W: Fn(ThreadBuilder) + Sync, // expected to call `run()` F: FnOnce(&ThreadPool) -> R, { - let result = crossbeam_utils::thread::scope(|scope| { - let wrapper = &wrapper; + std::thread::scope(|scope| { let pool = self .spawn_handler(|thread| { - let mut builder = scope.builder(); + let mut builder = std::thread::Builder::new(); if let Some(name) = thread.name() { builder = builder.name(name.to_string()); } if let Some(size) = thread.stack_size() { builder = builder.stack_size(size); } - builder.spawn(move |_| wrapper(thread))?; + builder.spawn_scoped(scope, || wrapper(thread))?; Ok(()) }) .build()?; @@ -370,12 +369,7 @@ impl ThreadPoolBuilder { Ok(result) => Ok(result), Err(err) => unwind::resume_unwinding(err), } - }); - - match result { - Ok(result) => result, - Err(err) => unwind::resume_unwinding(err), - } + }) } } @@ -384,13 +378,11 @@ impl ThreadPoolBuilder { /// /// Note that the threads will not exit until after the pool is dropped. It /// is up to the caller to wait for thread termination if that is important - /// for any invariants. For instance, threads created in [`crossbeam::scope`] + /// for any invariants. For instance, threads created in [`std::thread::scope`] /// will be joined before that scope returns, and this will block indefinitely /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate /// until the entire process exits! /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html - /// /// # Examples /// /// A minimal spawn handler just needs to call `run()` from an independent thread. @@ -439,6 +431,7 @@ impl ThreadPoolBuilder { /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in /// [`build_scoped`](#method.build_scoped). /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// ``` diff --git a/rayon-core/tests/scoped_threadpool.rs b/rayon-core/tests/scoped_threadpool.rs index 534e8bbf4..932147179 100644 --- a/rayon-core/tests/scoped_threadpool.rs +++ b/rayon-core/tests/scoped_threadpool.rs @@ -93,7 +93,7 @@ fn build_scoped_tls_threadpool() { }, ) .expect("thread pool created"); - // Internally, `crossbeam::scope` will wait for the threads to exit before returning. + // Internally, `std::thread::scope` will wait for the threads to exit before returning. 
}); }); } From 2e82ca68b65029f851628159c2ca3e432e9d4473 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 21 Aug 2023 17:59:53 -0700 Subject: [PATCH 05/27] Bump MSRV to 1.63 (cherry picked from commit 0b73db24d687682535dbb966756b6237356b6c27) --- rayon-core/Cargo.toml | 2 +- rayon-core/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 3f57bef15..2bdd4e3bd 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -7,7 +7,7 @@ description = "Core APIs for Rayon - fork for rustc" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/rustc-rayon" documentation = "https://docs.rs/rustc-rayon-core/" -rust-version = "1.59" +rust-version = "1.63" edition = "2021" readme = "README.md" keywords = ["parallel", "thread", "concurrency", "join", "performance"] diff --git a/rayon-core/README.md b/rayon-core/README.md index 13b8a451e..43a1ba3cb 100644 --- a/rayon-core/README.md +++ b/rayon-core/README.md @@ -10,4 +10,4 @@ Please see [Rayon Docs] for details about using Rayon. [Rayon Docs]: https://docs.rs/rayon/ -Rayon-core currently requires `rustc 1.59.0` or greater. +Rayon-core currently requires `rustc 1.63.0` or greater. From e7d827554aac211fba1f307b563ae6fd30e706ff Mon Sep 17 00:00:00 2001 From: Andrew Mackenzie Date: Mon, 16 May 2022 15:37:26 +0200 Subject: [PATCH 06/27] Use std::thread::available_parallelism() instead of num_cpus dependency (cherry picked from commit 936093fd2ec2418189daee3958a3bcb41685801b) --- rayon-core/Cargo.toml | 1 - rayon-core/src/lib.rs | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 2bdd4e3bd..f7b592cbc 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -18,7 +18,6 @@ name = "rayon_core" # Some dependencies may not be their latest version, in order to support older rustc. 
[dependencies] -num_cpus = "1.2" crossbeam-channel = "0.5.0" crossbeam-deque = "0.8.1" crossbeam-utils = "0.8.0" diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 1fa20c299..1dda7668e 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -70,6 +70,7 @@ use std::fmt; use std::io; use std::marker::PhantomData; use std::str::FromStr; +use std::thread; #[macro_use] mod log; @@ -496,7 +497,8 @@ impl ThreadPoolBuilder { .and_then(|s| usize::from_str(&s).ok()) { Some(x) if x > 0 => return x, - Some(x) if x == 0 => return num_cpus::get(), + Some(x) if x == 0 => return thread::available_parallelism() + .map(|n| n.get()).unwrap_or(1), _ => {} } @@ -506,7 +508,7 @@ impl ThreadPoolBuilder { .and_then(|s| usize::from_str(&s).ok()) { Some(x) if x > 0 => x, - _ => num_cpus::get(), + _ => thread::available_parallelism().map(|n| n.get()).unwrap_or(1) } } } From b27f5339227c066175cd777b173743ea127546d8 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 18 Sep 2023 16:05:23 -0700 Subject: [PATCH 07/27] Refactor common calls to available_parallelism (cherry picked from commit dca9bd0f4ce09e159de2384c98035edb43e465fc) --- rayon-core/src/lib.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 1dda7668e..c6ca0554d 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -492,13 +492,18 @@ impl ThreadPoolBuilder { if self.num_threads > 0 { self.num_threads } else { + let default = || { + thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + }; + match env::var("RAYON_NUM_THREADS") .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => return x, - Some(x) if x == 0 => return thread::available_parallelism() - .map(|n| n.get()).unwrap_or(1), + Some(x @ 1..) => return x, + Some(0) => return default(), _ => {} } @@ -507,8 +512,8 @@ impl ThreadPoolBuilder { .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => x, - _ => thread::available_parallelism().map(|n| n.get()).unwrap_or(1) + Some(x @ 1..) => x, + _ => default(), } } } From 0b242c8b1a1ca9d36c9e956df6dde13b87ecee56 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 18 Sep 2023 16:05:45 -0700 Subject: [PATCH 08/27] Document the use of available_parallelism (cherry picked from commit 24723fafdf815c48e58eb4d104be6eef31a2b169) --- rayon-core/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index c6ca0554d..d202f79f0 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -552,9 +552,8 @@ impl ThreadPoolBuilder { /// may change in the future, if you wish to rely on a fixed /// number of threads, you should use this function to specify /// that number. To reproduce the current default behavior, you - /// may wish to use the [`num_cpus` - /// crate](https://crates.io/crates/num_cpus) to query the number - /// of CPUs dynamically. + /// may wish to use [`std::thread::available_parallelism`] + /// to query the number of CPUs dynamically. 
/// /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment From 8a680df511e96760661486b663945b3f548a28d7 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 18 Sep 2023 18:21:14 -0700 Subject: [PATCH 09/27] Fix clippy::unnecessary_cast (cherry picked from commit c8bb6ca97dc7c5d6ecacee2bbef8131d64ad65c2) --- rayon-core/src/sleep/counters.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rayon-core/src/sleep/counters.rs b/rayon-core/src/sleep/counters.rs index f2a3de3e1..53d2c5527 100644 --- a/rayon-core/src/sleep/counters.rs +++ b/rayon-core/src/sleep/counters.rs @@ -212,12 +212,12 @@ impl AtomicCounters { #[inline] fn select_thread(word: usize, shift: usize) -> usize { - ((word >> shift) as usize) & THREADS_MAX + (word >> shift) & THREADS_MAX } #[inline] fn select_jec(word: usize) -> usize { - (word >> JEC_SHIFT) as usize + word >> JEC_SHIFT } impl Counters { From 7e35a4df17daeb64e857cb982cba29b819dfc2e3 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 18 Sep 2023 17:35:04 -0700 Subject: [PATCH 10/27] Remove the semi-secret logging Rayon has long had some logging functionality, but not well advertised. The only mention is in the (private) module docs: > To use in a debug build, set the env var `RAYON_LOG` as > described below. In a release build, logs are compiled out by > default unless Rayon is built with `--cfg rayon_rs_log` (try > `RUSTFLAGS="--cfg rayon_rs_log"`). > > Note that logs are an internally debugging tool and their format > is considered unstable, as are the details of how to enable them. I, for one, have not "internally" used this for debugging at all, yet it comes at some cost to all users, even disabled in release builds. At the very least it requires `crossbeam-channel` that we're not using anywhere else except tests. Besides that, this code also bloats the compiled size of `rayon-core` by about 30%, and similar for its compile time. **So let's just rip out the logger!** The remaining uses of `crossbeam-channel` in test cases are easily avoidable too, since `std::sync::mpsc::Sender` is now `Sync`. (cherry picked from commit 191ade2c1f2ec27a6b8d7dc9d7e3881335ba4e91) --- rayon-core/Cargo.toml | 1 - rayon-core/src/broadcast/test.rs | 23 +- rayon-core/src/latch.rs | 7 - rayon-core/src/lib.rs | 2 - rayon-core/src/log.rs | 413 ----------------------------- rayon-core/src/registry.rs | 68 +---- rayon-core/src/sleep/mod.rs | 87 +----- rayon-core/src/thread_pool/test.rs | 4 +- 8 files changed, 34 insertions(+), 571 deletions(-) delete mode 100644 rayon-core/src/log.rs diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index f7b592cbc..917714b32 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -18,7 +18,6 @@ name = "rayon_core" # Some dependencies may not be their latest version, in order to support older rustc. 
[dependencies] -crossbeam-channel = "0.5.0" crossbeam-deque = "0.8.1" crossbeam-utils = "0.8.0" diff --git a/rayon-core/src/broadcast/test.rs b/rayon-core/src/broadcast/test.rs index 3ae11f7f6..00ab4ad7f 100644 --- a/rayon-core/src/broadcast/test.rs +++ b/rayon-core/src/broadcast/test.rs @@ -2,6 +2,7 @@ use crate::ThreadPoolBuilder; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; use std::sync::Arc; use std::{thread, time}; @@ -14,7 +15,7 @@ fn broadcast_global() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_global() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); let mut v: Vec<_> = rx.into_iter().collect(); @@ -33,7 +34,7 @@ fn broadcast_pool() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_pool() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -53,7 +54,7 @@ fn broadcast_self() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_self() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap())); @@ -81,7 +82,7 @@ fn broadcast_mutual() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool1.spawn({ @@ -118,7 +119,7 @@ fn broadcast_mutual_sleepy() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual_sleepy() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool1.spawn({ @@ -158,8 +159,8 @@ fn broadcast_panic_one() { #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_one() { - let (tx, rx) = crossbeam_channel::unbounded(); - let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); let pool = ThreadPoolBuilder::new() .num_threads(7) .panic_handler(move |e| panic_tx.send(e).unwrap()) @@ -196,8 +197,8 @@ fn broadcast_panic_many() { #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_many() { - let (tx, rx) = crossbeam_channel::unbounded(); - let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); let pool = ThreadPoolBuilder::new() .num_threads(7) .panic_handler(move |e| panic_tx.send(e).unwrap()) @@ -231,7 +232,7 @@ fn broadcast_sleep_race() { #[test] fn broadcast_after_spawn_broadcast() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); // Queue a non-blocking spawn_broadcast. 
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -247,7 +248,7 @@ fn broadcast_after_spawn_broadcast() { #[test] fn broadcast_after_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn on a thread-local deque. crate::registry::in_worker(move |_, _| { diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs index f06829ca3..b0cbbd833 100644 --- a/rayon-core/src/latch.rs +++ b/rayon-core/src/latch.rs @@ -84,13 +84,6 @@ impl CoreLatch { } } - /// Returns the address of this core latch as an integer. Used - /// for logging. - #[inline] - pub(super) fn addr(&self) -> usize { - self as *const CoreLatch as usize - } - /// Invoked by owning thread as it prepares to sleep. Returns true /// if the owning thread may proceed to fall asleep, false if the /// latch was set in the meantime. diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index d202f79f0..c0cb2927c 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -72,8 +72,6 @@ use std::marker::PhantomData; use std::str::FromStr; use std::thread; -#[macro_use] -mod log; #[macro_use] mod private; diff --git a/rayon-core/src/log.rs b/rayon-core/src/log.rs deleted file mode 100644 index 7b6daf0ab..000000000 --- a/rayon-core/src/log.rs +++ /dev/null @@ -1,413 +0,0 @@ -//! Debug Logging -//! -//! To use in a debug build, set the env var `RAYON_LOG` as -//! described below. In a release build, logs are compiled out by -//! default unless Rayon is built with `--cfg rayon_rs_log` (try -//! `RUSTFLAGS="--cfg rayon_rs_log"`). -//! -//! Note that logs are an internally debugging tool and their format -//! is considered unstable, as are the details of how to enable them. -//! -//! # Valid values for RAYON_LOG -//! -//! The `RAYON_LOG` variable can take on the following values: -//! -//! * `tail:` -- dumps the last 10,000 events into the given file; -//! useful for tracking down deadlocks -//! * `profile:` -- dumps only those events needed to reconstruct how -//! many workers are active at a given time -//! * `all:` -- dumps every event to the file; useful for debugging - -use crossbeam_channel::{self, Receiver, Sender}; -use std::collections::VecDeque; -use std::env; -use std::fs::File; -use std::io::{self, BufWriter, Write}; - -/// True if logs are compiled in. -pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions)); - -#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] -pub(super) enum Event { - /// Flushes events to disk, used to terminate benchmarking. - Flush, - - /// Indicates that a worker thread started execution. - ThreadStart { - worker: usize, - terminate_addr: usize, - }, - - /// Indicates that a worker thread started execution. - ThreadTerminate { worker: usize }, - - /// Indicates that a worker thread became idle, blocked on `latch_addr`. - ThreadIdle { worker: usize, latch_addr: usize }, - - /// Indicates that an idle worker thread found work to do, after - /// yield rounds. It should no longer be considered idle. - ThreadFoundWork { worker: usize, yields: u32 }, - - /// Indicates that a worker blocked on a latch observed that it was set. - /// - /// Internal debugging event that does not affect the state - /// machine. - ThreadSawLatchSet { worker: usize, latch_addr: usize }, - - /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal - /// sleep state that we saw at the time. 
- ThreadSleepy { worker: usize, jobs_counter: usize }, - - /// Indicates that the thread's attempt to fall asleep was - /// interrupted because the latch was set. (This is not, in and of - /// itself, a change to the thread state.) - ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize }, - - /// Indicates that the thread's attempt to fall asleep was - /// interrupted because a job was posted. (This is not, in and of - /// itself, a change to the thread state.) - ThreadSleepInterruptedByJob { worker: usize }, - - /// Indicates that an idle worker has gone to sleep. - ThreadSleeping { worker: usize, latch_addr: usize }, - - /// Indicates that a sleeping worker has awoken. - ThreadAwoken { worker: usize, latch_addr: usize }, - - /// Indicates that the given worker thread was notified it should - /// awaken. - ThreadNotify { worker: usize }, - - /// The given worker has pushed a job to its local deque. - JobPushed { worker: usize }, - - /// The given worker has popped a job from its local deque. - JobPopped { worker: usize }, - - /// The given worker has stolen a job from the deque of another. - JobStolen { worker: usize, victim: usize }, - - /// N jobs were injected into the global queue. - JobsInjected { count: usize }, - - /// A job was removed from the global queue. - JobUninjected { worker: usize }, - - /// A job was broadcasted to N threads. - JobBroadcast { count: usize }, - - /// When announcing a job, this was the value of the counters we observed. - /// - /// No effect on thread state, just a debugging event. - JobThreadCounts { - worker: usize, - num_idle: u16, - num_sleepers: u16, - }, -} - -/// Handle to the logging thread, if any. You can use this to deliver -/// logs. You can also clone it freely. -#[derive(Clone)] -pub(super) struct Logger { - sender: Option>, -} - -impl Logger { - pub(super) fn new(num_workers: usize) -> Logger { - if !LOG_ENABLED { - return Self::disabled(); - } - - // see the doc comment for the format - let env_log = match env::var("RAYON_LOG") { - Ok(s) => s, - Err(_) => return Self::disabled(), - }; - - let (sender, receiver) = crossbeam_channel::unbounded(); - - if let Some(filename) = env_log.strip_prefix("tail:") { - let filename = filename.to_string(); - ::std::thread::spawn(move || { - Self::tail_logger_thread(num_workers, filename, 10_000, receiver) - }); - } else if env_log == "all" { - ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); - } else if let Some(filename) = env_log.strip_prefix("profile:") { - let filename = filename.to_string(); - ::std::thread::spawn(move || { - Self::profile_logger_thread(num_workers, filename, 10_000, receiver) - }); - } else { - panic!("RAYON_LOG should be 'tail:' or 'profile:'"); - } - - Logger { - sender: Some(sender), - } - } - - fn disabled() -> Logger { - Logger { sender: None } - } - - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> Event) { - if !LOG_ENABLED { - return; - } - - if let Some(sender) = &self.sender { - sender.send(event()).unwrap(); - } - } - - fn profile_logger_thread( - num_workers: usize, - log_filename: String, - capacity: usize, - receiver: Receiver, - ) { - let file = File::create(&log_filename) - .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); - - let mut writer = BufWriter::new(file); - let mut events = Vec::with_capacity(capacity); - let mut state = SimulatorState::new(num_workers); - let timeout = std::time::Duration::from_secs(30); - - loop { - while let Ok(event) = receiver.recv_timeout(timeout) { - if 
let Event::Flush = event { - break; - } - - events.push(event); - if events.len() == capacity { - break; - } - } - - for event in events.drain(..) { - if state.simulate(&event) { - state.dump(&mut writer, &event).unwrap(); - } - } - - writer.flush().unwrap(); - } - } - - fn tail_logger_thread( - num_workers: usize, - log_filename: String, - capacity: usize, - receiver: Receiver, - ) { - let file = File::create(&log_filename) - .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); - - let mut writer = BufWriter::new(file); - let mut events: VecDeque = VecDeque::with_capacity(capacity); - let mut state = SimulatorState::new(num_workers); - let timeout = std::time::Duration::from_secs(30); - let mut skipped = false; - - loop { - while let Ok(event) = receiver.recv_timeout(timeout) { - if let Event::Flush = event { - // We ignore Flush events in tail mode -- - // we're really just looking for - // deadlocks. - continue; - } else { - if events.len() == capacity { - let event = events.pop_front().unwrap(); - state.simulate(&event); - skipped = true; - } - - events.push_back(event); - } - } - - if skipped { - writeln!(writer, "...").unwrap(); - skipped = false; - } - - for event in events.drain(..) { - // In tail mode, we dump *all* events out, whether or - // not they were 'interesting' to the state machine. - state.simulate(&event); - state.dump(&mut writer, &event).unwrap(); - } - - writer.flush().unwrap(); - } - } - - fn all_logger_thread(num_workers: usize, receiver: Receiver) { - let stderr = std::io::stderr(); - let mut state = SimulatorState::new(num_workers); - - for event in receiver { - let mut writer = BufWriter::new(stderr.lock()); - state.simulate(&event); - state.dump(&mut writer, &event).unwrap(); - writer.flush().unwrap(); - } - } -} - -#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] -enum State { - Working, - Idle, - Notified, - Sleeping, - Terminated, -} - -impl State { - fn letter(&self) -> char { - match self { - State::Working => 'W', - State::Idle => 'I', - State::Notified => 'N', - State::Sleeping => 'S', - State::Terminated => 'T', - } - } -} - -struct SimulatorState { - local_queue_size: Vec, - thread_states: Vec, - injector_size: usize, -} - -impl SimulatorState { - fn new(num_workers: usize) -> Self { - Self { - local_queue_size: (0..num_workers).map(|_| 0).collect(), - thread_states: (0..num_workers).map(|_| State::Working).collect(), - injector_size: 0, - } - } - - fn simulate(&mut self, event: &Event) -> bool { - match *event { - Event::ThreadIdle { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Working); - self.thread_states[worker] = State::Idle; - true - } - - Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => { - self.thread_states[worker] = State::Working; - true - } - - Event::ThreadTerminate { worker, .. } => { - self.thread_states[worker] = State::Terminated; - true - } - - Event::ThreadSleeping { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Idle); - self.thread_states[worker] = State::Sleeping; - true - } - - Event::ThreadAwoken { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Notified); - self.thread_states[worker] = State::Idle; - true - } - - Event::JobPushed { worker } => { - self.local_queue_size[worker] += 1; - true - } - - Event::JobPopped { worker } => { - self.local_queue_size[worker] -= 1; - true - } - - Event::JobStolen { victim, .. 
} => { - self.local_queue_size[victim] -= 1; - true - } - - Event::JobsInjected { count } => { - self.injector_size += count; - true - } - - Event::JobUninjected { .. } => { - self.injector_size -= 1; - true - } - - Event::ThreadNotify { worker } => { - // Currently, this log event occurs while holding the - // thread lock, so we should *always* see it before - // the worker awakens. - assert_eq!(self.thread_states[worker], State::Sleeping); - self.thread_states[worker] = State::Notified; - true - } - - // remaining events are no-ops from pov of simulating the - // thread state - _ => false, - } - } - - fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> { - let num_idle_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Idle) - .count(); - - let num_sleeping_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Sleeping) - .count(); - - let num_notified_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Notified) - .count(); - - let num_pending_jobs: usize = self.local_queue_size.iter().sum(); - - write!(w, "{:2},", num_idle_threads)?; - write!(w, "{:2},", num_sleeping_threads)?; - write!(w, "{:2},", num_notified_threads)?; - write!(w, "{:4},", num_pending_jobs)?; - write!(w, "{:4},", self.injector_size)?; - - let event_str = format!("{:?}", event); - write!(w, r#""{:60}","#, event_str)?; - - for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) { - write!(w, " T{:02},{}", i, state.letter(),)?; - - if *queue_size > 0 { - write!(w, ",{:03},", queue_size)?; - } else { - write!(w, ", ,")?; - } - } - - writeln!(w)?; - Ok(()) - } -} diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 97f9182fa..4734a085f 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -1,7 +1,5 @@ use crate::job::{JobFifo, JobRef, StackJob}; use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; -use crate::log::Event::*; -use crate::log::Logger; use crate::sleep::Sleep; use crate::tlv::Tlv; use crate::unwind; @@ -131,7 +129,6 @@ where } pub struct Registry { - logger: Logger, thread_infos: Vec, sleep: Sleep, injected_jobs: Injector, @@ -284,11 +281,9 @@ impl Registry { }) .unzip(); - let logger = Logger::new(n_threads); let registry = Arc::new(Registry { - logger: logger.clone(), thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), - sleep: Sleep::new(logger, n_threads), + sleep: Sleep::new(n_threads), injected_jobs: Injector::new(), broadcasts: Mutex::new(broadcasts), terminate_count: AtomicUsize::new(1), @@ -370,11 +365,6 @@ impl Registry { } } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.logger.log(event) - } - pub(super) fn num_threads(&self) -> usize { self.thread_infos.len() } @@ -446,8 +436,6 @@ impl Registry { /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. pub(super) fn inject(&self, injected_job: JobRef) { - self.log(|| JobsInjected { count: 1 }); - // It should not be possible for `state.terminate` to be true // here. 
It is only set to true when the user creates (and // drops) a `ThreadPool`; and, in that case, they cannot be @@ -462,22 +450,17 @@ impl Registry { let queue_was_empty = self.injected_jobs.is_empty(); self.injected_jobs.push(injected_job); - self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty); + self.sleep.new_injected_jobs(1, queue_was_empty); } pub(crate) fn has_injected_job(&self) -> bool { !self.injected_jobs.is_empty() } - fn pop_injected_job(&self, worker_index: usize) -> Option { + fn pop_injected_job(&self) -> Option { loop { match self.injected_jobs.steal() { - Steal::Success(job) => { - self.log(|| JobUninjected { - worker: worker_index, - }); - return Some(job); - } + Steal::Success(job) => return Some(job), Steal::Empty => return None, Steal::Retry => {} } @@ -491,9 +474,6 @@ impl Registry { /// **Panics** if not given exactly as many jobs as there are threads. pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator) { assert_eq!(self.num_threads(), injected_jobs.len()); - self.log(|| JobBroadcast { - count: self.num_threads(), - }); { let broadcasts = self.broadcasts.lock().unwrap(); @@ -568,9 +548,6 @@ impl Registry { job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. self.acquire_thread(); - // flush accumulated logs as we exit the thread - self.logger.log(|| Flush); - job.into_result() }) } @@ -776,11 +753,6 @@ impl WorkerThread { &self.registry } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.registry.logger.log(event) - } - /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). #[inline] pub(super) fn index(&self) -> usize { @@ -789,12 +761,9 @@ impl WorkerThread { #[inline] pub(super) unsafe fn push(&self, job: JobRef) { - self.log(|| JobPushed { worker: self.index }); let queue_was_empty = self.worker.is_empty(); self.worker.push(job); - self.registry - .sleep - .new_internal_jobs(self.index, 1, queue_was_empty); + self.registry.sleep.new_internal_jobs(1, queue_was_empty); } #[inline] @@ -816,7 +785,6 @@ impl WorkerThread { let popped_job = self.worker.pop(); if popped_job.is_some() { - self.log(|| JobPopped { worker: self.index }); return popped_job; } @@ -860,10 +828,10 @@ impl WorkerThread { continue; } - let mut idle_state = self.registry.sleep.start_looking(self.index, latch); + let mut idle_state = self.registry.sleep.start_looking(self.index); while !latch.probe() { if let Some(job) = self.find_work() { - self.registry.sleep.work_found(idle_state); + self.registry.sleep.work_found(); self.execute(job); // The job might have injected local work, so go back to the outer loop. continue 'outer; @@ -876,14 +844,10 @@ impl WorkerThread { // If we were sleepy, we are not anymore. We "found work" -- // whatever the surrounding thread was doing before it had to wait. - self.registry.sleep.work_found(idle_state); + self.registry.sleep.work_found(); break; } - self.log(|| ThreadSawLatchSet { - worker: self.index, - latch_addr: latch.addr(), - }); mem::forget(abort_guard); // successful execution, do not abort } @@ -895,7 +859,7 @@ impl WorkerThread { // we take on something new. 
self.take_local_job() .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job(self.index)) + .or_else(|| self.registry.pop_injected_job()) } pub(super) fn yield_now(&self) -> Yield { @@ -947,13 +911,7 @@ impl WorkerThread { .find_map(|victim_index| { let victim = &thread_infos[victim_index]; match victim.stealer.steal() { - Steal::Success(job) => { - self.log(|| JobStolen { - worker: self.index, - victim: victim_index, - }); - Some(job) - } + Steal::Success(job) => Some(job), Steal::Empty => None, Steal::Retry => { retry = true; @@ -990,10 +948,6 @@ unsafe fn main_loop(thread: ThreadBuilder) { } let my_terminate_latch = ®istry.thread_infos[index].terminate; - worker_thread.log(|| ThreadStart { - worker: index, - terminate_addr: my_terminate_latch.as_core_latch().addr(), - }); registry.acquire_thread(); worker_thread.wait_until(my_terminate_latch); @@ -1006,8 +960,6 @@ unsafe fn main_loop(thread: ThreadBuilder) { // Normal termination, do not abort. mem::forget(abort_guard); - worker_thread.log(|| ThreadTerminate { worker: index }); - // Inform a user callback that we exited a thread. if let Some(ref handler) = registry.exit_handler { registry.catch_unwind(|| handler(index)); diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 48c92802e..fa6e45a28 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -2,8 +2,6 @@ //! for an overview. use crate::latch::CoreLatch; -use crate::log::Event::*; -use crate::log::Logger; use crate::registry::WorkerThread; use crate::DeadlockHandler; use crossbeam_utils::CachePadded; @@ -47,8 +45,6 @@ impl SleepData { /// /// [`README.md`] README.md pub(super) struct Sleep { - logger: Logger, - /// One "sleep state" per worker. Used to track if a worker is sleeping and to have /// them block. worker_sleep_states: Vec>, @@ -89,10 +85,9 @@ const ROUNDS_UNTIL_SLEEPY: u32 = 32; const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; impl Sleep { - pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep { + pub(super) fn new(n_threads: usize) -> Sleep { assert!(n_threads <= THREADS_MAX); Sleep { - logger, worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), counters: AtomicCounters::new(), data: Mutex::new(SleepData { @@ -128,12 +123,7 @@ impl Sleep { } #[inline] - pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState { - self.logger.log(|| ThreadIdle { - worker: worker_index, - latch_addr: latch.addr(), - }); - + pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { self.counters.add_inactive_thread(); IdleState { @@ -144,12 +134,7 @@ impl Sleep { } #[inline] - pub(super) fn work_found(&self, idle_state: IdleState) { - self.logger.log(|| ThreadFoundWork { - worker: idle_state.worker_index, - yields: idle_state.rounds, - }); - + pub(super) fn work_found(&self) { // If we were the last idle thread and other threads are still sleeping, // then we should wake up another thread. 
let threads_to_wake = self.counters.sub_inactive_thread(); @@ -167,7 +152,7 @@ impl Sleep { thread::yield_now(); idle_state.rounds += 1; } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { - idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index); + idle_state.jobs_counter = self.announce_sleepy(); idle_state.rounds += 1; thread::yield_now(); } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { @@ -180,15 +165,11 @@ impl Sleep { } #[cold] - fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter { + fn announce_sleepy(&self) -> JobsEventCounter { let counters = self .counters .increment_jobs_event_counter_if(JobsEventCounter::is_active); let jobs_counter = counters.jobs_counter(); - self.logger.log(|| ThreadSleepy { - worker: worker_index, - jobs_counter: jobs_counter.as_usize(), - }); jobs_counter } @@ -197,11 +178,6 @@ impl Sleep { let worker_index = idle_state.worker_index; if !latch.get_sleepy() { - self.logger.log(|| ThreadSleepInterruptedByLatch { - worker: worker_index, - latch_addr: latch.addr(), - }); - return; } @@ -212,11 +188,6 @@ impl Sleep { // Our latch was signalled. We should wake back up fully as we // will have some stuff to do. if !latch.fall_asleep() { - self.logger.log(|| ThreadSleepInterruptedByLatch { - worker: worker_index, - latch_addr: latch.addr(), - }); - idle_state.wake_fully(); return; } @@ -231,10 +202,6 @@ impl Sleep { // we didn't see it. We should return to just before the SLEEPY // state so we can do another search and (if we fail to find // work) go back to sleep. - self.logger.log(|| ThreadSleepInterruptedByJob { - worker: worker_index, - }); - idle_state.wake_partly(); latch.wake_up(); return; @@ -248,11 +215,6 @@ impl Sleep { // Successfully registered as asleep. - self.logger.log(|| ThreadSleeping { - worker: worker_index, - latch_addr: latch.addr(), - }); - // We have one last check for injected jobs to do. This protects against // deadlock in the very unlikely event that // @@ -296,11 +258,6 @@ impl Sleep { // Update other state: idle_state.wake_fully(); latch.wake_up(); - - self.logger.log(|| ThreadAwoken { - worker: worker_index, - latch_addr: latch.addr(), - }); } /// Notify the given thread that it should wake up (if it is @@ -318,24 +275,16 @@ impl Sleep { /// /// # Parameters /// - /// - `source_worker_index` -- index of the thread that did the - /// push, or `usize::MAX` if this came from outside the thread - /// pool -- it is used only for logging. /// - `num_jobs` -- lower bound on number of jobs available for stealing. /// We'll try to get at least one thread per job. #[inline] - pub(super) fn new_injected_jobs( - &self, - source_worker_index: usize, - num_jobs: u32, - queue_was_empty: bool, - ) { + pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) { // This fence is needed to guarantee that threads // as they are about to fall asleep, observe any // new jobs that may have been injected. std::sync::atomic::fence(Ordering::SeqCst); - self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + self.new_jobs(num_jobs, queue_was_empty) } /// Signals that `num_jobs` new jobs were pushed onto a thread's @@ -348,24 +297,16 @@ impl Sleep { /// /// # Parameters /// - /// - `source_worker_index` -- index of the thread that did the - /// push, or `usize::MAX` if this came from outside the thread - /// pool -- it is used only for logging. /// - `num_jobs` -- lower bound on number of jobs available for stealing. /// We'll try to get at least one thread per job. 
     #[inline]
-    pub(super) fn new_internal_jobs(
-        &self,
-        source_worker_index: usize,
-        num_jobs: u32,
-        queue_was_empty: bool,
-    ) {
-        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
+        self.new_jobs(num_jobs, queue_was_empty)
     }
 
     /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
     #[inline]
-    fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
+    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
         // Read the counters and -- if sleepy workers have announced themselves
         // -- announce that there is now work available. The final value of `counters`
         // with which we exit the loop thus corresponds to a state when
@@ -375,12 +316,6 @@ impl Sleep {
         let num_awake_but_idle = counters.awake_but_idle_threads();
         let num_sleepers = counters.sleeping_threads();
 
-        self.logger.log(|| JobThreadCounts {
-            worker: source_worker_index,
-            num_idle: num_awake_but_idle as u16,
-            num_sleepers: num_sleepers as u16,
-        });
-
         if num_sleepers == 0 {
             // nobody to wake
             return;
@@ -440,8 +375,6 @@ impl Sleep {
             // do.
             self.counters.sub_sleeping_thread();
 
-            self.logger.log(|| ThreadNotify { worker: index });
-
             true
         } else {
             false

diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs
index 6143e5799..88b36282d 100644
--- a/rayon-core/src/thread_pool/test.rs
+++ b/rayon-core/src/thread_pool/test.rs
@@ -383,7 +383,7 @@ fn in_place_scope_fifo_no_deadlock() {
 
 #[test]
 fn yield_now_to_spawn() {
-    let (tx, rx) = crossbeam_channel::bounded(1);
+    let (tx, rx) = channel();
 
     // Queue a regular spawn.
     crate::spawn(move || tx.send(22).unwrap());
@@ -401,7 +401,7 @@
 
 #[test]
 fn yield_local_to_spawn() {
-    let (tx, rx) = crossbeam_channel::bounded(1);
+    let (tx, rx) = channel();
 
     // Queue a regular spawn.
     crate::spawn(move || tx.send(22).unwrap());

From 561b81f07b5230fad63bf599f9414d06f8cb592b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?=
Date: Sun, 25 Jun 2023 14:05:54 +0200
Subject: [PATCH 11/27] core: registry: Factor out "wait till out of work" part of the main loop.

This was originally done for #1063, so that it could be reused to clean
up the TLS data allocated by `use_current_thread`. We ended up not using
it there, but the refactoring seems useful on its own.

(cherry picked from commit ea0c06df26029bee6adf9650660f71748c569c0e)
---
 rayon-core/src/registry.rs | 25 ++++++++++++++++---------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 4734a085f..a94d6776b 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -851,6 +851,21 @@ impl WorkerThread {
         mem::forget(abort_guard); // successful execution, do not abort
     }
 
+    unsafe fn wait_until_out_of_work(&self) {
+        debug_assert_eq!(self as *const _, WorkerThread::current());
+        let registry = &*self.registry;
+        let index = self.index;
+
+        registry.acquire_thread();
+        self.wait_until(&registry.thread_infos[index].terminate);
+
+        // Should not be any work left in our queue.
+        debug_assert!(self.take_local_job().is_none());
+
+        // Let registry know we are done
+        Latch::set(&registry.thread_infos[index].stopped);
+    }
+
     fn find_work(&self) -> Option<JobRef> {
         // Try to find some work to do. We give preference first
         // to things in our local deque, then in other workers
@@ -947,15 +962,7 @@ unsafe fn main_loop(thread: ThreadBuilder) {
         registry.catch_unwind(|| handler(index));
     }
 
-    let my_terminate_latch = &registry.thread_infos[index].terminate;
-    registry.acquire_thread();
-    worker_thread.wait_until(my_terminate_latch);
-
-    // Should not be any work left in our queue.
-    debug_assert!(worker_thread.take_local_job().is_none());
-
-    // let registry know we are done
-    Latch::set(&registry.thread_infos[index].stopped);
+    worker_thread.wait_until_out_of_work();
 
     // Normal termination, do not abort.
     mem::forget(abort_guard);

From d281300fefae96d2d1563a25df402b4f2ebf56e8 Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Wed, 20 Sep 2023 11:31:06 -0700
Subject: [PATCH 12/27] Fix clippy::let_and_return

(cherry picked from commit 3fe51e5cbd91b1d6ca04fb30ad80cabd8e6adefb)
---
 rayon-core/src/sleep/mod.rs | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs
index fa6e45a28..f2b432508 100644
--- a/rayon-core/src/sleep/mod.rs
+++ b/rayon-core/src/sleep/mod.rs
@@ -166,11 +166,9 @@ impl Sleep {
 
     #[cold]
     fn announce_sleepy(&self) -> JobsEventCounter {
-        let counters = self
-            .counters
-            .increment_jobs_event_counter_if(JobsEventCounter::is_active);
-        let jobs_counter = counters.jobs_counter();
-        jobs_counter
+        self.counters
+            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
+            .jobs_counter()
     }
 
     #[cold]

From 8a3d5742d5205d33000d2968c02de69c850c7ed7 Mon Sep 17 00:00:00 2001
From: Benjamin Kay
Date: Wed, 29 Nov 2023 14:12:14 -0600
Subject: [PATCH 13/27] Document implicit yield in install() per #1105

(cherry picked from commit e9835aa6707f590a5f0821c4070207a9e6e20050)
---
 rayon-core/src/thread_pool/mod.rs | 37 +++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs
index fd890c9fd..612ac2540 100644
--- a/rayon-core/src/thread_pool/mod.rs
+++ b/rayon-core/src/thread_pool/mod.rs
@@ -80,6 +80,43 @@ impl ThreadPool {
     /// thread-local data from the current thread will not be
     /// accessible.
     ///
+    /// # Warning: execution order
+    ///
+    /// If the current thread is part of a different thread pool, it will try to
+    /// keep busy while the `op` completes in its target pool, similar to
+    /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
+    /// potentially schedule other tasks to run on the current thread in the
+    /// meantime. For example
+    ///
+    /// ```rust
+    /// # use rayon_core as rayon;
+    /// fn main() {
+    ///     rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
+    ///     let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
+    ///     let do_it = || {
+    ///         print!("one ");
+    ///         pool.install(||{});
+    ///         print!("two ");
+    ///     }
+    ///     rayon::join(|| do_it(), || do_it());
+    /// }
+    /// ```
+    ///
+    /// Since we configured just one thread in the global pool, one might
+    /// expect `do_it()` to run sequentially, producing:
+    ///
+    /// ```ascii
+    /// one two one two
+    /// ```
+    ///
+    /// However each call to `install()` yields implicitly, allowing rayon to
+    /// run multiple instances of `do_it()` concurrently on the single, global
+    /// thread. The following output would be equally valid:
+    ///
+    /// ```ascii
+    /// one one two two
+    /// ```
+    ///
     /// # Panics
     ///
     /// If `op` should panic, that panic will be propagated.
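[Note on PATCH 13] The busy-waiting described in this documentation can also be observed through rayon-core's public `yield_now` API. The sketch below is illustrative only: the loop shape is an assumption for exposition, not the literal implementation of `install()`.

```rust
use rayon_core::Yield;
use std::sync::atomic::{AtomicBool, Ordering};

// Spin until `done` is set, executing other queued rayon jobs in the
// meantime rather than blocking the worker thread outright.
fn keep_busy_until(done: &AtomicBool) {
    while !done.load(Ordering::Acquire) {
        match rayon_core::yield_now() {
            // We executed somebody else's job while waiting -- the same
            // interleaving that makes the "one one two two" output possible.
            Some(Yield::Executed) => {}
            // Nothing available locally (or we are not on a pool thread);
            // yield to the OS scheduler instead of spinning hot.
            Some(Yield::Idle) | None => std::thread::yield_now(),
        }
    }
}
```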
From 6152a3ff1658c4a30bdf8c138d0b6c5d87dbe29f Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 13 Dec 2023 08:27:27 -0800 Subject: [PATCH 14/27] Syntax fix in `ThreadPool::install` example (cherry picked from commit 9dc500fbdfaa43229da79ea6bec4e7a4ecf53240) --- rayon-core/src/thread_pool/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index 612ac2540..65af6d710 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -97,7 +97,7 @@ impl ThreadPool { /// print!("one "); /// pool.install(||{}); /// print!("two "); - /// } + /// }; /// rayon::join(|| do_it(), || do_it()); /// } /// ``` From 382c2bbcba2645e69f71b4ceece684d8879345ff Mon Sep 17 00:00:00 2001 From: bishopcheckmate Date: Sat, 6 Jan 2024 22:01:28 +0100 Subject: [PATCH 15/27] doc: be more clear about what the 'spawn' does (cherry picked from commit 2e071c23dccd15574b9df937fd9bf6ba6987937b) --- rayon-core/src/spawn/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs index c2a8b3b53..034df30dc 100644 --- a/rayon-core/src/spawn/mod.rs +++ b/rayon-core/src/spawn/mod.rs @@ -5,8 +5,8 @@ use crate::unwind; use std::mem; use std::sync::Arc; -/// Fires off a task into the Rayon threadpool in the "static" or -/// "global" scope. Just like a standard thread, this task is not +/// Puts the task into the Rayon threadpool's job queue in the "static" +/// or "global" scope. Just like a standard thread, this task is not /// tied to the current stack frame, and hence it cannot hold any /// references other than those with `'static` lifetime. If you want /// to spawn a task that references stack data, use [the `scope()` From 85ed287ad127272f5e07a4e0f3767b8299679eea Mon Sep 17 00:00:00 2001 From: acceptacross Date: Mon, 11 Mar 2024 17:46:13 +0800 Subject: [PATCH 16/27] chore: remove repetitive word Signed-off-by: acceptacross (cherry picked from commit 6f4bb8e451d7a208a68a2b5c3ca061e7a0900c56) --- rayon-core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index c0cb2927c..72064547e 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -232,7 +232,7 @@ type DeadlockHandler = dyn Fn() + Send + Sync; type StartHandler = dyn Fn(usize) + Send + Sync; /// The type for a closure that gets invoked when a thread exits. The -/// closure is passed the index of the thread on which is is invoked. +/// closure is passed the index of the thread on which it is invoked. /// Note that this same closure may be invoked multiple times in parallel. type ExitHandler = dyn Fn(usize) + Send + Sync; From 2a25b537d03311693ba5e51d24c68e7e640d5d3a Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 23 Mar 2024 21:00:13 -0700 Subject: [PATCH 17/27] Allow clippy::type_complexity (cherry picked from commit a14f4591478c1ac73fd0a07492870d6898f6552d) --- rayon-core/src/scope/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index f9fb8d559..364b322ba 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -56,6 +56,7 @@ struct ScopeBase<'scope> { /// all of which outlive `'scope`. They're not actually required to be /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because /// the closures are only *moved* across threads to be executed. 
+    #[allow(clippy::type_complexity)]
     marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
 
     /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.

From 5f1709b0b668bafb925564d470c9f4bba60a0b20 Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Sat, 23 Mar 2024 21:11:32 -0700
Subject: [PATCH 18/27] Use prelude `Ord::min`/`max` instead of from `cmp`

(cherry picked from commit f3dbb4283223ca006e4d501a9f4f995939310a50)
---
 rayon-core/src/scope/test.rs     | 3 +--
 rayon-core/src/sleep/counters.rs | 2 +-
 rayon-core/src/sleep/mod.rs      | 4 ++--
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/rayon-core/src/scope/test.rs b/rayon-core/src/scope/test.rs
index ad8c4af0b..bfec35382 100644
--- a/rayon-core/src/scope/test.rs
+++ b/rayon-core/src/scope/test.rs
@@ -3,7 +3,6 @@ use crate::ThreadPoolBuilder;
 use crate::{scope, scope_fifo, Scope, ScopeFifo};
 use rand::{Rng, SeedableRng};
 use rand_xorshift::XorShiftRng;
-use std::cmp;
 use std::iter::once;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Barrier, Mutex};
@@ -182,7 +181,7 @@ fn the_final_countdown<'scope>(
     let diff = if p > q { p - q } else { q - p };
 
     let mut data = max.lock().unwrap();
-    *data = cmp::max(diff, *data);
+    *data = Ord::max(diff, *data);
 
     if n > 0 {
         s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1));

diff --git a/rayon-core/src/sleep/counters.rs b/rayon-core/src/sleep/counters.rs
index 53d2c5527..96db5dca2 100644
--- a/rayon-core/src/sleep/counters.rs
+++ b/rayon-core/src/sleep/counters.rs
@@ -166,7 +166,7 @@ impl AtomicCounters {
         // Current heuristic: whenever an inactive thread goes away, if
         // there are any sleeping threads, wake 'em up.
         let sleeping_threads = old_value.sleeping_threads();
-        std::cmp::min(sleeping_threads, 2)
+        Ord::min(sleeping_threads, 2)
     }
 
     /// Subtracts a sleeping thread. This cannot fail, but it is only

diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs
index f2b432508..275805bb8 100644
--- a/rayon-core/src/sleep/mod.rs
+++ b/rayon-core/src/sleep/mod.rs
@@ -328,10 +328,10 @@ impl Sleep {
         // -- clearly the existing idle jobs aren't enough. Otherwise,
         // check to see if we have enough idle workers.
         if !queue_was_empty {
-            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
+            let num_to_wake = Ord::min(num_jobs, num_sleepers);
             self.wake_any_threads(num_to_wake);
         } else if num_awake_but_idle < num_jobs {
-            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
+            let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
             self.wake_any_threads(num_to_wake);
         }
     }

From a8bc45b4219dcb44b1d9e54f79de864b7ffbaa30 Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Sat, 23 Mar 2024 21:16:53 -0700
Subject: [PATCH 19/27] Fix unused_imports

(cherry picked from commit a4c8748e3e170083bf5e79210d7a9cd23c5a5524)
---
 rayon-core/src/join/test.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/rayon-core/src/join/test.rs b/rayon-core/src/join/test.rs
index b303dbc81..17084db87 100644
--- a/rayon-core/src/join/test.rs
+++ b/rayon-core/src/join/test.rs
@@ -1,7 +1,6 @@
 //! Tests for the join code.
-use crate::join::*; -use crate::unwind; +use super::*; use crate::ThreadPoolBuilder; use rand::distributions::Standard; use rand::{Rng, SeedableRng}; From abd7a95c204ba0c9e5cd7a255defd90b71792c60 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 23 Mar 2024 21:27:51 -0700 Subject: [PATCH 20/27] Fix clippy::blocks_in_conditions (cherry picked from commit e8c381ddb43146dccad03822d6b5415d91e594bc) --- rayon-core/src/scope/test.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/rayon-core/src/scope/test.rs b/rayon-core/src/scope/test.rs index bfec35382..db1e4227a 100644 --- a/rayon-core/src/scope/test.rs +++ b/rayon-core/src/scope/test.rs @@ -216,12 +216,13 @@ fn panic_propagate_nested_scope_spawn() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_1() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| panic!("Hello, world!")); // job A s.spawn(|_| x = true); // job B, should still execute even though A panics }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "job b failed to execute"), } @@ -231,12 +232,13 @@ fn panic_propagate_still_execute_1() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_2() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| x = true); // job B, should still execute even though A panics s.spawn(|_| panic!("Hello, world!")); // job A }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "job b failed to execute"), } @@ -246,12 +248,13 @@ fn panic_propagate_still_execute_2() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_3() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| x = true); // spawned job should still execute despite later panic panic!("Hello, world!"); }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "panic after spawn, spawn failed to execute"), } @@ -261,12 +264,13 @@ fn panic_propagate_still_execute_3() { #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_propagate_still_execute_4() { let mut x = false; - match unwind::halt_unwinding(|| { + let result = unwind::halt_unwinding(|| { scope(|s| { s.spawn(|_| panic!("Hello, world!")); x = true; }); - }) { + }); + match result { Ok(_) => panic!("failed to propagate panic"), Err(_) => assert!(x, "panic in spawn tainted scope"), } From f4533cdb497293bff5ea4b4d39de064e282c0baa Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 23 Mar 2024 21:38:04 -0700 Subject: [PATCH 21/27] Allow clippy::incompatible_msrv (cherry picked from commit 1900fae415ea0e7ceeedd9695696fb3ddc73e613) --- rayon-core/tests/stack_overflow_crash.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rayon-core/tests/stack_overflow_crash.rs b/rayon-core/tests/stack_overflow_crash.rs index 7dcde43c4..a64940692 100644 --- a/rayon-core/tests/stack_overflow_crash.rs +++ b/rayon-core/tests/stack_overflow_crash.rs @@ -8,6 +8,7 @@ use std::os::unix::process::ExitStatusExt; fn force_stack_overflow(depth: u32) { let mut buffer = [0u8; 1024 * 1024]; + #[allow(clippy::incompatible_msrv)] std::hint::black_box(&mut buffer); if depth > 0 { force_stack_overflow(depth - 1); From 32d700f66b036a5f51d82a6f1629cda8743ac8b7 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: 
Thu, 2 May 2024 13:51:04 -0700 Subject: [PATCH 22/27] Fix `clippy::legacy_numeric_constants` (cherry picked from commit b2f739340c4b04388d1492ec498236738e7c0e4e) --- rayon-core/src/latch.rs | 1 - rayon-core/src/registry.rs | 6 +----- rayon-core/src/sleep/counters.rs | 2 +- rayon-core/src/sleep/mod.rs | 1 - 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs index b0cbbd833..8903942a8 100644 --- a/rayon-core/src/latch.rs +++ b/rayon-core/src/latch.rs @@ -2,7 +2,6 @@ use std::marker::PhantomData; use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; -use std::usize; use crate::registry::{Registry, WorkerThread}; diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index a94d6776b..971ad63e7 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -18,7 +18,6 @@ use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Once}; use std::thread; -use std::usize; /// Thread builder used for customization via /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). @@ -599,10 +598,7 @@ impl Registry { pub(super) fn increment_terminate_count(&self) { let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); debug_assert!(previous != 0, "registry ref count incremented from zero"); - assert!( - previous != std::usize::MAX, - "overflow in registry ref count" - ); + assert!(previous != usize::MAX, "overflow in registry ref count"); } /// Signals that the thread-pool which owns this registry has been diff --git a/rayon-core/src/sleep/counters.rs b/rayon-core/src/sleep/counters.rs index 96db5dca2..05941becd 100644 --- a/rayon-core/src/sleep/counters.rs +++ b/rayon-core/src/sleep/counters.rs @@ -27,7 +27,7 @@ pub(super) struct Counters { pub(super) struct JobsEventCounter(usize); impl JobsEventCounter { - pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX); + pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(usize::MAX); #[inline] pub(super) fn as_usize(self) -> usize { diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 275805bb8..7d88ece21 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -8,7 +8,6 @@ use crossbeam_utils::CachePadded; use std::sync::atomic::Ordering; use std::sync::{Condvar, Mutex}; use std::thread; -use std::usize; mod counters; pub(crate) use self::counters::THREADS_MAX; From ec4c76e12b3f58926bb4f30593facb06b625df84 Mon Sep 17 00:00:00 2001 From: leopardracer <136604165+leopardracer@users.noreply.github.com> Date: Mon, 2 Dec 2024 20:59:05 +0200 Subject: [PATCH 23/27] Update README.md (cherry picked from commit 96ea97ecd5f4aabb53d855f66c34093e71a34f3e) --- rayon-core/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rayon-core/README.md b/rayon-core/README.md index 43a1ba3cb..5b8714f5d 100644 --- a/rayon-core/README.md +++ b/rayon-core/README.md @@ -2,7 +2,7 @@ Note: This is an unstable fork made for use in rustc Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool. -Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirror in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join. 
+Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirrored in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join.
 
 rayon-core aims to never, or almost never, have a breaking change to its API, because each revision of rayon-core also houses the global thread-pool (and hence if you have two simultaneous versions of rayon-core, you have two thread-pools).

From 6f13930107c86720406b0279211c028806ad106f Mon Sep 17 00:00:00 2001
From: Bilog WEB3 <155262265+Bilogweb3@users.noreply.github.com>
Date: Tue, 10 Dec 2024 22:18:18 +0100
Subject: [PATCH 24/27] Update README.md

(cherry picked from commit 95eed8b3062b3b81a893a9514693b90c3bb2c99c)
---
 rayon-core/src/sleep/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md
index 6f27df51d..e79efd15c 100644
--- a/rayon-core/src/sleep/README.md
+++ b/rayon-core/src/sleep/README.md
@@ -139,7 +139,7 @@ The full protocol for a thread to fall asleep is as follows:
   * Checks the JEC to see that it has not changed from `final_value`.
     * If it has, then the thread goes back to searching for work. We reset to
       just before we got sleepy, so that we will do one more search
-      before attending to sleep again (rather than searching for many rounds).
+      before attempting to sleep again (rather than searching for many rounds).
   * Increments the number of sleeping threads by 1.
 * The thread then executes a seq-cst fence operation (see below).
 * The thread then does one final check for injected jobs (see below). If any

From 03fb00f7a9e047da48fe020a18e2022a56bab63c Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Wed, 9 Apr 2025 15:45:41 -0700
Subject: [PATCH 25/27] Fix `static_mut_refs` on the global registry

(cherry picked from commit fade88f5e4f7829d0f37469e571ca653ce1369df)
---
 rayon-core/src/registry.rs | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 971ad63e7..781b6827b 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -166,7 +166,13 @@ static THE_REGISTRY_SET: Once = Once::new();
 /// configuration.
 pub(super) fn global_registry() -> &'static Arc<Registry> {
     set_global_registry(default_global_registry)
-        .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
+        .or_else(|err| {
+            // SAFETY: we only create a shared reference to `THE_REGISTRY` after the `call_once`
+            // that initializes it, and there will be no more mutable accesses at all.
+            debug_assert!(THE_REGISTRY_SET.is_completed());
+            let the_registry = unsafe { &*ptr::addr_of!(THE_REGISTRY) };
+            the_registry.as_ref().ok_or(err)
+        })
         .expect("The global thread pool has not been initialized.")
 }
 
@@ -192,8 +198,14 @@ where
     ));
 
     THE_REGISTRY_SET.call_once(|| {
-        result = registry()
-            .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
+        result = registry().map(|registry: Arc<Registry>| {
+            // SAFETY: this is the only mutable access to `THE_REGISTRY`, thanks to `Once`, and
+            // `global_registry()` only takes a shared reference **after** this `call_once`.
+            unsafe {
+                ptr::addr_of_mut!(THE_REGISTRY).write(Some(registry));
+                (*ptr::addr_of!(THE_REGISTRY)).as_ref().unwrap_unchecked()
+            }
+        })
     });
 
     result

From e546baafb0b0eac93198d6c7b068e82e70d5cd58 Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Wed, 9 Apr 2025 16:41:38 -0700
Subject: [PATCH 26/27] Upgrade dev-deps to rand 0.9

(cherry picked from commit 2ca893b9914afc1a8d021457b8bc5eed3fae28e0)
---
 rayon-core/Cargo.toml        | 4 ++--
 rayon-core/src/join/test.rs  | 6 +++---
 rayon-core/src/scope/test.rs | 4 ++--
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index 917714b32..4539729af 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -22,8 +22,8 @@ crossbeam-deque = "0.8.1"
 crossbeam-utils = "0.8.0"
 
 [dev-dependencies]
-rand = "0.8"
-rand_xorshift = "0.3"
+rand = "0.9"
+rand_xorshift = "0.4"
 scoped-tls = "1.0"
 
 [target.'cfg(unix)'.dev-dependencies]

diff --git a/rayon-core/src/join/test.rs b/rayon-core/src/join/test.rs
index 17084db87..03f4ab447 100644
--- a/rayon-core/src/join/test.rs
+++ b/rayon-core/src/join/test.rs
@@ -2,7 +2,7 @@
 
 use super::*;
 use crate::ThreadPoolBuilder;
-use rand::distributions::Standard;
+use rand::distr::StandardUniform;
 use rand::{Rng, SeedableRng};
 use rand_xorshift::XorShiftRng;
 
@@ -38,7 +38,7 @@ fn seeded_rng() -> XorShiftRng {
 #[test]
 fn sort() {
     let rng = seeded_rng();
-    let mut data: Vec<u32> = rng.sample_iter(&Standard).take(6 * 1024).collect();
+    let mut data: Vec<u32> = rng.sample_iter(&StandardUniform).take(6 * 1024).collect();
     let mut sorted_data = data.clone();
     sorted_data.sort();
     quick_sort(&mut data);
@@ -49,7 +49,7 @@ fn sort() {
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn sort_in_pool() {
     let rng = seeded_rng();
-    let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
+    let mut data: Vec<u32> = rng.sample_iter(&StandardUniform).take(12 * 1024).collect();
 
     let pool = ThreadPoolBuilder::new().build().unwrap();
     let mut sorted_data = data.clone();

diff --git a/rayon-core/src/scope/test.rs b/rayon-core/src/scope/test.rs
index db1e4227a..4505ba7c4 100644
--- a/rayon-core/src/scope/test.rs
+++ b/rayon-core/src/scope/test.rs
@@ -119,13 +119,13 @@ fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree {
     let children = if depth == 0 {
         vec![]
     } else {
-        (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level
+        (0..rng.random_range(0..4)) // somewhere between 0 and 3 children at each level
             .map(|_| random_tree1(depth - 1, rng))
             .collect()
     };
 
     Tree {
-        value: rng.gen_range(0..1_000_000),
+        value: rng.random_range(0..1_000_000),
         children,
     }
 }

From 92cc51d0b24cb1310a374aa910da0c84ca3cd5af Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Wed, 16 Apr 2025 12:20:30 -0700
Subject: [PATCH 27/27] Release rustc-rayon-core 0.5.1

---
 rayon-core/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index 4539729af..f7a3d1306 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "rustc-rayon-core"
-version = "0.5.0"
+version = "0.5.1"
 authors = ["Niko Matsakis ", "Josh Stone "]
 description = "Core APIs for Rayon - fork for rustc"
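[Note on PATCH 26] The renamed rand 0.9 APIs exercised by these test updates can be tried in a standalone snippet. This is an illustrative sketch: the seed and sample counts are arbitrary, and it is not part of the patch series.

```rust
use rand::distr::StandardUniform; // rand 0.8 called this `Standard`
use rand::{Rng, SeedableRng};
use rand_xorshift::XorShiftRng;

fn main() {
    let mut rng = XorShiftRng::seed_from_u64(0x5eed);
    // `sample_iter` takes the RNG by value, so sample through a `&mut`
    // borrow to keep using `rng` afterwards.
    let data: Vec<u32> = (&mut rng).sample_iter(&StandardUniform).take(8).collect();
    // `random_range` is the rand 0.9 spelling of rand 0.8's `gen_range`.
    let pick = rng.random_range(0..4);
    println!("{data:?} {pick}");
}
```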