Skip to content

Commit 9fa93e0

Browse files
committed
Add yield_now and yield_local
1 parent 1341ce3 commit 9fa93e0

File tree

5 files changed

+132
-13
lines changed

5 files changed

+132
-13
lines changed

rayon-core/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
//! there is no other thread to share the work. However, since the pool is not running independent
3939
//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
4040
//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41-
//! anything like thread preemption or `async` task switching.
41+
//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
42+
//! can also volunteer execution time.
4243
//!
4344
//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
4445
//!
@@ -101,6 +102,7 @@ pub use self::spawn::{spawn, spawn_fifo};
101102
pub use self::thread_pool::current_thread_has_pending_tasks;
102103
pub use self::thread_pool::current_thread_index;
103104
pub use self::thread_pool::ThreadPool;
105+
pub use self::thread_pool::{yield_local, yield_now};
104106

105107
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
106108

rayon-core/src/registry.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ impl WorkerThread {
772772
/// for breadth-first execution, it would mean dequeuing from the
773773
/// bottom.
774774
#[inline]
775-
pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
775+
pub(super) fn take_local_job(&self) -> Option<JobRef> {
776776
let popped_job = self.worker.pop();
777777

778778
if popped_job.is_some() {
@@ -814,16 +814,7 @@ impl WorkerThread {
814814

815815
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
816816
while !latch.probe() {
817-
// Try to find some work to do. We give preference first
818-
// to things in our local deque, then in other workers
819-
// deques, and finally to injected jobs from the
820-
// outside. The idea is to finish what we started before
821-
// we take on something new.
822-
if let Some(job) = self
823-
.take_local_job()
824-
.or_else(|| self.steal())
825-
.or_else(|| self.registry.pop_injected_job(self.index))
826-
{
817+
if let Some(job) = self.find_work() {
827818
self.registry.sleep.work_found(idle_state);
828819
self.execute(job);
829820
idle_state = self.registry.sleep.start_looking(self.index, latch);
@@ -846,6 +837,37 @@ impl WorkerThread {
846837
mem::forget(abort_guard); // successful execution, do not abort
847838
}
848839

840+
fn find_work(&self) -> Option<JobRef> {
841+
// Try to find some work to do. We give preference first
842+
// to things in our local deque, then in other workers
843+
// deques, and finally to injected jobs from the
844+
// outside. The idea is to finish what we started before
845+
// we take on something new.
846+
self.take_local_job()
847+
.or_else(|| self.steal())
848+
.or_else(|| self.registry.pop_injected_job(self.index))
849+
}
850+
851+
pub(super) fn yield_now(&self) -> bool {
852+
match self.find_work() {
853+
Some(job) => unsafe {
854+
self.execute(job);
855+
true
856+
},
857+
None => false,
858+
}
859+
}
860+
861+
pub(super) fn yield_local(&self) -> bool {
862+
match self.take_local_job() {
863+
Some(job) => unsafe {
864+
self.execute(job);
865+
true
866+
},
867+
None => false,
868+
}
869+
}
870+
849871
#[inline]
850872
pub(super) unsafe fn execute(&self, job: JobRef) {
851873
job.execute();
@@ -855,7 +877,7 @@ impl WorkerThread {
855877
///
856878
/// This should only be done as a last resort, when there is no
857879
/// local work to do.
858-
unsafe fn steal(&self) -> Option<JobRef> {
880+
fn steal(&self) -> Option<JobRef> {
859881
// we only steal when we don't have any work to do locally
860882
debug_assert!(self.local_deque_is_empty());
861883

rayon-core/src/thread_pool/mod.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,28 @@ impl ThreadPool {
339339
// We assert that `self.registry` has not terminated.
340340
unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
341341
}
342+
343+
/// Cooperatively yields execution to Rayon.
344+
///
345+
/// This is similar to the general [`yield_now()`], but only if the current
346+
/// thread is part of *this* thread pool. Returns `Some(true)` if anything
347+
/// was executed, `Some(false)` if nothing was available, or `None` if the
348+
/// current thread is not part of this pool.
349+
pub fn yield_now(&self) -> Option<bool> {
350+
let curr = self.registry.current_thread()?;
351+
Some(curr.yield_now())
352+
}
353+
354+
/// Cooperatively yields execution to local Rayon work.
355+
///
356+
/// This is similar to the general [`yield_local()`], but only if the current
357+
/// thread is part of *this* thread pool. Returns `Some(true)` if anything
358+
/// was executed, `Some(false)` if nothing was available, or `None` if the
359+
/// current thread is not part of this pool.
360+
pub fn yield_local(&self) -> Option<bool> {
361+
let curr = self.registry.current_thread()?;
362+
Some(curr.yield_local())
363+
}
342364
}
343365

344366
impl Drop for ThreadPool {
@@ -400,3 +422,39 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
400422
Some(!curr.local_deque_is_empty())
401423
}
402424
}
425+
426+
/// Cooperatively yields execution to Rayon.
427+
///
428+
/// If the current thread is part of a rayon thread pool, this looks for a
429+
/// single unit of pending work in the pool, then executes it. Completion of
430+
/// that work might include nested work or further work stealing.
431+
///
432+
/// This is similar to [`std::thread::yield_now()`], but does not literally make
433+
/// that call. If you are implementing a polling loop, you may want to also
434+
/// yield to the OS scheduler yourself if no Rayon work was found.
435+
///
436+
/// Returns `Some(true)` if anything was executed, `Some(false)` if nothing was
437+
/// available, or `None` if this thread is not part of any pool at all.
438+
pub fn yield_now() -> Option<bool> {
439+
unsafe {
440+
let thread = WorkerThread::current().as_ref()?;
441+
Some(thread.yield_now())
442+
}
443+
}
444+
445+
/// Cooperatively yields execution to local Rayon work.
446+
///
447+
/// If the current thread is part of a rayon thread pool, this looks for a
448+
/// single unit of pending work in this thread's queue, then executes it.
449+
/// Completion of that work might include nested work or further work stealing.
450+
///
451+
/// This is similar to [`yield_now()`], but does not steal from other threads.
452+
///
453+
/// Returns `Some(true)` if anything was executed, `Some(false)` if nothing was
454+
/// available, or `None` if this thread is not part of any pool at all.
455+
pub fn yield_local() -> Option<bool> {
456+
unsafe {
457+
let thread = WorkerThread::current().as_ref()?;
458+
Some(thread.yield_local())
459+
}
460+
}

rayon-core/src/thread_pool/test.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
380380
rx_ref.recv().unwrap();
381381
});
382382
}
383+
384+
#[test]
385+
fn yield_now_to_spawn() {
386+
let (tx, rx) = crossbeam_channel::bounded(1);
387+
388+
// Queue a regular spawn.
389+
crate::spawn(move || tx.send(22).unwrap());
390+
391+
// The single-threaded fallback mode (for wasm etc.) won't
392+
// get a chance to run the spawn if we never yield to it.
393+
crate::registry::in_worker(move |_, _| {
394+
crate::yield_now();
395+
});
396+
397+
// The spawn **must** have started by now, but we still might have to wait
398+
// for it to finish if a different thread stole it first.
399+
assert_eq!(22, rx.recv().unwrap());
400+
}
401+
402+
#[test]
403+
fn yield_local_to_spawn() {
404+
let (tx, rx) = crossbeam_channel::bounded(1);
405+
406+
// Queue a regular spawn.
407+
crate::spawn(move || tx.send(22).unwrap());
408+
409+
// The single-threaded fallback mode (for wasm etc.) won't
410+
// get a chance to run the spawn if we never yield to it.
411+
crate::registry::in_worker(move |_, _| {
412+
crate::yield_local();
413+
});
414+
415+
// The spawn **must** have started by now, but we still might have to wait
416+
// for it to finish if a different thread stole it first.
417+
assert_eq!(22, rx.recv().unwrap());
418+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ pub use rayon_core::{in_place_scope, scope, Scope};
124124
pub use rayon_core::{in_place_scope_fifo, scope_fifo, ScopeFifo};
125125
pub use rayon_core::{join, join_context};
126126
pub use rayon_core::{spawn, spawn_fifo};
127+
pub use rayon_core::{yield_local, yield_now};
127128

128129
/// We need to transmit raw pointers across threads. It is possible to do this
129130
/// without any unsafe code by converting pointers to usize or to AtomicPtr<T>

0 commit comments

Comments
 (0)