Skip to content

Commit 1e2e38b

Browse files
authored
sync: use WakeList in Notify and batch_semaphore (#4071)
## Motivation PR #4055 added a new `WakeList` type, to manage a potentially uninitialized array when waking batches of wakers. This has the advantage of not initializing a bunch of empty `Option`s when only a small number of tasks are being woken, potentially improving performance in these cases. Currently, `WakeList` is used only in the IO driver. However, `tokio::sync` contains some code that's almost identical to the code in the IO driver that was replaced with `WakeList`, so we can apply the same optimizations there. ## Solution This branch changes `tokio::sync::Notify` and `tokio::sync::batch_semaphore::Semaphore` to use `WakeList` when waking batches of wakers. This was a pretty straightforward drop-in replacement. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
1 parent 80bda3b commit 1e2e38b

File tree

3 files changed

+38
-25
lines changed

3 files changed

+38
-25
lines changed

tokio/src/sync/batch_semaphore.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell;
1919
use crate::loom::sync::atomic::AtomicUsize;
2020
use crate::loom::sync::{Mutex, MutexGuard};
2121
use crate::util::linked_list::{self, LinkedList};
22+
use crate::util::WakeList;
2223

2324
use std::future::Future;
2425
use std::marker::PhantomPinned;
@@ -239,12 +240,12 @@ impl Semaphore {
239240
/// If `rem` exceeds the number of permits needed by the wait list, the
240241
/// remainder are assigned back to the semaphore.
241242
fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
242-
let mut wakers: [Option<Waker>; 8] = Default::default();
243+
let mut wakers = WakeList::new();
243244
let mut lock = Some(waiters);
244245
let mut is_empty = false;
245246
while rem > 0 {
246247
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
247-
'inner: for slot in &mut wakers[..] {
248+
'inner: while wakers.can_push() {
248249
// Was the waiter assigned enough permits to wake it?
249250
match waiters.queue.last() {
250251
Some(waiter) => {
@@ -260,7 +261,11 @@ impl Semaphore {
260261
}
261262
};
262263
let mut waiter = waiters.queue.pop_back().unwrap();
263-
*slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
264+
if let Some(waker) =
265+
unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
266+
{
267+
wakers.push(waker);
268+
}
264269
}
265270

266271
if rem > 0 && is_empty {
@@ -283,10 +288,7 @@ impl Semaphore {
283288

284289
drop(waiters); // release the lock
285290

286-
wakers
287-
.iter_mut()
288-
.filter_map(Option::take)
289-
.for_each(Waker::wake);
291+
wakers.wake_all();
290292
}
291293

292294
assert_eq!(rem, 0);

tokio/src/sync/notify.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use crate::loom::sync::atomic::AtomicUsize;
99
use crate::loom::sync::Mutex;
1010
use crate::util::linked_list::{self, LinkedList};
11+
use crate::util::WakeList;
1112

1213
use std::cell::UnsafeCell;
1314
use std::future::Future;
@@ -391,10 +392,7 @@ impl Notify {
391392
/// }
392393
/// ```
393394
pub fn notify_waiters(&self) {
394-
const NUM_WAKERS: usize = 32;
395-
396-
let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
397-
let mut curr_waker = 0;
395+
let mut wakers = WakeList::new();
398396

399397
// There are waiters, the lock must be acquired to notify.
400398
let mut waiters = self.waiters.lock();
@@ -414,7 +412,7 @@ impl Notify {
414412
// concurrently change, as holding the lock is required to
415413
// transition **out** of `WAITING`.
416414
'outer: loop {
417-
while curr_waker < NUM_WAKERS {
415+
while wakers.can_push() {
418416
match waiters.pop_back() {
419417
Some(mut waiter) => {
420418
// Safety: `waiters` lock is still held.
@@ -425,8 +423,7 @@ impl Notify {
425423
waiter.notified = Some(NotificationType::AllWaiters);
426424

427425
if let Some(waker) = waiter.waker.take() {
428-
wakers[curr_waker] = Some(waker);
429-
curr_waker += 1;
426+
wakers.push(waker);
430427
}
431428
}
432429
None => {
@@ -437,11 +434,7 @@ impl Notify {
437434

438435
drop(waiters);
439436

440-
for waker in wakers.iter_mut().take(curr_waker) {
441-
waker.take().unwrap().wake();
442-
}
443-
444-
curr_waker = 0;
437+
wakers.wake_all();
445438

446439
// Acquire the lock again.
447440
waiters = self.waiters.lock();
@@ -456,9 +449,7 @@ impl Notify {
456449
// Release the lock before notifying
457450
drop(waiters);
458451

459-
for waker in wakers.iter_mut().take(curr_waker) {
460-
waker.take().unwrap().wake();
461-
}
452+
wakers.wake_all();
462453
}
463454
}
464455

tokio/src/util/mod.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
cfg_io_driver! {
22
pub(crate) mod bit;
33
pub(crate) mod slab;
4-
5-
mod wake_list;
6-
pub(crate) use wake_list::WakeList;
74
}
85

6+
#[cfg(any(
7+
// io driver uses `WakeList` directly
8+
feature = "net",
9+
feature = "process",
10+
// `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`.
11+
feature = "sync",
12+
// `fs` uses `batch_semaphore`, which requires `WakeList`.
13+
feature = "fs",
14+
// rt and signal use `Notify`, which requires `WakeList`.
15+
feature = "rt",
16+
feature = "signal",
17+
))]
18+
mod wake_list;
19+
#[cfg(any(
20+
feature = "net",
21+
feature = "process",
22+
feature = "sync",
23+
feature = "fs",
24+
feature = "rt",
25+
feature = "signal",
26+
))]
27+
pub(crate) use wake_list::WakeList;
28+
929
#[cfg(any(
1030
feature = "fs",
1131
feature = "net",

0 commit comments

Comments
 (0)