Skip to content

Commit be1b824

Browse files
committed
Add callbacks for when threads start and stop doing work
1 parent 27911f7 commit be1b824

File tree

4 files changed

+102
-14
lines changed

4 files changed

+102
-14
lines changed

rayon-core/src/lib.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
144144
/// Closure invoked to spawn threads.
145145
spawn_handler: S,
146146

147+
/// Closure invoked when starting computations in a thread.
148+
acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
149+
150+
/// Closure invoked when blocking in a thread.
151+
release_thread_handler: Option<Box<ReleaseThreadHandler>>,
152+
147153
/// If false, worker threads will execute spawned jobs in a
148154
/// "depth-first" fashion. If true, they will do a "breadth-first"
149155
/// fashion. Depth-first is the default.
@@ -186,12 +192,22 @@ impl Default for ThreadPoolBuilder {
186192
start_handler: None,
187193
exit_handler: None,
188194
deadlock_handler: None,
195+
acquire_thread_handler: None,
196+
release_thread_handler: None,
189197
spawn_handler: DefaultSpawn,
190198
breadth_first: false,
191199
}
192200
}
193201
}
194202

203+
/// The type for a closure that gets invoked before starting computations in a thread.
204+
/// Note that this same closure may be invoked multiple times in parallel.
205+
type AcquireThreadHandler = dyn Fn() + Send + Sync;
206+
207+
/// The type for a closure that gets invoked before blocking in a thread.
208+
/// Note that this same closure may be invoked multiple times in parallel.
209+
type ReleaseThreadHandler = dyn Fn() + Send + Sync;
210+
195211
impl ThreadPoolBuilder {
196212
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
197213
pub fn new() -> Self {
@@ -292,7 +308,12 @@ impl ThreadPoolBuilder {
292308
Ok(())
293309
})
294310
.build()?;
295-
Ok(with_pool(&pool))
311+
let result = unwind::halt_unwinding(|| with_pool(&pool));
312+
pool.wait_until_stopped();
313+
match result {
314+
Ok(result) => Ok(result),
315+
Err(err) => unwind::resume_unwinding(err),
316+
}
296317
});
297318

298319
match result {
@@ -371,6 +392,8 @@ impl<S> ThreadPoolBuilder<S> {
371392
start_handler: self.start_handler,
372393
exit_handler: self.exit_handler,
373394
deadlock_handler: self.deadlock_handler,
395+
acquire_thread_handler: self.acquire_thread_handler,
396+
release_thread_handler: self.release_thread_handler,
374397
breadth_first: self.breadth_first,
375398
}
376399
}
@@ -529,6 +552,34 @@ impl<S> ThreadPoolBuilder<S> {
529552
self.breadth_first
530553
}
531554

555+
/// Takes the current acquire thread callback, leaving `None`.
556+
fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
557+
self.acquire_thread_handler.take()
558+
}
559+
560+
/// Set a callback to be invoked when starting computations in a thread.
561+
pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
562+
where
563+
H: Fn() + Send + Sync + 'static,
564+
{
565+
self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
566+
self
567+
}
568+
569+
/// Takes the current release thread callback, leaving `None`.
570+
fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
571+
self.release_thread_handler.take()
572+
}
573+
574+
/// Set a callback to be invoked when blocking in thread.
575+
pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
576+
where
577+
H: Fn() + Send + Sync + 'static,
578+
{
579+
self.release_thread_handler = Some(Box::new(release_thread_handler));
580+
self
581+
}
582+
532583
/// Takes the current deadlock callback, leaving `None`.
533584
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
534585
self.deadlock_handler.take()
@@ -699,6 +750,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
699750
ref deadlock_handler,
700751
ref start_handler,
701752
ref exit_handler,
753+
ref acquire_thread_handler,
754+
ref release_thread_handler,
702755
spawn_handler: _,
703756
ref breadth_first,
704757
} = *self;
@@ -716,6 +769,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
716769
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
717770
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
718771
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
772+
let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
773+
let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
719774

720775
f.debug_struct("ThreadPoolBuilder")
721776
.field("num_threads", num_threads)
@@ -725,6 +780,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
725780
.field("deadlock_handler", &deadlock_handler)
726781
.field("start_handler", &start_handler)
727782
.field("exit_handler", &exit_handler)
783+
.field("acquire_thread_handler", &acquire_thread_handler)
784+
.field("release_thread_handler", &release_thread_handler)
728785
.field("breadth_first", &breadth_first)
729786
.finish()
730787
}

rayon-core/src/registry.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use crate::sleep::Sleep;
55
use crate::unwind;
66
use crate::util::leak;
77
use crate::{
8-
DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
9-
ThreadPoolBuilder,
8+
AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
9+
ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
1010
};
1111
use crossbeam_deque::{Steal, Stealer, Worker};
1212
use crossbeam_queue::SegQueue;
@@ -137,9 +137,11 @@ pub struct Registry {
137137
sleep: Sleep,
138138
injected_jobs: SegQueue<JobRef>,
139139
panic_handler: Option<Box<PanicHandler>>,
140-
deadlock_handler: Option<Box<DeadlockHandler>>,
140+
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
141141
start_handler: Option<Box<StartHandler>>,
142142
exit_handler: Option<Box<ExitHandler>>,
143+
pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
144+
pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
143145

144146
// When this latch reaches 0, it means that all work on this
145147
// registry must be complete. This is ensured in the following ways:
@@ -244,6 +246,8 @@ impl Registry {
244246
deadlock_handler: builder.take_deadlock_handler(),
245247
start_handler: builder.take_start_handler(),
246248
exit_handler: builder.take_exit_handler(),
249+
acquire_thread_handler: builder.take_acquire_thread_handler(),
250+
release_thread_handler: builder.take_release_thread_handler(),
247251
});
248252

249253
// If we return early or panic, make sure to terminate existing threads.
@@ -346,11 +350,24 @@ impl Registry {
346350

347351
/// Waits for the worker threads to stop. This is used for testing
348352
/// -- so we can check that termination actually works.
349-
#[cfg(test)]
350353
pub(super) fn wait_until_stopped(&self) {
354+
self.release_thread();
351355
for info in &self.thread_infos {
352356
info.stopped.wait();
353357
}
358+
self.acquire_thread();
359+
}
360+
361+
pub(crate) fn acquire_thread(&self) {
362+
if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
363+
acquire_thread_handler();
364+
}
365+
}
366+
367+
pub(crate) fn release_thread(&self) {
368+
if let Some(ref release_thread_handler) = self.release_thread_handler {
369+
release_thread_handler();
370+
}
354371
}
355372

356373
/// ////////////////////////////////////////////////////////////////////////
@@ -453,7 +470,9 @@ impl Registry {
453470
l,
454471
);
455472
self.inject(&[job.as_job_ref()]);
473+
self.release_thread();
456474
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
475+
self.acquire_thread();
457476
job.into_result()
458477
})
459478
}
@@ -687,11 +706,10 @@ impl WorkerThread {
687706
yields = self.registry.sleep.work_found(self.index, yields);
688707
self.execute(job);
689708
} else {
690-
yields = self.registry.sleep.no_work_found(
691-
self.index,
692-
yields,
693-
&self.registry.deadlock_handler,
694-
);
709+
yields = self
710+
.registry
711+
.sleep
712+
.no_work_found(self.index, yields, &self.registry);
695713
}
696714
}
697715

@@ -783,6 +801,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
783801
}
784802
}
785803

804+
registry.acquire_thread();
786805
worker_thread.wait_until(&registry.terminate_latch);
787806

788807
// Should not be any work left in our queue.
@@ -805,6 +824,8 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
805824
}
806825
// We're already exiting the thread, there's nothing else to do.
807826
}
827+
828+
registry.release_thread();
808829
}
809830

810831
/// If already in a worker-thread, just execute `op`. Otherwise,

rayon-core/src/sleep/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! for an overview.
33
44
use crate::log::Event::*;
5+
use crate::registry::Registry;
56
use crate::DeadlockHandler;
67
use std::sync::atomic::{AtomicUsize, Ordering};
78
use std::sync::{Condvar, Mutex};
@@ -117,7 +118,7 @@ impl Sleep {
117118
&self,
118119
worker_index: usize,
119120
yields: usize,
120-
deadlock_handler: &Option<Box<DeadlockHandler>>,
121+
registry: &Registry,
121122
) -> usize {
122123
log!(DidNotFindWork {
123124
worker: worker_index,
@@ -145,7 +146,7 @@ impl Sleep {
145146
}
146147
} else {
147148
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
148-
self.sleep(worker_index, deadlock_handler);
149+
self.sleep(worker_index, registry);
149150
0
150151
}
151152
}
@@ -248,7 +249,7 @@ impl Sleep {
248249
self.worker_is_sleepy(state, worker_index)
249250
}
250251

251-
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
252+
fn sleep(&self, worker_index: usize, registry: &Registry) {
252253
loop {
253254
// Acquire here suffices. If we observe that the current worker is still
254255
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -327,12 +328,15 @@ impl Sleep {
327328

328329
// Decrement the number of active threads and check for a deadlock
329330
data.active_threads -= 1;
330-
data.deadlock_check(deadlock_handler);
331+
data.deadlock_check(&registry.deadlock_handler);
332+
333+
registry.release_thread();
331334

332335
let _ = self.tickle.wait(data).unwrap();
333336
log!(GotAwoken {
334337
worker: worker_index
335338
});
339+
registry.acquire_thread();
336340
return;
337341
}
338342
} else {

rayon-core/src/thread_pool/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,12 @@ impl ThreadPool {
252252
// We assert that `self.registry` has not terminated.
253253
unsafe { spawn::spawn_fifo_in(op, &self.registry) }
254254
}
255+
256+
pub(crate) fn wait_until_stopped(self) {
257+
let registry = self.registry.clone();
258+
drop(self);
259+
registry.wait_until_stopped();
260+
}
255261
}
256262

257263
impl Drop for ThreadPool {

0 commit comments

Comments
 (0)