Skip to content

Commit 322dfe8

Browse files
bors[bot] and cuviper authored
1026: Add `yield_now` and `yield_local` r=cuviper a=cuviper * `yield_now` looks for work anywhere in the thread pool and executes it. * `yield_local` only looks in the thread-local deque (including broadcasts). In either case, they return: * `Some(Yield::Executed)` if work was found and executed. * `Some(Yield::Idle)` if no work was found in their pool/thread. * `None` if the current thread is not part of a thread pool. These do not call `std::thread::yield_now()`, but the docs suggest polling loops might want to add that as a fallback. Co-authored-by: Josh Stone <cuviper@gmail.com>
2 parents 099241d + 874ff73 commit 322dfe8

File tree

5 files changed

+144
-13
lines changed

5 files changed

+144
-13
lines changed

rayon-core/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
//! there is no other thread to share the work. However, since the pool is not running independent
3939
//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
4040
//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41-
//! anything like thread preemption or `async` task switching.
41+
//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
42+
//! can also volunteer execution time.
4243
//!
4344
//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
4445
//!
@@ -101,6 +102,7 @@ pub use self::spawn::{spawn, spawn_fifo};
101102
pub use self::thread_pool::current_thread_has_pending_tasks;
102103
pub use self::thread_pool::current_thread_index;
103104
pub use self::thread_pool::ThreadPool;
105+
pub use self::thread_pool::{yield_local, yield_now, Yield};
104106

105107
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
106108

rayon-core/src/registry.rs

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::sleep::Sleep;
66
use crate::unwind;
77
use crate::{
88
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
9+
Yield,
910
};
1011
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
1112
use std::cell::Cell;
@@ -772,7 +773,7 @@ impl WorkerThread {
772773
/// for breadth-first execution, it would mean dequeuing from the
773774
/// bottom.
774775
#[inline]
775-
pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
776+
pub(super) fn take_local_job(&self) -> Option<JobRef> {
776777
let popped_job = self.worker.pop();
777778

778779
if popped_job.is_some() {
@@ -814,16 +815,7 @@ impl WorkerThread {
814815

815816
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
816817
while !latch.probe() {
817-
// Try to find some work to do. We give preference first
818-
// to things in our local deque, then in other workers
819-
// deques, and finally to injected jobs from the
820-
// outside. The idea is to finish what we started before
821-
// we take on something new.
822-
if let Some(job) = self
823-
.take_local_job()
824-
.or_else(|| self.steal())
825-
.or_else(|| self.registry.pop_injected_job(self.index))
826-
{
818+
if let Some(job) = self.find_work() {
827819
self.registry.sleep.work_found(idle_state);
828820
self.execute(job);
829821
idle_state = self.registry.sleep.start_looking(self.index, latch);
@@ -846,6 +838,37 @@ impl WorkerThread {
846838
mem::forget(abort_guard); // successful execution, do not abort
847839
}
848840

841+
fn find_work(&self) -> Option<JobRef> {
842+
// Try to find some work to do. We give preference first
843+
// to things in our local deque, then in other workers
844+
// deques, and finally to injected jobs from the
845+
// outside. The idea is to finish what we started before
846+
// we take on something new.
847+
self.take_local_job()
848+
.or_else(|| self.steal())
849+
.or_else(|| self.registry.pop_injected_job(self.index))
850+
}
851+
852+
pub(super) fn yield_now(&self) -> Yield {
853+
match self.find_work() {
854+
Some(job) => unsafe {
855+
self.execute(job);
856+
Yield::Executed
857+
},
858+
None => Yield::Idle,
859+
}
860+
}
861+
862+
pub(super) fn yield_local(&self) -> Yield {
863+
match self.take_local_job() {
864+
Some(job) => unsafe {
865+
self.execute(job);
866+
Yield::Executed
867+
},
868+
None => Yield::Idle,
869+
}
870+
}
871+
849872
#[inline]
850873
pub(super) unsafe fn execute(&self, job: JobRef) {
851874
job.execute();
@@ -855,7 +878,7 @@ impl WorkerThread {
855878
///
856879
/// This should only be done as a last resort, when there is no
857880
/// local work to do.
858-
unsafe fn steal(&self) -> Option<JobRef> {
881+
fn steal(&self) -> Option<JobRef> {
859882
// we only steal when we don't have any work to do locally
860883
debug_assert!(self.local_deque_is_empty());
861884

rayon-core/src/thread_pool/mod.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,30 @@ impl ThreadPool {
339339
// We assert that `self.registry` has not terminated.
340340
unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
341341
}
342+
343+
/// Cooperatively yields execution to Rayon.
344+
///
345+
/// This is similar to the general [`yield_now()`], but only if the current
346+
/// thread is part of *this* thread pool.
347+
///
348+
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
349+
/// nothing was available, or `None` if the current thread is not part of this pool.
350+
pub fn yield_now(&self) -> Option<Yield> {
351+
let curr = self.registry.current_thread()?;
352+
Some(curr.yield_now())
353+
}
354+
355+
/// Cooperatively yields execution to local Rayon work.
356+
///
357+
/// This is similar to the general [`yield_local()`], but only if the current
358+
/// thread is part of *this* thread pool.
359+
///
360+
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
361+
/// nothing was available, or `None` if the current thread is not part this pool.
362+
pub fn yield_local(&self) -> Option<Yield> {
363+
let curr = self.registry.current_thread()?;
364+
Some(curr.yield_local())
365+
}
342366
}
343367

344368
impl Drop for ThreadPool {
@@ -400,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
400424
Some(!curr.local_deque_is_empty())
401425
}
402426
}
427+
428+
/// Cooperatively yields execution to Rayon.
429+
///
430+
/// If the current thread is part of a rayon thread pool, this looks for a
431+
/// single unit of pending work in the pool, then executes it. Completion of
432+
/// that work might include nested work or further work stealing.
433+
///
434+
/// This is similar to [`std::thread::yield_now()`], but does not literally make
435+
/// that call. If you are implementing a polling loop, you may want to also
436+
/// yield to the OS scheduler yourself if no Rayon work was found.
437+
///
438+
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
439+
/// nothing was available, or `None` if this thread is not part of any pool at all.
440+
pub fn yield_now() -> Option<Yield> {
441+
unsafe {
442+
let thread = WorkerThread::current().as_ref()?;
443+
Some(thread.yield_now())
444+
}
445+
}
446+
447+
/// Cooperatively yields execution to local Rayon work.
448+
///
449+
/// If the current thread is part of a rayon thread pool, this looks for a
450+
/// single unit of pending work in this thread's queue, then executes it.
451+
/// Completion of that work might include nested work or further work stealing.
452+
///
453+
/// This is similar to [`yield_now()`], but does not steal from other threads.
454+
///
455+
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
456+
/// nothing was available, or `None` if this thread is not part of any pool at all.
457+
pub fn yield_local() -> Option<Yield> {
458+
unsafe {
459+
let thread = WorkerThread::current().as_ref()?;
460+
Some(thread.yield_local())
461+
}
462+
}
463+
464+
/// Result of [`yield_now()`] or [`yield_local()`].
465+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
466+
pub enum Yield {
467+
/// Work was found and executed.
468+
Executed,
469+
/// No available work was found.
470+
Idle,
471+
}

rayon-core/src/thread_pool/test.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
380380
rx_ref.recv().unwrap();
381381
});
382382
}
383+
384+
#[test]
385+
fn yield_now_to_spawn() {
386+
let (tx, rx) = crossbeam_channel::bounded(1);
387+
388+
// Queue a regular spawn.
389+
crate::spawn(move || tx.send(22).unwrap());
390+
391+
// The single-threaded fallback mode (for wasm etc.) won't
392+
// get a chance to run the spawn if we never yield to it.
393+
crate::registry::in_worker(move |_, _| {
394+
crate::yield_now();
395+
});
396+
397+
// The spawn **must** have started by now, but we still might have to wait
398+
// for it to finish if a different thread stole it first.
399+
assert_eq!(22, rx.recv().unwrap());
400+
}
401+
402+
#[test]
403+
fn yield_local_to_spawn() {
404+
let (tx, rx) = crossbeam_channel::bounded(1);
405+
406+
// Queue a regular spawn.
407+
crate::spawn(move || tx.send(22).unwrap());
408+
409+
// The single-threaded fallback mode (for wasm etc.) won't
410+
// get a chance to run the spawn if we never yield to it.
411+
crate::registry::in_worker(move |_, _| {
412+
crate::yield_local();
413+
});
414+
415+
// The spawn **must** have started by now, but we still might have to wait
416+
// for it to finish if a different thread stole it first.
417+
assert_eq!(22, rx.recv().unwrap());
418+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ pub use rayon_core::{in_place_scope, scope, Scope};
124124
pub use rayon_core::{in_place_scope_fifo, scope_fifo, ScopeFifo};
125125
pub use rayon_core::{join, join_context};
126126
pub use rayon_core::{spawn, spawn_fifo};
127+
pub use rayon_core::{yield_local, yield_now, Yield};
127128

128129
/// We need to transmit raw pointers across threads. It is possible to do this
129130
/// without any unsafe code by converting pointers to usize or to AtomicPtr<T>

0 commit comments

Comments
 (0)