Skip to content

Commit 6a6d2a5

Browse files
bors[bot]Lucretiel
andauthored
Merge #563
563: Updates to `Parker` r=taiki-e a=Lucretiel - No longer a possibility for a spurious wake (previously possible if using a timeout), and removed references to spurious wake in the documentation. - Implementation updates to `park`: now deadline based, and unified the code paths between timeout and no-timeout versions - **Breaking Change** `park_timeout` and `park_deadline` now report the reason for their return (timeout or `unpark`) in an enum. If you'd rather not have this breaking change, I'm happy to revert that side of it and just focus on the spurious awakening prevention stuff. Fixes #482 Co-authored-by: Nathan West <Lucretiel@gmail.com> Co-authored-by: Nathan West <Lucretiel@users.noreply.github.com>
2 parents aebab0f + 4c0a106 commit 6a6d2a5

File tree

2 files changed

+59
-47
lines changed

2 files changed

+59
-47
lines changed

crossbeam-utils/src/sync/parker.rs

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,17 @@ use std::marker::PhantomData;
33
use std::sync::atomic::AtomicUsize;
44
use std::sync::atomic::Ordering::SeqCst;
55
use std::sync::{Arc, Condvar, Mutex};
6-
use std::time::Duration;
6+
use std::time::{Duration, Instant};
77

88
/// A thread parking primitive.
99
///
1010
/// Conceptually, each `Parker` has an associated token which is initially not present:
1111
///
1212
/// * The [`park`] method blocks the current thread unless or until the token is available, at
13-
/// which point it automatically consumes the token. It may also return *spuriously*, without
14-
/// consuming the token.
13+
/// which point it automatically consumes the token.
1514
///
16-
/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
17-
/// time.
15+
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
16+
/// a specified maximum time.
1817
///
1918
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
2019
/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
@@ -43,13 +42,13 @@ use std::time::Duration;
4342
/// u.unpark();
4443
/// });
4544
///
46-
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
47-
/// // spuriously before that without consuming the token.
45+
/// // Wakes up when `u.unpark()` provides the token.
4846
/// p.park();
4947
/// ```
5048
///
5149
/// [`park`]: Parker::park
5250
/// [`park_timeout`]: Parker::park_timeout
51+
/// [`park_deadline`]: Parker::park_deadline
5352
/// [`unpark`]: Unparker::unpark
5453
pub struct Parker {
5554
unparker: Unparker,
@@ -90,9 +89,6 @@ impl Parker {
9089

9190
/// Blocks the current thread until the token is made available.
9291
///
93-
/// A call to `park` may wake up spuriously without consuming the token, and callers should be
94-
/// prepared for this possibility.
95-
///
9692
/// # Examples
9793
///
9894
/// ```
@@ -113,9 +109,6 @@ impl Parker {
113109

114110
/// Blocks the current thread until the token is made available, but only for a limited time.
115111
///
116-
/// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
117-
/// should be prepared for this possibility.
118-
///
119112
/// # Examples
120113
///
121114
/// ```
@@ -128,7 +121,25 @@ impl Parker {
128121
/// p.park_timeout(Duration::from_millis(500));
129122
/// ```
130123
pub fn park_timeout(&self, timeout: Duration) {
131-
self.unparker.inner.park(Some(timeout));
124+
self.park_deadline(Instant::now() + timeout)
125+
}
126+
127+
/// Blocks the current thread until the token is made available, or until a certain deadline.
128+
///
129+
/// # Examples
130+
///
131+
/// ```
132+
/// use std::time::{Duration, Instant};
133+
/// use crossbeam_utils::sync::Parker;
134+
///
135+
/// let p = Parker::new();
136+
/// let deadline = Instant::now() + Duration::from_millis(500);
137+
///
138+
/// // Waits for the token to become available, but will not wait longer than 500 ms.
139+
/// p.park_deadline(deadline);
140+
/// ```
141+
pub fn park_deadline(&self, deadline: Instant) {
142+
self.unparker.inner.park(Some(deadline))
132143
}
133144

134145
/// Returns a reference to an associated [`Unparker`].
@@ -227,8 +238,7 @@ impl Unparker {
227238
/// u.unpark();
228239
/// });
229240
///
230-
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
231-
/// // spuriously before that without consuming the token.
241+
/// // Wakes up when `u.unpark()` provides the token.
232242
/// p.park();
233243
/// ```
234244
///
@@ -302,7 +312,7 @@ struct Inner {
302312
}
303313

304314
impl Inner {
305-
fn park(&self, timeout: Option<Duration>) {
315+
fn park(&self, deadline: Option<Instant>) {
306316
// If we were previously notified then we consume this notification and return quickly.
307317
if self
308318
.state
@@ -313,8 +323,8 @@ impl Inner {
313323
}
314324

315325
// If the timeout is zero, then there is no need to actually block.
316-
if let Some(ref dur) = timeout {
317-
if *dur == Duration::from_millis(0) {
326+
if let Some(deadline) = deadline {
327+
if deadline <= Instant::now() {
318328
return;
319329
}
320330
}
@@ -338,36 +348,38 @@ impl Inner {
338348
Err(n) => panic!("inconsistent park_timeout state: {}", n),
339349
}
340350

341-
match timeout {
342-
None => {
343-
loop {
344-
// Block the current thread on the conditional variable.
345-
m = self.cvar.wait(m).unwrap();
346-
347-
if self
348-
.state
349-
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
350-
.is_ok()
351-
{
352-
// got a notification
353-
return;
351+
loop {
352+
// Block the current thread on the conditional variable.
353+
m = match deadline {
354+
None => self.cvar.wait(m).unwrap(),
355+
Some(deadline) => {
356+
let now = Instant::now();
357+
if now < deadline {
358+
// We could check for a timeout here, in the return value of wait_timeout,
359+
// but in the case that a timeout and an unpark arrive simultaneously, we
360+
// prefer to report the former.
361+
self.cvar.wait_timeout(m, deadline - now).unwrap().0
362+
} else {
363+
// We've timed out; swap out the state back to empty on our way out
364+
match self.state.swap(EMPTY, SeqCst) {
365+
NOTIFIED | PARKED => return,
366+
n => panic!("inconsistent park_timeout state: {}", n),
367+
};
354368
}
355-
356-
// spurious wakeup, go back to sleep
357369
}
358-
}
359-
Some(timeout) => {
360-
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
361-
// notification we just want to unconditionally set `state` back to `EMPTY`, either
362-
// consuming a notification or un-flagging ourselves as parked.
363-
let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
370+
};
364371

365-
match self.state.swap(EMPTY, SeqCst) {
366-
NOTIFIED => {} // got a notification
367-
PARKED => {} // no notification
368-
n => panic!("inconsistent park_timeout state: {}", n),
369-
}
372+
if self
373+
.state
374+
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
375+
.is_ok()
376+
{
377+
// got a notification
378+
return;
370379
}
380+
381+
// Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
382+
// in the branch above, when we discover the deadline is in the past
371383
}
372384
}
373385

crossbeam-utils/tests/parker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ fn park_timeout_unpark_before() {
1818
fn park_timeout_unpark_not_called() {
1919
let p = Parker::new();
2020
for _ in 0..10 {
21-
p.park_timeout(Duration::from_millis(10));
21+
p.park_timeout(Duration::from_millis(10))
2222
}
2323
}
2424

@@ -34,7 +34,7 @@ fn park_timeout_unpark_called_other_thread() {
3434
u.unpark();
3535
});
3636

37-
p.park_timeout(Duration::from_millis(u32::MAX as u64));
37+
p.park_timeout(Duration::from_millis(u32::MAX as u64))
3838
})
3939
.unwrap();
4040
}

0 commit comments

Comments
 (0)