Skip to content

Commit 8964804

Browse files
committed
Switch to new futures
1 parent a4036f5 commit 8964804

File tree

11 files changed

+1927
-5
lines changed

11 files changed

+1927
-5
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ repository = "https://github.com/tomaka/wasm-timer"
99

1010
[dependencies]
1111
futures-preview = "0.3.0-alpha"
12+
parking_lot = "0.9"
1213
pin-utils = "0.1.0-alpha.4"
1314

1415
[target.'cfg(any(target_arch = "wasm32"))'.dependencies]
1516
js-sys = "0.3.14"
1617
send_wrapper = "0.2"
17-
wasm-bindgen = "0.2.37"
18+
wasm-bindgen = { version = "0.2.37" }
19+
wasm-bindgen-futures = { version = "0.3.25", features = ["futures_0_3"] }
1820
web-sys = { version = "0.3.14", features = ["Performance", "Window"] }

src/timer.rs

Lines changed: 625 additions & 0 deletions
Large diffs are not rendered by default.

src/timer/arc_list.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//! An atomically managed intrusive linked list of `Arc` nodes
2+
3+
use std::marker;
4+
use std::ops::Deref;
5+
use std::sync::atomic::Ordering::SeqCst;
6+
use std::sync::atomic::{AtomicBool, AtomicUsize};
7+
use std::sync::Arc;
8+
9+
pub struct ArcList<T> {
10+
list: AtomicUsize,
11+
_marker: marker::PhantomData<T>,
12+
}
13+
14+
impl<T> ArcList<T> {
15+
pub fn new() -> ArcList<T> {
16+
ArcList {
17+
list: AtomicUsize::new(0),
18+
_marker: marker::PhantomData,
19+
}
20+
}
21+
22+
/// Pushes the `data` provided onto this list if it's not already enqueued
23+
/// in this list.
24+
///
25+
/// If `data` is already enqueued in this list then this is a noop,
26+
/// otherwise, the `data` here is pushed on the end of the list.
27+
pub fn push(&self, data: &Arc<Node<T>>) -> Result<(), ()> {
28+
if data.enqueued.swap(true, SeqCst) {
29+
// note that even if our list is sealed off then the other end is
30+
// still guaranteed to see us because we were previously enqueued.
31+
return Ok(());
32+
}
33+
let mut head = self.list.load(SeqCst);
34+
let node = Arc::into_raw(data.clone()) as usize;
35+
loop {
36+
// If we've been sealed off, abort and return an error
37+
if head == 1 {
38+
unsafe {
39+
drop(Arc::from_raw(node as *mut Node<T>));
40+
}
41+
return Err(());
42+
}
43+
44+
// Otherwise attempt to push this node
45+
data.next.store(head, SeqCst);
46+
match self.list.compare_exchange(head, node, SeqCst, SeqCst) {
47+
Ok(_) => break Ok(()),
48+
Err(new_head) => head = new_head,
49+
}
50+
}
51+
}
52+
53+
/// Atomically empties this list, returning a new owned copy which can be
54+
/// used to iterate over the entries.
55+
pub fn take(&self) -> ArcList<T> {
56+
let mut list = self.list.load(SeqCst);
57+
loop {
58+
if list == 1 {
59+
break;
60+
}
61+
match self.list.compare_exchange(list, 0, SeqCst, SeqCst) {
62+
Ok(_) => break,
63+
Err(l) => list = l,
64+
}
65+
}
66+
ArcList {
67+
list: AtomicUsize::new(list),
68+
_marker: marker::PhantomData,
69+
}
70+
}
71+
72+
/// Atomically empties this list and prevents further successful calls to
73+
/// `push`.
74+
pub fn take_and_seal(&self) -> ArcList<T> {
75+
ArcList {
76+
list: AtomicUsize::new(self.list.swap(1, SeqCst)),
77+
_marker: marker::PhantomData,
78+
}
79+
}
80+
81+
/// Removes the head of the list of nodes, returning `None` if this is an
82+
/// empty list.
83+
pub fn pop(&mut self) -> Option<Arc<Node<T>>> {
84+
let head = *self.list.get_mut();
85+
if head == 0 || head == 1 {
86+
return None;
87+
}
88+
let head = unsafe { Arc::from_raw(head as *const Node<T>) };
89+
*self.list.get_mut() = head.next.load(SeqCst);
90+
// At this point, the node is out of the list, so store `false` so we
91+
// can enqueue it again and see further changes.
92+
assert!(head.enqueued.swap(false, SeqCst));
93+
Some(head)
94+
}
95+
}
96+
97+
impl<T> Drop for ArcList<T> {
98+
fn drop(&mut self) {
99+
while let Some(_) = self.pop() {
100+
// ...
101+
}
102+
}
103+
}
104+
105+
pub struct Node<T> {
106+
next: AtomicUsize,
107+
enqueued: AtomicBool,
108+
data: T,
109+
}
110+
111+
impl<T> Node<T> {
112+
pub fn new(data: T) -> Node<T> {
113+
Node {
114+
next: AtomicUsize::new(0),
115+
enqueued: AtomicBool::new(false),
116+
data: data,
117+
}
118+
}
119+
}
120+
121+
impl<T> Deref for Node<T> {
122+
type Target = T;
123+
124+
fn deref(&self) -> &T {
125+
&self.data
126+
}
127+
}
128+
129+
#[cfg(test)]
130+
mod tests {
131+
use super::*;
132+
133+
#[test]
134+
fn smoke() {
135+
let a = ArcList::new();
136+
let n = Arc::new(Node::new(1));
137+
assert!(a.push(&n).is_ok());
138+
139+
let mut l = a.take();
140+
assert_eq!(**l.pop().unwrap(), 1);
141+
assert!(l.pop().is_none());
142+
}
143+
144+
#[test]
145+
fn seal() {
146+
let a = ArcList::new();
147+
let n = Arc::new(Node::new(1));
148+
let mut l = a.take_and_seal();
149+
assert!(l.pop().is_none());
150+
assert!(a.push(&n).is_err());
151+
152+
assert!(a.take().pop().is_none());
153+
assert!(a.take_and_seal().pop().is_none());
154+
}
155+
}

src/timer/delay.rs

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
//! Support for creating futures that represent timeouts.
2+
//!
3+
//! This module contains the `Delay` type which is a future that will resolve
4+
//! at a particular point in the future.
5+
6+
use std::fmt;
7+
use std::future::Future;
8+
use std::io;
9+
use std::pin::Pin;
10+
use std::sync::atomic::AtomicUsize;
11+
use std::sync::atomic::Ordering::SeqCst;
12+
use std::sync::{Arc, Mutex};
13+
use std::task::{Context, Poll};
14+
use std::time::Duration;
15+
16+
use futures::task::AtomicWaker;
17+
18+
use crate::Instant;
19+
use crate::timer::arc_list::Node;
20+
use crate::timer::{ScheduledTimer, TimerHandle};
21+
22+
/// A future representing the notification that an elapsed duration has
23+
/// occurred.
24+
///
25+
/// This is created through the `Delay::new` or `Delay::new_at` methods
26+
/// indicating when the future should fire at. Note that these futures are not
27+
/// intended for high resolution timers, but rather they will likely fire some
28+
/// granularity after the exact instant that they're otherwise indicated to
29+
/// fire at.
30+
pub struct Delay {
31+
state: Option<Arc<Node<ScheduledTimer>>>,
32+
when: Instant,
33+
}
34+
35+
impl Delay {
36+
/// Creates a new future which will fire at `dur` time into the future.
37+
///
38+
/// The returned object will be bound to the default timer for this thread.
39+
/// The default timer will be spun up in a helper thread on first use.
40+
#[inline]
41+
pub fn new(dur: Duration) -> Delay {
42+
Delay::new_at(Instant::now() + dur)
43+
}
44+
45+
/// Creates a new future which will fire at the time specified by `at`.
46+
///
47+
/// The returned object will be bound to the default timer for this thread.
48+
/// The default timer will be spun up in a helper thread on first use.
49+
#[inline]
50+
pub fn new_at(at: Instant) -> Delay {
51+
Delay::new_handle(at, Default::default())
52+
}
53+
54+
/// Creates a new future which will fire at the time specified by `at`.
55+
///
56+
/// The returned instance of `Delay` will be bound to the timer specified by
57+
/// the `handle` argument.
58+
pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
59+
let inner = match handle.inner.upgrade() {
60+
Some(i) => i,
61+
None => {
62+
return Delay {
63+
state: None,
64+
when: at,
65+
}
66+
}
67+
};
68+
let state = Arc::new(Node::new(ScheduledTimer {
69+
at: Mutex::new(Some(at)),
70+
state: AtomicUsize::new(0),
71+
waker: AtomicWaker::new(),
72+
inner: handle.inner,
73+
slot: Mutex::new(None),
74+
}));
75+
76+
// If we fail to actually push our node then we've become an inert
77+
// timer, meaning that we'll want to immediately return an error from
78+
// `poll`.
79+
if inner.list.push(&state).is_err() {
80+
return Delay {
81+
state: None,
82+
when: at,
83+
};
84+
}
85+
86+
inner.waker.wake();
87+
Delay {
88+
state: Some(state),
89+
when: at,
90+
}
91+
}
92+
93+
/// Resets this timeout to an new timeout which will fire at the time
94+
/// specified by `dur`.
95+
///
96+
/// This is equivalent to calling `reset_at` with `Instant::now() + dur`
97+
#[inline]
98+
pub fn reset(&mut self, dur: Duration) {
99+
self.reset_at(Instant::now() + dur)
100+
}
101+
102+
/// Resets this timeout to an new timeout which will fire at the time
103+
/// specified by `at`.
104+
///
105+
/// This method is usable even of this instance of `Delay` has "already
106+
/// fired". That is, if this future has resovled, calling this method means
107+
/// that the future will still re-resolve at the specified instant.
108+
///
109+
/// If `at` is in the past then this future will immediately be resolved
110+
/// (when `poll` is called).
111+
///
112+
/// Note that if any task is currently blocked on this future then that task
113+
/// will be dropped. It is required to call `poll` again after this method
114+
/// has been called to ensure tha ta task is blocked on this future.
115+
#[inline]
116+
pub fn reset_at(&mut self, at: Instant) {
117+
self.when = at;
118+
if self._reset(at).is_err() {
119+
self.state = None
120+
}
121+
}
122+
123+
fn _reset(&mut self, at: Instant) -> Result<(), ()> {
124+
let state = match self.state {
125+
Some(ref state) => state,
126+
None => return Err(()),
127+
};
128+
if let Some(timeouts) = state.inner.upgrade() {
129+
let mut bits = state.state.load(SeqCst);
130+
loop {
131+
// If we've been invalidated, cancel this reset
132+
if bits & 0b10 != 0 {
133+
return Err(());
134+
}
135+
let new = bits.wrapping_add(0b100) & !0b11;
136+
match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
137+
Ok(_) => break,
138+
Err(s) => bits = s,
139+
}
140+
}
141+
*state.at.lock().unwrap() = Some(at);
142+
// If we fail to push our node then we've become an inert timer, so
143+
// we'll want to clear our `state` field accordingly
144+
timeouts.list.push(state)?;
145+
timeouts.waker.wake();
146+
}
147+
148+
Ok(())
149+
}
150+
}
151+
152+
#[inline]
153+
pub fn fires_at(timeout: &Delay) -> Instant {
154+
timeout.when
155+
}
156+
157+
impl Future for Delay {
158+
type Output = io::Result<()>;
159+
160+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161+
let state = match self.state {
162+
Some(ref state) => state,
163+
None => {
164+
let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away"));
165+
return Poll::Ready(err);
166+
}
167+
};
168+
169+
if state.state.load(SeqCst) & 1 != 0 {
170+
return Poll::Ready(Ok(()));
171+
}
172+
173+
state.waker.register(&cx.waker());
174+
175+
// Now that we've registered, do the full check of our own internal
176+
// state. If we've fired the first bit is set, and if we've been
177+
// invalidated the second bit is set.
178+
match state.state.load(SeqCst) {
179+
n if n & 0b01 != 0 => Poll::Ready(Ok(())),
180+
n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new(
181+
io::ErrorKind::Other,
182+
"timer has gone away",
183+
))),
184+
_ => Poll::Pending,
185+
}
186+
}
187+
}
188+
189+
impl Drop for Delay {
190+
fn drop(&mut self) {
191+
let state = match self.state {
192+
Some(ref s) => s,
193+
None => return,
194+
};
195+
if let Some(timeouts) = state.inner.upgrade() {
196+
*state.at.lock().unwrap() = None;
197+
if timeouts.list.push(state).is_ok() {
198+
timeouts.waker.wake();
199+
}
200+
}
201+
}
202+
}
203+
204+
impl fmt::Debug for Delay {
205+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
206+
f.debug_struct("Delay").field("when", &self.when).finish()
207+
}
208+
}

0 commit comments

Comments
 (0)