|
| 1 | +use core::cell::UnsafeCell; |
| 2 | +use core::fmt; |
| 3 | +use core::sync::atomic::AtomicUsize; |
| 4 | +use core::sync::atomic::Ordering::{AcqRel, Acquire, Release}; |
| 5 | +use core::task::Waker; |
| 6 | + |
| 7 | +/// A synchronization primitive for task wakeup. |
| 8 | +/// |
| 9 | +/// Sometimes the task interested in a given event will change over time. |
| 10 | +/// An `AtomicWaker` can coordinate concurrent notifications with the consumer |
| 11 | +/// potentially "updating" the underlying task to wake up. This is useful in |
| 12 | +/// scenarios where a computation completes in another thread and wants to |
| 13 | +/// notify the consumer, but the consumer is in the process of being migrated to |
| 14 | +/// a new logical task. |
| 15 | +/// |
| 16 | +/// Consumers should call `register` before checking the result of a computation |
| 17 | +/// and producers should call `wake` after producing the computation (this |
| 18 | +/// differs from the usual `thread::park` pattern). It is also permitted for |
| 19 | +/// `wake` to be called **before** `register`. This results in a no-op. |
| 20 | +/// |
| 21 | +/// A single `AtomicWaker` may be reused for any number of calls to `register` or |
| 22 | +/// `wake`. |
| 23 | +/// |
| 24 | +/// `AtomicWaker` does not provide any memory ordering guarantees, as such the |
| 25 | +/// user should use caution and use other synchronization primitives to guard |
| 26 | +/// the result of the underlying computation. |
| 27 | +pub struct AtomicWaker { |
| 28 | + state: AtomicUsize, |
| 29 | + waker: UnsafeCell<Option<Waker>>, |
| 30 | +} |
| 31 | + |
| 32 | +/// Idle state |
| 33 | +const WAITING: usize = 0; |
| 34 | + |
| 35 | +/// A new waker value is being registered with the `AtomicWaker` cell. |
| 36 | +const REGISTERING: usize = 0b01; |
| 37 | + |
| 38 | +/// The waker currently registered with the `AtomicWaker` cell is being woken. |
| 39 | +const WAKING: usize = 0b10; |
| 40 | + |
| 41 | +impl AtomicWaker { |
| 42 | + /// Create an `AtomicWaker`. |
| 43 | + pub fn new() -> AtomicWaker { |
| 44 | + // Make sure that task is Sync |
| 45 | + trait AssertSync: Sync {} |
| 46 | + impl AssertSync for Waker {} |
| 47 | + |
| 48 | + AtomicWaker { |
| 49 | + state: AtomicUsize::new(WAITING), |
| 50 | + waker: UnsafeCell::new(None), |
| 51 | + } |
| 52 | + } |
| 53 | + |
| 54 | + /// Registers the waker to be notified on calls to `wake`. |
| 55 | + /// |
| 56 | + /// The new task will take place of any previous tasks that were registered |
| 57 | + /// by previous calls to `register`. Any calls to `wake` that happen after |
| 58 | + /// a call to `register` (as defined by the memory ordering rules), will |
| 59 | + /// notify the `register` caller's task and deregister the waker from future |
| 60 | + /// notifications. Because of this, callers should ensure `register` gets |
| 61 | + /// invoked with a new `Waker` **each** time they require a wakeup. |
| 62 | + /// |
| 63 | + /// It is safe to call `register` with multiple other threads concurrently |
| 64 | + /// calling `wake`. This will result in the `register` caller's current |
| 65 | + /// task being notified once. |
| 66 | + /// |
| 67 | + /// This function is safe to call concurrently, but this is generally a bad |
| 68 | + /// idea. Concurrent calls to `register` will attempt to register different |
| 69 | + /// tasks to be notified. One of the callers will win and have its task set, |
| 70 | + /// but there is no guarantee as to which caller will succeed. |
| 71 | + /// |
| 72 | + /// # Examples |
| 73 | + /// |
| 74 | + /// Here is how `register` is used when implementing a flag. |
| 75 | + /// |
| 76 | + /// ``` |
| 77 | + /// use futures::future::Future; |
| 78 | + /// use futures::task::{Context, Poll, AtomicWaker}; |
| 79 | + /// use std::sync::atomic::AtomicBool; |
| 80 | + /// use std::sync::atomic::Ordering::SeqCst; |
| 81 | + /// use std::pin::Pin; |
| 82 | + /// |
| 83 | + /// struct Flag { |
| 84 | + /// waker: AtomicWaker, |
| 85 | + /// set: AtomicBool, |
| 86 | + /// } |
| 87 | + /// |
| 88 | + /// impl Future for Flag { |
| 89 | + /// type Output = (); |
| 90 | + /// |
| 91 | + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
| 92 | + /// // Register **before** checking `set` to avoid a race condition |
| 93 | + /// // that would result in lost notifications. |
| 94 | + /// self.waker.register(cx.waker()); |
| 95 | + /// |
| 96 | + /// if self.set.load(SeqCst) { |
| 97 | + /// Poll::Ready(()) |
| 98 | + /// } else { |
| 99 | + /// Poll::Pending |
| 100 | + /// } |
| 101 | + /// } |
| 102 | + /// } |
| 103 | + /// ``` |
| 104 | + pub fn register(&self, waker: &Waker) { |
| 105 | + match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { |
| 106 | + WAITING => { |
| 107 | + unsafe { |
| 108 | + // Locked acquired, update the waker cell |
| 109 | + *self.waker.get() = Some(waker.clone()); |
| 110 | + |
| 111 | + // Release the lock. If the state transitioned to include |
| 112 | + // the `WAKING` bit, this means that a wake has been |
| 113 | + // called concurrently, so we have to remove the waker and |
| 114 | + // wake it.` |
| 115 | + // |
| 116 | + // Start by assuming that the state is `REGISTERING` as this |
| 117 | + // is what we jut set it to. |
| 118 | + let res = self |
| 119 | + .state |
| 120 | + .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); |
| 121 | + |
| 122 | + match res { |
| 123 | + Ok(_) => {} |
| 124 | + Err(actual) => { |
| 125 | + // This branch can only be reached if a |
| 126 | + // concurrent thread called `wake`. In this |
| 127 | + // case, `actual` **must** be `REGISTERING | |
| 128 | + // `WAKING`. |
| 129 | + debug_assert_eq!(actual, REGISTERING | WAKING); |
| 130 | + |
| 131 | + // Take the waker to wake once the atomic operation has |
| 132 | + // completed. |
| 133 | + let waker = (*self.waker.get()).take().unwrap(); |
| 134 | + |
| 135 | + // Just swap, because no one could change state while state == `REGISTERING` | `WAKING`. |
| 136 | + self.state.swap(WAITING, AcqRel); |
| 137 | + |
| 138 | + // The atomic swap was complete, now |
| 139 | + // wake the task and return. |
| 140 | + waker.wake(); |
| 141 | + } |
| 142 | + } |
| 143 | + } |
| 144 | + } |
| 145 | + WAKING => { |
| 146 | + // Currently in the process of waking the task, i.e., |
| 147 | + // `wake` is currently being called on the old task handle. |
| 148 | + // So, we call wake on the new waker |
| 149 | + waker.wake_by_ref(); |
| 150 | + } |
| 151 | + state => { |
| 152 | + // In this case, a concurrent thread is holding the |
| 153 | + // "registering" lock. This probably indicates a bug in the |
| 154 | + // caller's code as racing to call `register` doesn't make much |
| 155 | + // sense. |
| 156 | + // |
| 157 | + // We just want to maintain memory safety. It is ok to drop the |
| 158 | + // call to `register`. |
| 159 | + debug_assert!(state == REGISTERING || state == REGISTERING | WAKING); |
| 160 | + } |
| 161 | + } |
| 162 | + } |
| 163 | + |
| 164 | + /// Calls `wake` on the last `Waker` passed to `register`. |
| 165 | + /// |
| 166 | + /// If `register` has not been called yet, then this does nothing. |
| 167 | + pub fn wake(&self) { |
| 168 | + if let Some(waker) = self.take() { |
| 169 | + waker.wake(); |
| 170 | + } |
| 171 | + } |
| 172 | + |
| 173 | + /// Returns the last `Waker` passed to `register`, so that the user can wake it. |
| 174 | + /// |
| 175 | + /// |
| 176 | + /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user |
| 177 | + /// to take the waker and then wake it separately, rather than performing both steps in one |
| 178 | + /// atomic action. |
| 179 | + /// |
| 180 | + /// If a waker has not been registered, this returns `None`. |
| 181 | + pub fn take(&self) -> Option<Waker> { |
| 182 | + // AcqRel ordering is used in order to acquire the value of the `task` |
| 183 | + // cell as well as to establish a `release` ordering with whatever |
| 184 | + // memory the `AtomicWaker` is associated with. |
| 185 | + match self.state.fetch_or(WAKING, AcqRel) { |
| 186 | + WAITING => { |
| 187 | + // The waking lock has been acquired. |
| 188 | + let waker = unsafe { (*self.waker.get()).take() }; |
| 189 | + |
| 190 | + // Release the lock |
| 191 | + self.state.fetch_and(!WAKING, Release); |
| 192 | + |
| 193 | + waker |
| 194 | + } |
| 195 | + state => { |
| 196 | + // There is a concurrent thread currently updating the |
| 197 | + // associated task. |
| 198 | + // |
| 199 | + // Nothing more to do as the `WAKING` bit has been set. It |
| 200 | + // doesn't matter if there are concurrent registering threads or |
| 201 | + // not. |
| 202 | + // |
| 203 | + debug_assert!( |
| 204 | + state == REGISTERING || state == REGISTERING | WAKING || state == WAKING |
| 205 | + ); |
| 206 | + None |
| 207 | + } |
| 208 | + } |
| 209 | + } |
| 210 | +} |
| 211 | + |
| 212 | +impl Default for AtomicWaker { |
| 213 | + fn default() -> Self { |
| 214 | + AtomicWaker::new() |
| 215 | + } |
| 216 | +} |
| 217 | + |
| 218 | +impl fmt::Debug for AtomicWaker { |
| 219 | + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 220 | + write!(f, "AtomicWaker") |
| 221 | + } |
| 222 | +} |
| 223 | + |
| 224 | +unsafe impl Send for AtomicWaker {} |
| 225 | +unsafe impl Sync for AtomicWaker {} |
0 commit comments