Skip to content

Commit 8253155

Browse files
authored
Merge pull request #41 from async-rs/own-atomic-waker
remove dependency on futures-util-preview
2 parents 65e95c5 + ec222c8 commit 8253155

File tree

5 files changed

+230
-5
lines changed

5 files changed

+230
-5
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ Timeouts and intervals for futures.
1414

1515
[dependencies]
1616
futures-core-preview = "0.3.0-alpha.19"
17-
futures-util-preview = "0.3.0-alpha.19"
1817
pin-utils = "0.1.0-alpha.4"
1918

2019
[dev-dependencies]

src/atomic_waker.rs

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
use core::cell::UnsafeCell;
2+
use core::fmt;
3+
use core::sync::atomic::AtomicUsize;
4+
use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5+
use core::task::Waker;
6+
7+
/// A synchronization primitive for task wakeup.
8+
///
9+
/// Sometimes the task interested in a given event will change over time.
10+
/// An `AtomicWaker` can coordinate concurrent notifications with the consumer
11+
/// potentially "updating" the underlying task to wake up. This is useful in
12+
/// scenarios where a computation completes in another thread and wants to
13+
/// notify the consumer, but the consumer is in the process of being migrated to
14+
/// a new logical task.
15+
///
16+
/// Consumers should call `register` before checking the result of a computation
17+
/// and producers should call `wake` after producing the computation (this
18+
/// differs from the usual `thread::park` pattern). It is also permitted for
19+
/// `wake` to be called **before** `register`. This results in a no-op.
20+
///
21+
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
22+
/// `wake`.
23+
///
24+
/// `AtomicWaker` does not provide any memory ordering guarantees, as such the
25+
/// user should use caution and use other synchronization primitives to guard
26+
/// the result of the underlying computation.
27+
pub struct AtomicWaker {
28+
state: AtomicUsize,
29+
waker: UnsafeCell<Option<Waker>>,
30+
}
31+
32+
/// Idle state
33+
const WAITING: usize = 0;
34+
35+
/// A new waker value is being registered with the `AtomicWaker` cell.
36+
const REGISTERING: usize = 0b01;
37+
38+
/// The waker currently registered with the `AtomicWaker` cell is being woken.
39+
const WAKING: usize = 0b10;
40+
41+
impl AtomicWaker {
42+
/// Create an `AtomicWaker`.
43+
pub fn new() -> AtomicWaker {
44+
// Make sure that task is Sync
45+
trait AssertSync: Sync {}
46+
impl AssertSync for Waker {}
47+
48+
AtomicWaker {
49+
state: AtomicUsize::new(WAITING),
50+
waker: UnsafeCell::new(None),
51+
}
52+
}
53+
54+
/// Registers the waker to be notified on calls to `wake`.
55+
///
56+
/// The new task will take place of any previous tasks that were registered
57+
/// by previous calls to `register`. Any calls to `wake` that happen after
58+
/// a call to `register` (as defined by the memory ordering rules), will
59+
/// notify the `register` caller's task and deregister the waker from future
60+
/// notifications. Because of this, callers should ensure `register` gets
61+
/// invoked with a new `Waker` **each** time they require a wakeup.
62+
///
63+
/// It is safe to call `register` with multiple other threads concurrently
64+
/// calling `wake`. This will result in the `register` caller's current
65+
/// task being notified once.
66+
///
67+
/// This function is safe to call concurrently, but this is generally a bad
68+
/// idea. Concurrent calls to `register` will attempt to register different
69+
/// tasks to be notified. One of the callers will win and have its task set,
70+
/// but there is no guarantee as to which caller will succeed.
71+
///
72+
/// # Examples
73+
///
74+
/// Here is how `register` is used when implementing a flag.
75+
///
76+
/// ```
77+
/// use futures::future::Future;
78+
/// use futures::task::{Context, Poll, AtomicWaker};
79+
/// use std::sync::atomic::AtomicBool;
80+
/// use std::sync::atomic::Ordering::SeqCst;
81+
/// use std::pin::Pin;
82+
///
83+
/// struct Flag {
84+
/// waker: AtomicWaker,
85+
/// set: AtomicBool,
86+
/// }
87+
///
88+
/// impl Future for Flag {
89+
/// type Output = ();
90+
///
91+
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
92+
/// // Register **before** checking `set` to avoid a race condition
93+
/// // that would result in lost notifications.
94+
/// self.waker.register(cx.waker());
95+
///
96+
/// if self.set.load(SeqCst) {
97+
/// Poll::Ready(())
98+
/// } else {
99+
/// Poll::Pending
100+
/// }
101+
/// }
102+
/// }
103+
/// ```
104+
pub fn register(&self, waker: &Waker) {
105+
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
106+
WAITING => {
107+
unsafe {
108+
// Locked acquired, update the waker cell
109+
*self.waker.get() = Some(waker.clone());
110+
111+
// Release the lock. If the state transitioned to include
112+
// the `WAKING` bit, this means that a wake has been
113+
// called concurrently, so we have to remove the waker and
114+
// wake it.`
115+
//
116+
// Start by assuming that the state is `REGISTERING` as this
117+
// is what we jut set it to.
118+
let res = self
119+
.state
120+
.compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);
121+
122+
match res {
123+
Ok(_) => {}
124+
Err(actual) => {
125+
// This branch can only be reached if a
126+
// concurrent thread called `wake`. In this
127+
// case, `actual` **must** be `REGISTERING |
128+
// `WAKING`.
129+
debug_assert_eq!(actual, REGISTERING | WAKING);
130+
131+
// Take the waker to wake once the atomic operation has
132+
// completed.
133+
let waker = (*self.waker.get()).take().unwrap();
134+
135+
// Just swap, because no one could change state while state == `REGISTERING` | `WAKING`.
136+
self.state.swap(WAITING, AcqRel);
137+
138+
// The atomic swap was complete, now
139+
// wake the task and return.
140+
waker.wake();
141+
}
142+
}
143+
}
144+
}
145+
WAKING => {
146+
// Currently in the process of waking the task, i.e.,
147+
// `wake` is currently being called on the old task handle.
148+
// So, we call wake on the new waker
149+
waker.wake_by_ref();
150+
}
151+
state => {
152+
// In this case, a concurrent thread is holding the
153+
// "registering" lock. This probably indicates a bug in the
154+
// caller's code as racing to call `register` doesn't make much
155+
// sense.
156+
//
157+
// We just want to maintain memory safety. It is ok to drop the
158+
// call to `register`.
159+
debug_assert!(state == REGISTERING || state == REGISTERING | WAKING);
160+
}
161+
}
162+
}
163+
164+
/// Calls `wake` on the last `Waker` passed to `register`.
165+
///
166+
/// If `register` has not been called yet, then this does nothing.
167+
pub fn wake(&self) {
168+
if let Some(waker) = self.take() {
169+
waker.wake();
170+
}
171+
}
172+
173+
/// Returns the last `Waker` passed to `register`, so that the user can wake it.
174+
///
175+
///
176+
/// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user
177+
/// to take the waker and then wake it separately, rather than performing both steps in one
178+
/// atomic action.
179+
///
180+
/// If a waker has not been registered, this returns `None`.
181+
pub fn take(&self) -> Option<Waker> {
182+
// AcqRel ordering is used in order to acquire the value of the `task`
183+
// cell as well as to establish a `release` ordering with whatever
184+
// memory the `AtomicWaker` is associated with.
185+
match self.state.fetch_or(WAKING, AcqRel) {
186+
WAITING => {
187+
// The waking lock has been acquired.
188+
let waker = unsafe { (*self.waker.get()).take() };
189+
190+
// Release the lock
191+
self.state.fetch_and(!WAKING, Release);
192+
193+
waker
194+
}
195+
state => {
196+
// There is a concurrent thread currently updating the
197+
// associated task.
198+
//
199+
// Nothing more to do as the `WAKING` bit has been set. It
200+
// doesn't matter if there are concurrent registering threads or
201+
// not.
202+
//
203+
debug_assert!(
204+
state == REGISTERING || state == REGISTERING | WAKING || state == WAKING
205+
);
206+
None
207+
}
208+
}
209+
}
210+
}
211+
212+
impl Default for AtomicWaker {
213+
fn default() -> Self {
214+
AtomicWaker::new()
215+
}
216+
}
217+
218+
impl fmt::Debug for AtomicWaker {
219+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220+
write!(f, "AtomicWaker")
221+
}
222+
}
223+
224+
unsafe impl Send for AtomicWaker {}
225+
unsafe impl Sync for AtomicWaker {}

src/delay.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ use std::sync::{Arc, Mutex};
1212
use std::task::{Context, Poll};
1313
use std::time::{Duration, Instant};
1414

15-
use futures_util::task::AtomicWaker;
16-
1715
use crate::arc_list::Node;
16+
use crate::AtomicWaker;
1817
use crate::{ScheduledTimer, TimerHandle};
1918

2019
/// A future representing the notification that an elapsed duration has

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
#![warn(missing_debug_implementations)]
1919

2020
mod arc_list;
21+
mod atomic_waker;
2122
mod delay;
2223
mod global;
2324
mod heap;
2425
mod heap_timer;
2526
mod timer;
2627

2728
use arc_list::{ArcList, Node};
29+
use atomic_waker::AtomicWaker;
2830
use heap::{Heap, Slot};
2931
use heap_timer::HeapTimer;
3032
use timer::{ScheduledTimer, Timer, TimerHandle};

src/timer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use std::task::{Context, Poll};
88
use std::time::Instant;
99

1010
use futures_core::future::Future;
11-
use futures_util::task::AtomicWaker;
1211

13-
use super::{global, ArcList, Heap, HeapTimer, Node, Slot};
12+
use crate::AtomicWaker;
13+
use crate::{global, ArcList, Heap, HeapTimer, Node, Slot};
1414

1515
/// A "timer heap" used to power separately owned instances of `Delay`.
1616
///

0 commit comments

Comments
 (0)