Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions fork_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::collections::TryReserveError;
use std::fmt;
use std::io::Error as IoError;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};

/// Pads the wrapped value to 128 bytes to avoid false sharing.
Expand Down Expand Up @@ -70,21 +70,24 @@ impl std::error::Error for Error {
}
}

type Trampoline = unsafe fn(*const (), usize);
type Trampoline = unsafe fn(*const (), usize, &mut bool);

/// Dummy trampoline function as opposed to the real `worker_loop`.
unsafe fn dummy_trampoline(_ctx: *const (), _index: usize) {
unsafe fn dummy_trampoline(_ctx: *const (), _index: usize, _stop: &mut bool) {
    // Placeholder stored in `Inner` before any fork has been submitted; the
    // pool never dispatches it, so reaching this line is a logic error.
    unreachable!("dummy_trampoline should not be called")
}

/// Trampoline dispatched as the final "work item": it raises the worker's
/// caller-owned `stop` flag so the worker loop exits after this fork.
/// The context pointer and thread index are deliberately ignored.
unsafe fn stop_trampoline(_ctx: *const (), _index: usize, stop: &mut bool) {
    // No pointer is dereferenced here; only the exclusive flag is flipped.
    let flag = stop;
    *flag = true;
}

/// The shared state of the thread pool, used by all threads.
/// It intentionally pads all independently mutable regions to avoid false sharing.
/// The `task_trampoline` function receives the `task_context` state pointers and
/// some ethereal thread index similar to C-style thread pools.
#[repr(align(128))]
struct Inner {
pub total_threads: usize,
pub stop: Padded<AtomicBool>,

pub fork_context: *const (),
pub fork_trampoline: Trampoline,
Expand All @@ -99,7 +102,6 @@ impl Inner {
pub fn new(threads: usize) -> Self {
Self {
total_threads: threads,
stop: Padded::new(AtomicBool::new(false)),
fork_context: ptr::null(),
fork_trampoline: dummy_trampoline,
threads_to_sync: Padded::new(AtomicUsize::new(0)),
Expand Down Expand Up @@ -251,12 +253,12 @@ impl<A: Allocator + Clone> ThreadPool<A> {
return;
}
assert!(self.inner.threads_to_sync.load(Ordering::SeqCst) == 0);
self.inner.reset_fork();
self.inner.stop.store(true, Ordering::Release);
self.inner.fork_context = ptr::null();
self.inner.fork_trampoline = stop_trampoline;
self.inner.fork_generation.fetch_add(1, Ordering::Release);
for handle in self.workers.drain(..) {
let _ = handle.join();
}
self.inner.stop.store(false, Ordering::Relaxed);
}

/// Executes a function on each thread of the pool.
Expand Down Expand Up @@ -310,33 +312,34 @@ where
}
}

unsafe fn call_lambda<F: Fn(usize)>(ctx: *const (), index: usize) {
unsafe fn call_lambda<F: Fn(usize)>(ctx: *const (), index: usize, _stop: &mut bool) {
    // Recover the type-erased callable from the raw context pointer and
    // invoke it with the worker's thread index; the stop flag is untouched.
    (&*(ctx as *const F))(index);
}

fn worker_loop(inner: &'static Inner, thread_index: usize) {
let mut last_generation = 0usize;
let mut stop = false;
assert!(thread_index != 0);
loop {
let mut new_generation;
let mut wants_stop;
while {
new_generation = inner.fork_generation.load(Ordering::Acquire);
wants_stop = inner.stop.load(Ordering::Acquire);
new_generation == last_generation && !wants_stop
new_generation == last_generation
} {
thread::yield_now();
}
if wants_stop {
return;
}

let trampoline = inner.trampoline();
let context = inner.context();
unsafe {
trampoline(context, thread_index);
trampoline(context, thread_index, &mut stop);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use an out pointer instead of a return value?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial idea was that the overhead might be lower as those work items which do not want to stop the pool (so basically all but one) can just ignore the additional argument (instead of producing a useless false as a return value). I should pull the initialization out of the loop though so that the worker loop does not pay the price of it for each work item...

I think there is an even nicer way which does not need to touch the trampoline signature at all, instead using the identity of the stop_trampoline, cf. adamreichold/fork-join-scope@cdb63bf. But transplanting it here is not that trivial because of the rules around function items/pointers and their comparison. So personally, I would prefer to switch to using dyn Fn(usize) + Sync as the work items instead of manually splitting context and trampoline first before implementing that.

}

if stop {
return;
}

last_generation = new_generation;

let before = inner.threads_to_sync.fetch_sub(1, Ordering::Release);
Expand Down