Skip to content

Commit cde9684

Browse files
committed
Clean up atomics/futures + polyfill
* Remove now-unneeded `State` enum * Remove timeout argument from polyfill since we don't need it * Call `Atomics.waitAsync` if it's available instead of using our polyfill * Remove some extraneous dead code from the polyfill * Add a `val: i32` argument to the polyfill * Simplify the flow of futures with `Package` since `waitAsync` handles all the heavy lifting for us. * Remove `Arc<Package>` and just use `Package` * Remove `RefCell` from inside of `Package` now that it is no longer needed.
1 parent 9f77f8d commit cde9684

File tree

2 files changed

+135
-274
lines changed

2 files changed

+135
-274
lines changed

crates/futures/src/atomics.rs

Lines changed: 100 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::cell::{Cell, RefCell};
1+
use std::cell::RefCell;
22
use std::fmt;
33
use std::rc::Rc;
44
use std::sync::atomic::{AtomicI32, Ordering};
@@ -8,8 +8,9 @@ use futures::executor::{self, Notify, Spawn};
88
use futures::future;
99
use futures::prelude::*;
1010
use futures::sync::oneshot;
11-
use js_sys::{Function, Promise};
11+
use js_sys::Function;
1212
use wasm_bindgen::prelude::*;
13+
use wasm_bindgen::JsCast;
1314

1415
/// A Rust `Future` backed by a JavaScript `Promise`.
1516
///
@@ -23,14 +24,28 @@ pub struct JsFuture {
2324
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
2425
}
2526

27+
// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`.
28+
#[wasm_bindgen]
29+
extern "C" {
30+
type Promise;
31+
#[wasm_bindgen(method)]
32+
fn then(this: &Promise, cb: &JsValue) -> Promise;
33+
34+
type Atomics;
35+
#[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)]
36+
fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise;
37+
#[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)]
38+
fn get_wait_async() -> JsValue;
39+
}
40+
2641
impl fmt::Debug for JsFuture {
2742
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2843
write!(f, "JsFuture {{ ... }}")
2944
}
3045
}
3146

32-
impl From<Promise> for JsFuture {
33-
fn from(js: Promise) -> JsFuture {
47+
impl From<js_sys::Promise> for JsFuture {
48+
fn from(js: js_sys::Promise) -> JsFuture {
3449
// Use the `then` method to schedule two callbacks, one for the
3550
// resolved value and one for the rejected value. We're currently
3651
// assuming that JS engines will unconditionally invoke precisely one of
@@ -112,205 +127,132 @@ impl Future for JsFuture {
112127
/// If the `future` provided panics then the returned `Promise` **will not
113128
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
114129
/// limitation of wasm currently that's hoped to be fixed one day!
115-
pub fn future_to_promise<F>(future: F) -> Promise
116-
where
117-
F: Future<Item = JsValue, Error = JsValue> + 'static,
130+
pub fn future_to_promise<F>(future: F) -> js_sys::Promise
131+
where
132+
F: Future<Item = JsValue, Error = JsValue> + 'static,
118133
{
119134
_future_to_promise(Box::new(future))
120135
}
121136

122137
// Implementation of actually transforming a future into a JavaScript `Promise`.
123138
//
124-
// The only primitive we have to work with here is `Promise::new`, which gives
125-
// us two callbacks that we can use to either reject or resolve the promise.
126-
// It's our job to ensure that one of those callbacks is called at the
127-
// appropriate time.
128-
//
129-
// Now we know that JavaScript (in general) can't block and is largely
130-
// notification/callback driven. That means that our future must either have
131-
// synchronous computational work to do, or it's "scheduled a notification" to
132-
// happen. These notifications are likely callbacks to get executed when things
133-
// finish (like a different promise or something like `setTimeout`). The general
134-
// idea here is thus to do as much synchronous work as we can and then otherwise
135-
// translate notifications of a future's task into "let's poll the future!"
139+
// The main primitives used here are `Promise::new` to actually create a JS
140+
// promise to return as well as `Atomics.waitAsync` to create a promise that we
141+
// can asynchronously wait on. The general idea here is that we'll create a
142+
// promise to return and schedule work to happen in `Atomics.waitAsync`
143+
// callbacks.
136144
//
137-
// This isn't necessarily the greatest future executor in the world, but it
138-
// should get the job done for now hopefully.
139-
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
145+
// After we've created a promise we start polling a future, and whenever it's
146+
// not ready we'll execute `Atomics.waitAsync`. When that resolves we'll keep
147+
// polling the future, and this happens until the future is done. Finally
148+
// when it's all finished we call either resolver or reject depending on the
149+
// result of the future.
150+
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> js_sys::Promise {
140151
let mut future = Some(executor::spawn(future));
141-
return Promise::new(&mut |resolve, reject| {
142-
Package::poll(&Arc::new(Package {
143-
spawn: RefCell::new(future.take().unwrap()),
152+
return js_sys::Promise::new(&mut |resolve, reject| {
153+
Package {
154+
spawn: future.take().unwrap(),
144155
resolve,
145156
reject,
146-
notified: Cell::new(State::Notified),
147-
waker: Arc::new(Waker::default()),
148-
}));
157+
waker: Arc::new(Waker {
158+
value: AtomicI32::new(1), // 1 == "notified, ready to poll"
159+
}),
160+
}
161+
.poll();
149162
});
150163

151164
struct Package {
152165
// Our "spawned future". This'll have everything we need to poll the
153166
// future and continue to move it forward.
154-
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
155-
156-
// The current state of this future, expressed in an enum below. This
157-
// indicates whether we're currently polling the future, received a
158-
// notification and need to keep polling, or if we're waiting for a
159-
// notification to come in (and no one is polling).
160-
notified: Cell<State>,
167+
spawn: Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>,
161168

162169
// Our two callbacks connected to the `Promise` that we returned to
163170
// JavaScript. We'll be invoking one of these at the end.
164171
resolve: Function,
165172
reject: Function,
166173

167-
// Struct to wake a future
174+
// Shared state used to communicate waking up this future, this is the
175+
// `Send + Sync` piece needed by the async task system.
168176
waker: Arc<Waker>,
169177
}
170178

171-
// The possible states our `Package` (future) can be in, tracked internally
172-
// and used to guide what happens when polling a future.
173-
enum State {
174-
// This future is currently and actively being polled. Attempting to
175-
// access the future will result in a runtime panic and is considered a
176-
// bug.
177-
Polling,
178-
179-
// This future has been notified, while it was being polled. This marker
180-
// is used in the `Notify` implementation below, and indicates that a
181-
// notification was received that the future is ready to make progress.
182-
// If seen, however, it probably means that the future is also currently
183-
// being polled.
184-
Notified,
185-
186-
// The future is blocked, waiting for something to happen. Stored here
187-
// is a self-reference to the future itself so we can pull it out in
188-
// `Notify` and continue polling.
189-
//
190-
// Note that the self-reference here is an Arc-cycle that will leak
191-
// memory unless the future completes, but currently that should be ok
192-
// as we'll have to stick around anyway while the future is executing!
193-
//
194-
// This state is removed as soon as a notification comes in, so the leak
195-
// should only be "temporary"
196-
Waiting(Arc<Package>),
197-
}
198-
199-
#[derive(Default)]
200179
struct Waker {
201-
// worker will be waiting on this value
202-
// 0 by default, which means not notified
203180
value: AtomicI32,
204181
};
205182

206183
impl Notify for Waker {
207184
fn notify(&self, _id: usize) {
208-
// since we have only value field here
209-
// let it be 1 if notified, 0 if not
210-
if self.value.swap(1, Ordering::SeqCst) == 0 {
211-
let _ = unsafe {
212-
core::arch::wasm32::atomic_notify(
213-
&self.value as *const AtomicI32 as *mut i32,
214-
std::u32::MAX, // number of threads to notify
215-
)
216-
};
217-
}
218-
}
219-
}
220-
221-
fn poll_again(package: Arc<Package>) {
222-
let me = match package.notified.replace(State::Notified) {
223-
// we need to schedule polling to resume, so keep going
224-
State::Waiting(me) => {
225-
me
226-
}
227-
228-
// we were already notified, and were just notified again;
229-
// having now coalesced the notifications we return as it's
230-
// still someone else's job to process this
231-
State::Notified => {
185+
// Attempt to notify us by storing 1. If we're already 1 then we
186+
// were previously notified and there's nothing to do. Otherwise
187+
// we execute the native `notify` instruction to wake up the
188+
// corresponding `waitAsync` that was waiting for the transition
189+
// from 0 to 1.
190+
let prev = self.value.swap(1, Ordering::SeqCst);
191+
if prev == 1 {
232192
return;
233193
}
234-
235-
// the future was previously being polled, and we've just
236-
// switched it to the "you're notified" state. We don't have
237-
// access to the future as it's being polled, so the future
238-
// polling process later sees this notification and will
239-
// continue polling. For us, though, there's nothing else to do,
240-
// so we bail out.
241-
// later see
242-
State::Polling => {
243-
return;
194+
debug_assert_eq!(prev, 0);
195+
unsafe {
196+
core::arch::wasm32::atomic_notify(
197+
&self.value as *const AtomicI32 as *mut i32,
198+
1, // number of threads to notify
199+
);
244200
}
245-
};
246-
247-
// Use `Promise.then` on a resolved promise to place our execution
248-
// onto the next turn of the microtask queue, enqueueing our poll
249-
// operation. We don't currently poll immediately as it turns out
250-
// `futures` crate adapters aren't compatible with it and it also
251-
// helps avoid blowing the stack by accident.
252-
let promise =
253-
crate::polyfill::wait_async(&package.waker.value).expect("Should create a Promise");
254-
let closure = Closure::once(move |_| {
255-
Package::poll(&me);
256-
});
257-
promise.then(&closure);
258-
closure.forget();
201+
}
259202
}
260203

261204
impl Package {
262-
// Move the future contained in `me` as far forward as we can. This will
263-
// do as much synchronous work as possible to complete the future,
264-
// ensuring that when it blocks we're scheduled to get notified via some
265-
// callback somewhere at some point (vague, right?)
266-
//
267-
// TODO: this probably shouldn't do as much synchronous work as possible
268-
// as it can starve other computations. Rather it should instead
269-
// yield every so often with something like `setTimeout` with the
270-
// timeout set to zero.
271-
fn poll(me: &Arc<Package>) {
272-
loop {
273-
match me.notified.replace(State::Polling) {
274-
// We received a notification while previously polling, or
275-
// this is the initial poll. We've got work to do below!
276-
State::Notified => {}
277-
278-
// We've gone through this loop once and no notification was
279-
// received while we were executing work. That means we got
280-
// `NotReady` below and we're scheduled to receive a
281-
// notification. Block ourselves and wait for later.
282-
//
283-
// When the notification comes in it'll notify our task, see
284-
// our `Waiting` state, and resume the polling process
285-
State::Polling => {
286-
me.notified.set(State::Waiting(me.clone()));
287-
288-
poll_again(me.clone());
289-
290-
break;
291-
}
292-
293-
State::Waiting(_) => panic!("shouldn't see waiting state!"),
294-
}
295-
296-
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) {
205+
fn poll(mut self) {
206+
// Poll in a loop waiting for the future to become ready. Note that
207+
// we probably shouldn't maximize synchronous work here but rather
208+
// we should occasionally yield back to the runtime and schedule
209+
// ourselves to resume this future later on.
210+
//
211+
// Note that 0 here means "need a notification" and 1 means "we got
212+
// a notification". That means we're storing 0 into the `notified`
213+
// slot and we're trying to read 1 to keep on going.
214+
while self.waker.value.swap(0, Ordering::SeqCst) == 1 {
215+
let (val, f) = match self.spawn.poll_future_notify(&self.waker, 0) {
297216
// If the future is ready, immediately call the
298217
// resolve/reject callback and then return as we're done.
299-
Ok(Async::Ready(value)) => (value, &me.resolve),
300-
Err(value) => (value, &me.reject),
218+
Ok(Async::Ready(value)) => (value, &self.resolve),
219+
Err(value) => (value, &self.reject),
301220

302-
// Otherwise keep going in our loop, if we weren't notified
303-
// we'll break out and start waiting.
304-
Ok(Async::NotReady) => continue,
221+
// ... otherwise let's break out and wait
222+
Ok(Async::NotReady) => break,
305223
};
306224

225+
// Call the resolution function, and then when we're done
226+
// destroy ourselves through `drop` since our future is no
227+
// longer needed.
307228
drop(f.call1(&JsValue::undefined(), &val));
308-
break;
229+
return;
309230
}
231+
232+
// Create a `js_sys::Promise` using `Atomics.waitAsync` (or our
233+
// polyfill) and then register its completion callback as simply
234+
// calling this function again.
235+
let promise = wait_async(&self.waker.value, 0).unchecked_into::<Promise>();
236+
let closure = Closure::once_into_js(move || {
237+
self.poll();
238+
});
239+
promise.then(&closure);
310240
}
311241
}
312242
}
313243

244+
fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise {
245+
// If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today)
246+
// then we use our fallback, otherwise we use the native function.
247+
if Atomics::get_wait_async().is_undefined() {
248+
crate::polyfill::wait_async(ptr, val)
249+
} else {
250+
let mem = wasm_bindgen::memory().unchecked_into::<js_sys::WebAssembly::Memory>();
251+
Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val)
252+
}
253+
254+
}
255+
314256
/// Converts a Rust `Future` on a local task queue.
315257
///
316258
/// The `future` provided must adhere to `'static` because it'll be scheduled
@@ -320,8 +262,8 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
320262
///
321263
/// This function has the same panic behavior as `future_to_promise`.
322264
pub fn spawn_local<F>(future: F)
323-
where
324-
F: Future<Item = (), Error = ()> + 'static,
265+
where
266+
F: Future<Item = (), Error = ()> + 'static,
325267
{
326268
future_to_promise(
327269
future

0 commit comments

Comments
 (0)