Skip to content

Commit b56511f

Browse files
committed
Refactor scope latches to reduce matching
The former `enum ScopeLatch` forced a `match` during both `increment` and `set` (decrement), even though both variants only need to update an `AtomicUsize` most of the time. rayon-rs#1057 helped hide that for `increment`, but `set` branching still showed up in perf profiles. Now this is refactored to a unified `CountLatch` that has a direct field for its `counter` used in the frequent case, and then an internal enum for the one-time notification variants. Therefore, most of its updates will have no `match` reached at all. The only other use of the former `CountLatch` was the one-shot termination latch in `WorkerThread`, so that's now renamed to `OnceLatch`. (cherry picked from commit 32d3774)
1 parent f46f1d4 commit b56511f

File tree

4 files changed

+118
-168
lines changed

4 files changed

+118
-168
lines changed

rayon-core/src/broadcast/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::job::{ArcJob, StackJob};
2-
use crate::latch::LatchRef;
2+
use crate::latch::{CountLatch, LatchRef};
33
use crate::registry::{Registry, WorkerThread};
4-
use crate::scope::ScopeLatch;
54
use std::fmt;
65
use std::marker::PhantomData;
76
use std::sync::Arc;
@@ -108,7 +107,7 @@ where
108107
let n_threads = registry.num_threads();
109108
let current_thread = WorkerThread::current().as_ref();
110109
let tlv = crate::tlv::get();
111-
let latch = ScopeLatch::with_count(n_threads, current_thread);
110+
let latch = CountLatch::with_count(n_threads, current_thread);
112111
let jobs: Vec<_> = (0..n_threads)
113112
.map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch)))
114113
.collect();

rayon-core/src/latch.rs

Lines changed: 107 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,13 @@ impl CoreLatch {
142142
}
143143
}
144144

145+
impl AsCoreLatch for CoreLatch {
146+
#[inline]
147+
fn as_core_latch(&self) -> &CoreLatch {
148+
self
149+
}
150+
}
151+
145152
/// Spin latches are the simplest, most efficient kind, but they do
146153
/// not support a `wait()` operation. They just have a boolean flag
147154
/// that becomes true when `set()` is called.
@@ -269,96 +276,114 @@ impl Latch for LockLatch {
269276
}
270277
}
271278

272-
/// Counting latches are used to implement scopes. They track a
273-
/// counter. Unlike other latches, calling `set()` does not
274-
/// necessarily make the latch be considered `set()`; instead, it just
275-
/// decrements the counter. The latch is only "set" (in the sense that
276-
/// `probe()` returns true) once the counter reaches zero.
279+
/// Once latches are used to implement one-time blocking, primarily
280+
/// for the termination flag of the threads in the pool.
277281
///
278-
/// Note: like a `SpinLatch`, count laches are always associated with
282+
/// Note: like a `SpinLatch`, once-latches are always associated with
279283
/// some registry that is probing them, which must be tickled when
280284
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
281285
/// reference to that registry. This is because in some cases the
282-
/// registry owns the count-latch, and that would create a cycle. So a
283-
/// `CountLatch` must be given a reference to its owning registry when
286+
/// registry owns the once-latch, and that would create a cycle. So a
287+
/// `OnceLatch` must be given a reference to its owning registry when
284288
/// it is set. For this reason, it does not implement the `Latch`
285289
/// trait (but it doesn't have to, as it is not used in those generic
286290
/// contexts).
287291
#[derive(Debug)]
288-
pub(super) struct CountLatch {
289-
// counter is first to nudge layout like CountLockLatch
290-
counter: AtomicUsize,
292+
pub(super) struct OnceLatch {
291293
core_latch: CoreLatch,
292294
}
293295

294-
impl CountLatch {
295-
#[inline]
296-
pub(super) fn new() -> CountLatch {
297-
Self::with_count(1)
298-
}
299-
296+
impl OnceLatch {
300297
#[inline]
301-
pub(super) fn with_count(n: usize) -> CountLatch {
302-
CountLatch {
298+
pub(super) fn new() -> OnceLatch {
299+
Self {
303300
core_latch: CoreLatch::new(),
304-
counter: AtomicUsize::new(n),
305301
}
306302
}
307303

308-
#[inline]
309-
pub(super) fn increment(&self) {
310-
debug_assert!(!self.core_latch.probe());
311-
self.counter.fetch_add(1, Ordering::Relaxed);
312-
}
313-
314-
/// Decrements the latch counter by one. If this is the final
315-
/// count, then the latch is **set**, and calls to `probe()` will
316-
/// return true. Returns whether the latch was set.
317-
#[inline]
318-
pub(super) unsafe fn set(this: *const Self) -> bool {
319-
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
320-
CoreLatch::set(&(*this).core_latch);
321-
true
322-
} else {
323-
false
324-
}
325-
}
326-
327-
/// Decrements the latch counter by one and possibly set it. If
328-
/// the latch is set, then the specific worker thread is tickled,
304+
/// Set the latch, then tickle the specific worker thread,
329305
/// which should be the one that owns this latch.
330306
#[inline]
331307
pub(super) unsafe fn set_and_tickle_one(
332308
this: *const Self,
333309
registry: &Registry,
334310
target_worker_index: usize,
335311
) {
336-
if Self::set(this) {
312+
if CoreLatch::set(&(*this).core_latch) {
337313
registry.notify_worker_latch_is_set(target_worker_index);
338314
}
339315
}
340316
}
341317

342-
impl AsCoreLatch for CountLatch {
318+
impl AsCoreLatch for OnceLatch {
343319
#[inline]
344320
fn as_core_latch(&self) -> &CoreLatch {
345321
&self.core_latch
346322
}
347323
}
348324

325+
/// Counting latches are used to implement scopes. They track a
326+
/// counter. Unlike other latches, calling `set()` does not
327+
/// necessarily make the latch be considered `set()`; instead, it just
328+
/// decrements the counter. The latch is only "set" (in the sense that
329+
/// `probe()` returns true) once the counter reaches zero.
349330
#[derive(Debug)]
350-
pub(super) struct CountLockLatch {
351-
// counter is first to nudge layout like CountLatch
331+
pub(super) struct CountLatch {
352332
counter: AtomicUsize,
353-
lock_latch: LockLatch,
333+
kind: CountLatchKind,
354334
}
355335

356-
impl CountLockLatch {
357-
#[inline]
358-
pub(super) fn with_count(n: usize) -> CountLockLatch {
359-
CountLockLatch {
360-
lock_latch: LockLatch::new(),
361-
counter: AtomicUsize::new(n),
336+
enum CountLatchKind {
337+
/// A latch for scopes created on a rayon thread which will participate in work-
338+
/// stealing while it waits for completion. This thread is not necessarily part
339+
/// of the same registry as the scope itself!
340+
Stealing {
341+
latch: CoreLatch,
342+
/// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
343+
/// with registry B, when a job completes in a thread of registry B, we may
344+
/// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
345+
/// That means we need a reference to registry A (since at that point we will
346+
/// only have a reference to registry B), so we stash it here.
347+
registry: Arc<Registry>,
348+
/// The index of the worker to wake in `registry`
349+
worker_index: usize,
350+
},
351+
352+
/// A latch for scopes created on a non-rayon thread which will block to wait.
353+
Blocking { latch: LockLatch },
354+
}
355+
356+
impl std::fmt::Debug for CountLatchKind {
357+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358+
match self {
359+
CountLatchKind::Stealing { latch, .. } => {
360+
f.debug_tuple("Stealing").field(latch).finish()
361+
}
362+
CountLatchKind::Blocking { latch, .. } => {
363+
f.debug_tuple("Blocking").field(latch).finish()
364+
}
365+
}
366+
}
367+
}
368+
369+
impl CountLatch {
370+
pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
371+
Self::with_count(1, owner)
372+
}
373+
374+
pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
375+
Self {
376+
counter: AtomicUsize::new(count),
377+
kind: match owner {
378+
Some(owner) => CountLatchKind::Stealing {
379+
latch: CoreLatch::new(),
380+
registry: Arc::clone(owner.registry()),
381+
worker_index: owner.index(),
382+
},
383+
None => CountLatchKind::Blocking {
384+
latch: LockLatch::new(),
385+
},
386+
},
362387
}
363388
}
364389

@@ -368,16 +393,42 @@ impl CountLockLatch {
368393
debug_assert!(old_counter != 0);
369394
}
370395

371-
pub(super) fn wait(&self) {
372-
self.lock_latch.wait();
396+
pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
397+
match &self.kind {
398+
CountLatchKind::Stealing {
399+
latch,
400+
registry,
401+
worker_index,
402+
} => unsafe {
403+
let owner = owner.expect("owner thread");
404+
debug_assert_eq!(registry.id(), owner.registry().id());
405+
debug_assert_eq!(*worker_index, owner.index());
406+
owner.wait_until(latch);
407+
},
408+
CountLatchKind::Blocking { latch } => latch.wait(),
409+
}
373410
}
374411
}
375412

376-
impl Latch for CountLockLatch {
413+
impl Latch for CountLatch {
377414
#[inline]
378415
unsafe fn set(this: *const Self) {
379416
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
380-
LockLatch::set(&(*this).lock_latch);
417+
// NOTE: Once we call `set` on the internal `latch`,
418+
// the target may proceed and invalidate `this`!
419+
match (*this).kind {
420+
CountLatchKind::Stealing {
421+
ref latch,
422+
ref registry,
423+
worker_index,
424+
} => {
425+
let registry = Arc::clone(registry);
426+
if CoreLatch::set(latch) {
427+
registry.notify_worker_latch_is_set(worker_index);
428+
}
429+
}
430+
CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
431+
}
381432
}
382433
}
383434
}

rayon-core/src/registry.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::job::{JobFifo, JobRef, StackJob};
2-
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
2+
use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
33
use crate::log::Event::*;
44
use crate::log::Logger;
55
use crate::sleep::Sleep;
@@ -634,7 +634,7 @@ impl Registry {
634634
pub(super) fn terminate(&self) {
635635
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
636636
for (i, thread_info) in self.thread_infos.iter().enumerate() {
637-
unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
637+
unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
638638
}
639639
}
640640
}
@@ -682,10 +682,7 @@ struct ThreadInfo {
682682
/// This latch is *set* by the `terminate` method on the
683683
/// `Registry`, once the registry's main "terminate" counter
684684
/// reaches zero.
685-
///
686-
/// NB. We use a `CountLatch` here because it has no lifetimes and is
687-
/// meant for async use, but the count never gets higher than one.
688-
terminate: CountLatch,
685+
terminate: OnceLatch,
689686

690687
/// the "stealer" half of the worker's deque
691688
stealer: Stealer<JobRef>,
@@ -696,7 +693,7 @@ impl ThreadInfo {
696693
ThreadInfo {
697694
primed: LockLatch::new(),
698695
stopped: LockLatch::new(),
699-
terminate: CountLatch::new(),
696+
terminate: OnceLatch::new(),
700697
stealer,
701698
}
702699
}

0 commit comments

Comments
 (0)