mirror of
https://github.com/fluencelabs/wasm-bindgen
synced 2025-04-11 06:36:05 +00:00
Merge pull request #1514 from ibaryshnikov/threadsafe-futures
Threadsafe futures
This commit is contained in:
commit
efacd8b74d
@ -11,6 +11,7 @@ version = "0.3.25"
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
cfg-if = "0.1.9"
|
||||||
futures = "0.1.20"
|
futures = "0.1.20"
|
||||||
js-sys = { path = "../js-sys", version = '0.3.25' }
|
js-sys = { path = "../js-sys", version = '0.3.25' }
|
||||||
wasm-bindgen = { path = "../..", version = '0.2.48' }
|
wasm-bindgen = { path = "../..", version = '0.2.48' }
|
||||||
@ -18,6 +19,14 @@ futures-util-preview = { version = "0.3.0-alpha.15", optional = true }
|
|||||||
futures-channel-preview = { version = "0.3.0-alpha.15", optional = true }
|
futures-channel-preview = { version = "0.3.0-alpha.15", optional = true }
|
||||||
lazy_static = { version = "1.3.0", optional = true }
|
lazy_static = { version = "1.3.0", optional = true }
|
||||||
|
|
||||||
|
[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
|
||||||
|
path = "../web-sys"
|
||||||
|
version = "0.3.24"
|
||||||
|
features = [
|
||||||
|
"MessageEvent",
|
||||||
|
"Worker",
|
||||||
|
]
|
||||||
|
|
||||||
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
|
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
|
||||||
wasm-bindgen-test = { path = '../test', version = '0.2.48' }
|
wasm-bindgen-test = { path = '../test', version = '0.2.48' }
|
||||||
|
|
||||||
|
204
crates/futures/src/legacy.rs
Normal file
204
crates/futures/src/legacy.rs
Normal file
@ -0,0 +1,204 @@
|
|||||||
|
use futures::executor::{self, Notify, Spawn};
|
||||||
|
use futures::prelude::*;
|
||||||
|
use js_sys::{Function, Promise};
|
||||||
|
use std::cell::{Cell, RefCell};
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use wasm_bindgen::prelude::*;
|
||||||
|
|
||||||
|
/// Converts a Rust `Future` into a JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// This function will take any future in Rust and schedule it to be executed,
|
||||||
|
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
|
||||||
|
/// to get plumbed into the rest of a system.
|
||||||
|
///
|
||||||
|
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
||||||
|
/// to run in the background and cannot contain any stack references. The
|
||||||
|
/// returned `Promise` will be resolved or rejected when the future completes,
|
||||||
|
/// depending on whether it finishes with `Ok` or `Err`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Note that in wasm panics are currently translated to aborts, but "abort" in
|
||||||
|
/// this case means that a JavaScript exception is thrown. The wasm module is
|
||||||
|
/// still usable (likely erroneously) after Rust panics.
|
||||||
|
///
|
||||||
|
/// If the `future` provided panics then the returned `Promise` **will not
|
||||||
|
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
|
||||||
|
/// limitation of wasm currently that's hoped to be fixed one day!
|
||||||
|
pub fn future_to_promise<F>(future: F) -> Promise
|
||||||
|
where
|
||||||
|
F: Future<Item = JsValue, Error = JsValue> + 'static,
|
||||||
|
{
|
||||||
|
_future_to_promise(Box::new(future))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation of actually transforming a future into a JavaScript `Promise`.
|
||||||
|
//
|
||||||
|
// The only primitive we have to work with here is `Promise::new`, which gives
|
||||||
|
// us two callbacks that we can use to either reject or resolve the promise.
|
||||||
|
// It's our job to ensure that one of those callbacks is called at the
|
||||||
|
// appropriate time.
|
||||||
|
//
|
||||||
|
// Now we know that JavaScript (in general) can't block and is largely
|
||||||
|
// notification/callback driven. That means that our future must either have
|
||||||
|
// synchronous computational work to do, or it's "scheduled a notification" to
|
||||||
|
// happen. These notifications are likely callbacks to get executed when things
|
||||||
|
// finish (like a different promise or something like `setTimeout`). The general
|
||||||
|
// idea here is thus to do as much synchronous work as we can and then otherwise
|
||||||
|
// translate notifications of a future's task into "let's poll the future!"
|
||||||
|
//
|
||||||
|
// This isn't necessarily the greatest future executor in the world, but it
|
||||||
|
// should get the job done for now hopefully.
|
||||||
|
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
|
||||||
|
let mut future = Some(executor::spawn(future));
|
||||||
|
return Promise::new(&mut |resolve, reject| {
|
||||||
|
Package::poll(&Arc::new(Package {
|
||||||
|
spawn: RefCell::new(future.take().unwrap()),
|
||||||
|
resolve,
|
||||||
|
reject,
|
||||||
|
notified: Cell::new(State::Notified),
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
struct Package {
|
||||||
|
// Our "spawned future". This'll have everything we need to poll the
|
||||||
|
// future and continue to move it forward.
|
||||||
|
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
|
||||||
|
|
||||||
|
// The current state of this future, expressed in an enum below. This
|
||||||
|
// indicates whether we're currently polling the future, received a
|
||||||
|
// notification and need to keep polling, or if we're waiting for a
|
||||||
|
// notification to come in (and no one is polling).
|
||||||
|
notified: Cell<State>,
|
||||||
|
|
||||||
|
// Our two callbacks connected to the `Promise` that we returned to
|
||||||
|
// JavaScript. We'll be invoking one of these at the end.
|
||||||
|
resolve: Function,
|
||||||
|
reject: Function,
|
||||||
|
}
|
||||||
|
|
||||||
|
// The possible states our `Package` (future) can be in, tracked internally
|
||||||
|
// and used to guide what happens when polling a future.
|
||||||
|
enum State {
|
||||||
|
// This future is currently and actively being polled. Attempting to
|
||||||
|
// access the future will result in a runtime panic and is considered a
|
||||||
|
// bug.
|
||||||
|
Polling,
|
||||||
|
|
||||||
|
// This future has been notified, while it was being polled. This marker
|
||||||
|
// is used in the `Notify` implementation below, and indicates that a
|
||||||
|
// notification was received that the future is ready to make progress.
|
||||||
|
// If seen, however, it probably means that the future is also currently
|
||||||
|
// being polled.
|
||||||
|
Notified,
|
||||||
|
|
||||||
|
// The future is blocked, waiting for something to happen. Stored here
|
||||||
|
// is a self-reference to the future itself so we can pull it out in
|
||||||
|
// `Notify` and continue polling.
|
||||||
|
//
|
||||||
|
// Note that the self-reference here is an Arc-cycle that will leak
|
||||||
|
// memory unless the future completes, but currently that should be ok
|
||||||
|
// as we'll have to stick around anyway while the future is executing!
|
||||||
|
//
|
||||||
|
// This state is removed as soon as a notification comes in, so the leak
|
||||||
|
// should only be "temporary"
|
||||||
|
Waiting(Arc<Package>),
|
||||||
|
}
|
||||||
|
|
||||||
|
// No shared memory right now, wasm is single threaded, no need to worry
|
||||||
|
// about this!
|
||||||
|
unsafe impl Send for Package {}
|
||||||
|
unsafe impl Sync for Package {}
|
||||||
|
|
||||||
|
impl Package {
|
||||||
|
// Move the future contained in `me` as far forward as we can. This will
|
||||||
|
// do as much synchronous work as possible to complete the future,
|
||||||
|
// ensuring that when it blocks we're scheduled to get notified via some
|
||||||
|
// callback somewhere at some point (vague, right?)
|
||||||
|
//
|
||||||
|
// TODO: this probably shouldn't do as much synchronous work as possible
|
||||||
|
// as it can starve other computations. Rather it should instead
|
||||||
|
// yield every so often with something like `setTimeout` with the
|
||||||
|
// timeout set to zero.
|
||||||
|
fn poll(me: &Arc<Package>) {
|
||||||
|
loop {
|
||||||
|
match me.notified.replace(State::Polling) {
|
||||||
|
// We received a notification while previously polling, or
|
||||||
|
// this is the initial poll. We've got work to do below!
|
||||||
|
State::Notified => {}
|
||||||
|
|
||||||
|
// We've gone through this loop once and no notification was
|
||||||
|
// received while we were executing work. That means we got
|
||||||
|
// `NotReady` below and we're scheduled to receive a
|
||||||
|
// notification. Block ourselves and wait for later.
|
||||||
|
//
|
||||||
|
// When the notification comes in it'll notify our task, see
|
||||||
|
// our `Waiting` state, and resume the polling process
|
||||||
|
State::Polling => {
|
||||||
|
me.notified.set(State::Waiting(me.clone()));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
State::Waiting(_) => panic!("shouldn't see waiting state!"),
|
||||||
|
}
|
||||||
|
|
||||||
|
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
|
||||||
|
// If the future is ready, immediately call the
|
||||||
|
// resolve/reject callback and then return as we're done.
|
||||||
|
Ok(Async::Ready(value)) => (value, &me.resolve),
|
||||||
|
Err(value) => (value, &me.reject),
|
||||||
|
|
||||||
|
// Otherwise keep going in our loop, if we weren't notified
|
||||||
|
// we'll break out and start waiting.
|
||||||
|
Ok(Async::NotReady) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(f.call1(&JsValue::undefined(), &val));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Notify for Package {
|
||||||
|
fn notify(&self, _id: usize) {
|
||||||
|
let me = match self.notified.replace(State::Notified) {
|
||||||
|
// we need to schedule polling to resume, so keep going
|
||||||
|
State::Waiting(me) => me,
|
||||||
|
|
||||||
|
// we were already notified, and were just notified again;
|
||||||
|
// having now coalesced the notifications we return as it's
|
||||||
|
// still someone else's job to process this
|
||||||
|
State::Notified => return,
|
||||||
|
|
||||||
|
// the future was previously being polled, and we've just
|
||||||
|
// switched it to the "you're notified" state. We don't have
|
||||||
|
// access to the future as it's being polled, so the future
|
||||||
|
// polling process later sees this notification and will
|
||||||
|
// continue polling. For us, though, there's nothing else to do,
|
||||||
|
// so we bail out.
|
||||||
|
// later see
|
||||||
|
State::Polling => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Use `Promise.then` on a resolved promise to place our execution
|
||||||
|
// onto the next turn of the microtask queue, enqueueing our poll
|
||||||
|
// operation. We don't currently poll immediately as it turns out
|
||||||
|
// `futures` crate adapters aren't compatible with it and it also
|
||||||
|
// helps avoid blowing the stack by accident.
|
||||||
|
//
|
||||||
|
// Note that the `Rc`/`RefCell` trick here is basically to just
|
||||||
|
// ensure that our `Closure` gets cleaned up appropriately.
|
||||||
|
let promise = Promise::resolve(&JsValue::undefined());
|
||||||
|
let slot = Rc::new(RefCell::new(None));
|
||||||
|
let slot2 = slot.clone();
|
||||||
|
let closure = Closure::wrap(Box::new(move |_| {
|
||||||
|
let myself = slot2.borrow_mut().take();
|
||||||
|
debug_assert!(myself.is_some());
|
||||||
|
Package::poll(&me);
|
||||||
|
}) as Box<dyn FnMut(JsValue)>);
|
||||||
|
promise.then(&closure);
|
||||||
|
*slot.borrow_mut() = Some(closure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
166
crates/futures/src/legacy_atomics.rs
Normal file
166
crates/futures/src/legacy_atomics.rs
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
use futures::executor::{self, Notify, Spawn};
|
||||||
|
use futures::prelude::*;
|
||||||
|
use js_sys::Function;
|
||||||
|
use std::sync::atomic::{AtomicI32, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use wasm_bindgen::prelude::*;
|
||||||
|
use wasm_bindgen::JsCast;
|
||||||
|
|
||||||
|
// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`.
|
||||||
|
#[wasm_bindgen]
|
||||||
|
extern "C" {
|
||||||
|
type Promise;
|
||||||
|
#[wasm_bindgen(method)]
|
||||||
|
fn then(this: &Promise, cb: &JsValue) -> Promise;
|
||||||
|
|
||||||
|
type Atomics;
|
||||||
|
#[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)]
|
||||||
|
fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise;
|
||||||
|
#[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)]
|
||||||
|
fn get_wait_async() -> JsValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts a Rust `Future` into a JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// This function will take any future in Rust and schedule it to be executed,
|
||||||
|
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
|
||||||
|
/// to get plumbed into the rest of a system.
|
||||||
|
///
|
||||||
|
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
||||||
|
/// to run in the background and cannot contain any stack references. The
|
||||||
|
/// returned `Promise` will be resolved or rejected when the future completes,
|
||||||
|
/// depending on whether it finishes with `Ok` or `Err`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Note that in wasm panics are currently translated to aborts, but "abort" in
|
||||||
|
/// this case means that a JavaScript exception is thrown. The wasm module is
|
||||||
|
/// still usable (likely erroneously) after Rust panics.
|
||||||
|
///
|
||||||
|
/// If the `future` provided panics then the returned `Promise` **will not
|
||||||
|
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
|
||||||
|
/// limitation of wasm currently that's hoped to be fixed one day!
|
||||||
|
pub fn future_to_promise<F>(future: F) -> js_sys::Promise
|
||||||
|
where
|
||||||
|
F: Future<Item = JsValue, Error = JsValue> + 'static,
|
||||||
|
{
|
||||||
|
_future_to_promise(Box::new(future))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation of actually transforming a future into a JavaScript `Promise`.
|
||||||
|
//
|
||||||
|
// The main primitives used here are `Promise::new` to actually create a JS
|
||||||
|
// promise to return as well as `Atomics.waitAsync` to create a promise that we
|
||||||
|
// can asynchronously wait on. The general idea here is that we'll create a
|
||||||
|
// promise to return and schedule work to happen in `Atomics.waitAsync`
|
||||||
|
// callbacks.
|
||||||
|
//
|
||||||
|
// After we've created a promise we start polling a future, and whenever it's
|
||||||
|
// not ready we'll execute `Atomics.waitAsync`. When that resolves we'll keep
|
||||||
|
// polling the future, and this happens until the future is done. Finally
|
||||||
|
// when it's all finished we call either resolver or reject depending on the
|
||||||
|
// result of the future.
|
||||||
|
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> js_sys::Promise {
|
||||||
|
let mut future = Some(executor::spawn(future));
|
||||||
|
return js_sys::Promise::new(&mut |resolve, reject| {
|
||||||
|
Package {
|
||||||
|
spawn: future.take().unwrap(),
|
||||||
|
resolve,
|
||||||
|
reject,
|
||||||
|
waker: Arc::new(Waker {
|
||||||
|
value: AtomicI32::new(1), // 1 == "notified, ready to poll"
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
.poll();
|
||||||
|
});
|
||||||
|
|
||||||
|
struct Package {
|
||||||
|
// Our "spawned future". This'll have everything we need to poll the
|
||||||
|
// future and continue to move it forward.
|
||||||
|
spawn: Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>,
|
||||||
|
|
||||||
|
// Our two callbacks connected to the `Promise` that we returned to
|
||||||
|
// JavaScript. We'll be invoking one of these at the end.
|
||||||
|
resolve: Function,
|
||||||
|
reject: Function,
|
||||||
|
|
||||||
|
// Shared state used to communicate waking up this future, this is the
|
||||||
|
// `Send + Sync` piece needed by the async task system.
|
||||||
|
waker: Arc<Waker>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Waker {
|
||||||
|
value: AtomicI32,
|
||||||
|
};
|
||||||
|
|
||||||
|
impl Notify for Waker {
|
||||||
|
fn notify(&self, _id: usize) {
|
||||||
|
// Attempt to notify us by storing 1. If we're already 1 then we
|
||||||
|
// were previously notified and there's nothing to do. Otherwise
|
||||||
|
// we execute the native `notify` instruction to wake up the
|
||||||
|
// corresponding `waitAsync` that was waiting for the transition
|
||||||
|
// from 0 to 1.
|
||||||
|
let prev = self.value.swap(1, Ordering::SeqCst);
|
||||||
|
if prev == 1 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
debug_assert_eq!(prev, 0);
|
||||||
|
unsafe {
|
||||||
|
core::arch::wasm32::atomic_notify(
|
||||||
|
&self.value as *const AtomicI32 as *mut i32,
|
||||||
|
1, // number of threads to notify
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Package {
|
||||||
|
fn poll(mut self) {
|
||||||
|
// Poll in a loop waiting for the future to become ready. Note that
|
||||||
|
// we probably shouldn't maximize synchronous work here but rather
|
||||||
|
// we should occasionally yield back to the runtime and schedule
|
||||||
|
// ourselves to resume this future later on.
|
||||||
|
//
|
||||||
|
// Note that 0 here means "need a notification" and 1 means "we got
|
||||||
|
// a notification". That means we're storing 0 into the `notified`
|
||||||
|
// slot and we're trying to read 1 to keep on going.
|
||||||
|
while self.waker.value.swap(0, Ordering::SeqCst) == 1 {
|
||||||
|
let (val, f) = match self.spawn.poll_future_notify(&self.waker, 0) {
|
||||||
|
// If the future is ready, immediately call the
|
||||||
|
// resolve/reject callback and then return as we're done.
|
||||||
|
Ok(Async::Ready(value)) => (value, &self.resolve),
|
||||||
|
Err(value) => (value, &self.reject),
|
||||||
|
|
||||||
|
// ... otherwise let's break out and wait
|
||||||
|
Ok(Async::NotReady) => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Call the resolution function, and then when we're done
|
||||||
|
// destroy ourselves through `drop` since our future is no
|
||||||
|
// longer needed.
|
||||||
|
drop(f.call1(&JsValue::undefined(), &val));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a `js_sys::Promise` using `Atomics.waitAsync` (or our
|
||||||
|
// polyfill) and then register its completion callback as simply
|
||||||
|
// calling this function again.
|
||||||
|
let promise = wait_async(&self.waker.value, 0).unchecked_into::<Promise>();
|
||||||
|
let closure = Closure::once_into_js(move || {
|
||||||
|
self.poll();
|
||||||
|
});
|
||||||
|
promise.then(&closure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise {
|
||||||
|
// If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today)
|
||||||
|
// then we use our fallback, otherwise we use the native function.
|
||||||
|
if Atomics::get_wait_async().is_undefined() {
|
||||||
|
crate::wait_async_polyfill::wait_async(ptr, val)
|
||||||
|
} else {
|
||||||
|
let mem = wasm_bindgen::memory().unchecked_into::<js_sys::WebAssembly::Memory>();
|
||||||
|
Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val)
|
||||||
|
}
|
||||||
|
}
|
108
crates/futures/src/legacy_shared.rs
Normal file
108
crates/futures/src/legacy_shared.rs
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
use std::cell::RefCell;
|
||||||
|
use futures::future;
|
||||||
|
use std::fmt;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
use js_sys::Promise;
|
||||||
|
use wasm_bindgen::prelude::*;
|
||||||
|
|
||||||
|
/// A Rust `Future` backed by a JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// This type is constructed with a JavaScript `Promise` object and translates
|
||||||
|
/// it to a Rust `Future`. This type implements the `Future` trait from the
|
||||||
|
/// `futures` crate and will either succeed or fail depending on what happens
|
||||||
|
/// with the JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// Currently this type is constructed with `JsFuture::from`.
|
||||||
|
pub struct JsFuture {
|
||||||
|
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for JsFuture {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "JsFuture {{ ... }}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Promise> for JsFuture {
|
||||||
|
fn from(js: Promise) -> JsFuture {
|
||||||
|
// Use the `then` method to schedule two callbacks, one for the
|
||||||
|
// resolved value and one for the rejected value. We're currently
|
||||||
|
// assuming that JS engines will unconditionally invoke precisely one of
|
||||||
|
// these callbacks, no matter what.
|
||||||
|
//
|
||||||
|
// Ideally we'd have a way to cancel the callbacks getting invoked and
|
||||||
|
// free up state ourselves when this `JsFuture` is dropped. We don't
|
||||||
|
// have that, though, and one of the callbacks is likely always going to
|
||||||
|
// be invoked.
|
||||||
|
//
|
||||||
|
// As a result we need to make sure that no matter when the callbacks
|
||||||
|
// are invoked they are valid to be called at any time, which means they
|
||||||
|
// have to be self-contained. Through the `Closure::once` and some
|
||||||
|
// `Rc`-trickery we can arrange for both instances of `Closure`, and the
|
||||||
|
// `Rc`, to all be destroyed once the first one is called.
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let state = Rc::new(RefCell::new(None));
|
||||||
|
let state2 = state.clone();
|
||||||
|
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
|
||||||
|
let state2 = state.clone();
|
||||||
|
let reject = Closure::once(move |val| finish(&state2, Err(val)));
|
||||||
|
|
||||||
|
js.then2(&resolve, &reject);
|
||||||
|
*state.borrow_mut() = Some((tx, resolve, reject));
|
||||||
|
|
||||||
|
return JsFuture { rx };
|
||||||
|
|
||||||
|
fn finish(
|
||||||
|
state: &RefCell<
|
||||||
|
Option<(
|
||||||
|
oneshot::Sender<Result<JsValue, JsValue>>,
|
||||||
|
Closure<dyn FnMut(JsValue)>,
|
||||||
|
Closure<dyn FnMut(JsValue)>,
|
||||||
|
)>,
|
||||||
|
>,
|
||||||
|
val: Result<JsValue, JsValue>,
|
||||||
|
) {
|
||||||
|
match state.borrow_mut().take() {
|
||||||
|
// We don't have any guarantee that anyone's still listening at this
|
||||||
|
// point (the Rust `JsFuture` could have been dropped) so simply
|
||||||
|
// ignore any errors here.
|
||||||
|
Some((tx, _, _)) => drop(tx.send(val)),
|
||||||
|
None => wasm_bindgen::throw_str("cannot finish twice"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for JsFuture {
|
||||||
|
type Item = JsValue;
|
||||||
|
type Error = JsValue;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<JsValue, JsValue> {
|
||||||
|
match self.rx.poll() {
|
||||||
|
Ok(Async::Ready(val)) => val.map(Async::Ready),
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
Err(_) => wasm_bindgen::throw_str("cannot cancel"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts a Rust `Future` on a local task queue.
|
||||||
|
///
|
||||||
|
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
||||||
|
/// to run in the background and cannot contain any stack references.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This function has the same panic behavior as `future_to_promise`.
|
||||||
|
pub fn spawn_local<F>(future: F)
|
||||||
|
where
|
||||||
|
F: Future<Item = (), Error = ()> + 'static,
|
||||||
|
{
|
||||||
|
crate::future_to_promise(
|
||||||
|
future
|
||||||
|
.map(|()| JsValue::undefined())
|
||||||
|
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
|
||||||
|
);
|
||||||
|
}
|
@ -101,317 +101,34 @@
|
|||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
|
#![cfg_attr(target_feature = "atomics", feature(stdsimd))]
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
|
use cfg_if::cfg_if;
|
||||||
|
|
||||||
|
mod legacy_shared;
|
||||||
|
pub use legacy_shared::*;
|
||||||
|
|
||||||
|
cfg_if! {
|
||||||
|
if #[cfg(target_feature = "atomics")] {
|
||||||
|
/// Contains a thread-safe version of this crate, with Futures 0.1
|
||||||
|
mod legacy_atomics;
|
||||||
|
pub use legacy_atomics::*;
|
||||||
|
|
||||||
|
/// Polyfill for `Atomics.waitAsync` function
|
||||||
|
mod wait_async_polyfill;
|
||||||
|
} else {
|
||||||
|
mod legacy;
|
||||||
|
pub use legacy::*;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "futures_0_3")]
|
#[cfg(feature = "futures_0_3")]
|
||||||
/// Contains a Futures 0.3 implementation of this crate.
|
cfg_if! {
|
||||||
pub mod futures_0_3;
|
if #[cfg(target_feature = "atomics")] {
|
||||||
|
compile_error!("futures 0.3 support is not implemented with atomics yet");
|
||||||
use std::cell::{Cell, RefCell};
|
} else {
|
||||||
use std::fmt;
|
/// Contains a Futures 0.3 implementation of this crate.
|
||||||
use std::rc::Rc;
|
pub mod futures_0_3;
|
||||||
use std::sync::Arc;
|
}
|
||||||
|
|
||||||
use futures::executor::{self, Notify, Spawn};
|
|
||||||
use futures::future;
|
|
||||||
use futures::prelude::*;
|
|
||||||
use futures::sync::oneshot;
|
|
||||||
use js_sys::{Function, Promise};
|
|
||||||
use wasm_bindgen::prelude::*;
|
|
||||||
|
|
||||||
/// A Rust `Future` backed by a JavaScript `Promise`.
|
|
||||||
///
|
|
||||||
/// This type is constructed with a JavaScript `Promise` object and translates
|
|
||||||
/// it to a Rust `Future`. This type implements the `Future` trait from the
|
|
||||||
/// `futures` crate and will either succeed or fail depending on what happens
|
|
||||||
/// with the JavaScript `Promise`.
|
|
||||||
///
|
|
||||||
/// Currently this type is constructed with `JsFuture::from`.
|
|
||||||
pub struct JsFuture {
|
|
||||||
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for JsFuture {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "JsFuture {{ ... }}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Promise> for JsFuture {
|
|
||||||
fn from(js: Promise) -> JsFuture {
|
|
||||||
// Use the `then` method to schedule two callbacks, one for the
|
|
||||||
// resolved value and one for the rejected value. We're currently
|
|
||||||
// assuming that JS engines will unconditionally invoke precisely one of
|
|
||||||
// these callbacks, no matter what.
|
|
||||||
//
|
|
||||||
// Ideally we'd have a way to cancel the callbacks getting invoked and
|
|
||||||
// free up state ourselves when this `JsFuture` is dropped. We don't
|
|
||||||
// have that, though, and one of the callbacks is likely always going to
|
|
||||||
// be invoked.
|
|
||||||
//
|
|
||||||
// As a result we need to make sure that no matter when the callbacks
|
|
||||||
// are invoked they are valid to be called at any time, which means they
|
|
||||||
// have to be self-contained. Through the `Closure::once` and some
|
|
||||||
// `Rc`-trickery we can arrange for both instances of `Closure`, and the
|
|
||||||
// `Rc`, to all be destroyed once the first one is called.
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
let state = Rc::new(RefCell::new(None));
|
|
||||||
let state2 = state.clone();
|
|
||||||
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
|
|
||||||
let state2 = state.clone();
|
|
||||||
let reject = Closure::once(move |val| finish(&state2, Err(val)));
|
|
||||||
|
|
||||||
js.then2(&resolve, &reject);
|
|
||||||
*state.borrow_mut() = Some((tx, resolve, reject));
|
|
||||||
|
|
||||||
return JsFuture { rx };
|
|
||||||
|
|
||||||
fn finish(
|
|
||||||
state: &RefCell<
|
|
||||||
Option<(
|
|
||||||
oneshot::Sender<Result<JsValue, JsValue>>,
|
|
||||||
Closure<dyn FnMut(JsValue)>,
|
|
||||||
Closure<dyn FnMut(JsValue)>,
|
|
||||||
)>,
|
|
||||||
>,
|
|
||||||
val: Result<JsValue, JsValue>,
|
|
||||||
) {
|
|
||||||
match state.borrow_mut().take() {
|
|
||||||
// We don't have any guarantee that anyone's still listening at this
|
|
||||||
// point (the Rust `JsFuture` could have been dropped) so simply
|
|
||||||
// ignore any errors here.
|
|
||||||
Some((tx, _, _)) => drop(tx.send(val)),
|
|
||||||
None => wasm_bindgen::throw_str("cannot finish twice"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for JsFuture {
|
|
||||||
type Item = JsValue;
|
|
||||||
type Error = JsValue;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<JsValue, JsValue> {
|
|
||||||
match self.rx.poll() {
|
|
||||||
Ok(Async::Ready(val)) => val.map(Async::Ready),
|
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
|
||||||
Err(_) => wasm_bindgen::throw_str("cannot cancel"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Converts a Rust `Future` into a JavaScript `Promise`.
|
|
||||||
///
|
|
||||||
/// This function will take any future in Rust and schedule it to be executed,
|
|
||||||
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
|
|
||||||
/// to get plumbed into the rest of a system.
|
|
||||||
///
|
|
||||||
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
|
||||||
/// to run in the background and cannot contain any stack references. The
|
|
||||||
/// returned `Promise` will be resolved or rejected when the future completes,
|
|
||||||
/// depending on whether it finishes with `Ok` or `Err`.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
///
|
|
||||||
/// Note that in wasm panics are currently translated to aborts, but "abort" in
|
|
||||||
/// this case means that a JavaScript exception is thrown. The wasm module is
|
|
||||||
/// still usable (likely erroneously) after Rust panics.
|
|
||||||
///
|
|
||||||
/// If the `future` provided panics then the returned `Promise` **will not
|
|
||||||
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
|
|
||||||
/// limitation of wasm currently that's hoped to be fixed one day!
|
|
||||||
pub fn future_to_promise<F>(future: F) -> Promise
|
|
||||||
where
|
|
||||||
F: Future<Item = JsValue, Error = JsValue> + 'static,
|
|
||||||
{
|
|
||||||
_future_to_promise(Box::new(future))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implementation of actually transforming a future into a JavaScript `Promise`.
|
|
||||||
//
|
|
||||||
// The only primitive we have to work with here is `Promise::new`, which gives
|
|
||||||
// us two callbacks that we can use to either reject or resolve the promise.
|
|
||||||
// It's our job to ensure that one of those callbacks is called at the
|
|
||||||
// appropriate time.
|
|
||||||
//
|
|
||||||
// Now we know that JavaScript (in general) can't block and is largely
|
|
||||||
// notification/callback driven. That means that our future must either have
|
|
||||||
// synchronous computational work to do, or it's "scheduled a notification" to
|
|
||||||
// happen. These notifications are likely callbacks to get executed when things
|
|
||||||
// finish (like a different promise or something like `setTimeout`). The general
|
|
||||||
// idea here is thus to do as much synchronous work as we can and then otherwise
|
|
||||||
// translate notifications of a future's task into "let's poll the future!"
|
|
||||||
//
|
|
||||||
// This isn't necessarily the greatest future executor in the world, but it
|
|
||||||
// should get the job done for now hopefully.
|
|
||||||
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
|
|
||||||
let mut future = Some(executor::spawn(future));
|
|
||||||
return Promise::new(&mut |resolve, reject| {
|
|
||||||
Package::poll(&Arc::new(Package {
|
|
||||||
spawn: RefCell::new(future.take().unwrap()),
|
|
||||||
resolve,
|
|
||||||
reject,
|
|
||||||
notified: Cell::new(State::Notified),
|
|
||||||
}));
|
|
||||||
});
|
|
||||||
|
|
||||||
struct Package {
|
|
||||||
// Our "spawned future". This'll have everything we need to poll the
|
|
||||||
// future and continue to move it forward.
|
|
||||||
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
|
|
||||||
|
|
||||||
// The current state of this future, expressed in an enum below. This
|
|
||||||
// indicates whether we're currently polling the future, received a
|
|
||||||
// notification and need to keep polling, or if we're waiting for a
|
|
||||||
// notification to come in (and no one is polling).
|
|
||||||
notified: Cell<State>,
|
|
||||||
|
|
||||||
// Our two callbacks connected to the `Promise` that we returned to
|
|
||||||
// JavaScript. We'll be invoking one of these at the end.
|
|
||||||
resolve: Function,
|
|
||||||
reject: Function,
|
|
||||||
}
|
|
||||||
|
|
||||||
// The possible states our `Package` (future) can be in, tracked internally
|
|
||||||
// and used to guide what happens when polling a future.
|
|
||||||
enum State {
|
|
||||||
// This future is currently and actively being polled. Attempting to
|
|
||||||
// access the future will result in a runtime panic and is considered a
|
|
||||||
// bug.
|
|
||||||
Polling,
|
|
||||||
|
|
||||||
// This future has been notified, while it was being polled. This marker
|
|
||||||
// is used in the `Notify` implementation below, and indicates that a
|
|
||||||
// notification was received that the future is ready to make progress.
|
|
||||||
// If seen, however, it probably means that the future is also currently
|
|
||||||
// being polled.
|
|
||||||
Notified,
|
|
||||||
|
|
||||||
// The future is blocked, waiting for something to happen. Stored here
|
|
||||||
// is a self-reference to the future itself so we can pull it out in
|
|
||||||
// `Notify` and continue polling.
|
|
||||||
//
|
|
||||||
// Note that the self-reference here is an Arc-cycle that will leak
|
|
||||||
// memory unless the future completes, but currently that should be ok
|
|
||||||
// as we'll have to stick around anyway while the future is executing!
|
|
||||||
//
|
|
||||||
// This state is removed as soon as a notification comes in, so the leak
|
|
||||||
// should only be "temporary"
|
|
||||||
Waiting(Arc<Package>),
|
|
||||||
}
|
|
||||||
|
|
||||||
// No shared memory right now, wasm is single threaded, no need to worry
|
|
||||||
// about this!
|
|
||||||
unsafe impl Send for Package {}
|
|
||||||
unsafe impl Sync for Package {}
|
|
||||||
|
|
||||||
impl Package {
|
|
||||||
// Move the future contained in `me` as far forward as we can. This will
|
|
||||||
// do as much synchronous work as possible to complete the future,
|
|
||||||
// ensuring that when it blocks we're scheduled to get notified via some
|
|
||||||
// callback somewhere at some point (vague, right?)
|
|
||||||
//
|
|
||||||
// TODO: this probably shouldn't do as much synchronous work as possible
|
|
||||||
// as it can starve other computations. Rather it should instead
|
|
||||||
// yield every so often with something like `setTimeout` with the
|
|
||||||
// timeout set to zero.
|
|
||||||
fn poll(me: &Arc<Package>) {
|
|
||||||
loop {
|
|
||||||
match me.notified.replace(State::Polling) {
|
|
||||||
// We received a notification while previously polling, or
|
|
||||||
// this is the initial poll. We've got work to do below!
|
|
||||||
State::Notified => {}
|
|
||||||
|
|
||||||
// We've gone through this loop once and no notification was
|
|
||||||
// received while we were executing work. That means we got
|
|
||||||
// `NotReady` below and we're scheduled to receive a
|
|
||||||
// notification. Block ourselves and wait for later.
|
|
||||||
//
|
|
||||||
// When the notification comes in it'll notify our task, see
|
|
||||||
// our `Waiting` state, and resume the polling process
|
|
||||||
State::Polling => {
|
|
||||||
me.notified.set(State::Waiting(me.clone()));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
State::Waiting(_) => panic!("shouldn't see waiting state!"),
|
|
||||||
}
|
|
||||||
|
|
||||||
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
|
|
||||||
// If the future is ready, immediately call the
|
|
||||||
// resolve/reject callback and then return as we're done.
|
|
||||||
Ok(Async::Ready(value)) => (value, &me.resolve),
|
|
||||||
Err(value) => (value, &me.reject),
|
|
||||||
|
|
||||||
// Otherwise keep going in our loop, if we weren't notified
|
|
||||||
// we'll break out and start waiting.
|
|
||||||
Ok(Async::NotReady) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
drop(f.call1(&JsValue::undefined(), &val));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Notify for Package {
|
|
||||||
fn notify(&self, _id: usize) {
|
|
||||||
let me = match self.notified.replace(State::Notified) {
|
|
||||||
// we need to schedule polling to resume, so keep going
|
|
||||||
State::Waiting(me) => me,
|
|
||||||
|
|
||||||
// we were already notified, and were just notified again;
|
|
||||||
// having now coalesced the notifications we return as it's
|
|
||||||
// still someone else's job to process this
|
|
||||||
State::Notified => return,
|
|
||||||
|
|
||||||
// the future was previously being polled, and we've just
|
|
||||||
// switched it to the "you're notified" state. We don't have
|
|
||||||
// access to the future as it's being polled, so the future
|
|
||||||
// polling process later sees this notification and will
|
|
||||||
// continue polling. For us, though, there's nothing else to do,
|
|
||||||
// so we bail out.
|
|
||||||
// later see
|
|
||||||
State::Polling => return,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Use `Promise.then` on a resolved promise to place our execution
|
|
||||||
// onto the next turn of the microtask queue, enqueueing our poll
|
|
||||||
// operation. We don't currently poll immediately as it turns out
|
|
||||||
// `futures` crate adapters aren't compatible with it and it also
|
|
||||||
// helps avoid blowing the stack by accident.
|
|
||||||
//
|
|
||||||
// Note that the `Rc`/`RefCell` trick here is basically to just
|
|
||||||
// ensure that our `Closure` gets cleaned up appropriately.
|
|
||||||
let promise = Promise::resolve(&JsValue::undefined());
|
|
||||||
let slot = Rc::new(RefCell::new(None));
|
|
||||||
let slot2 = slot.clone();
|
|
||||||
let closure = Closure::wrap(Box::new(move |_| {
|
|
||||||
let myself = slot2.borrow_mut().take();
|
|
||||||
debug_assert!(myself.is_some());
|
|
||||||
Package::poll(&me);
|
|
||||||
}) as Box<dyn FnMut(JsValue)>);
|
|
||||||
promise.then(&closure);
|
|
||||||
*slot.borrow_mut() = Some(closure);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Converts a Rust `Future` on a local task queue.
|
|
||||||
///
|
|
||||||
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
|
||||||
/// to run in the background and cannot contain any stack references.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
///
|
|
||||||
/// This function has the same panic behavior as `future_to_promise`.
|
|
||||||
pub fn spawn_local<F>(future: F)
|
|
||||||
where
|
|
||||||
F: Future<Item = (), Error = ()> + 'static,
|
|
||||||
{
|
|
||||||
future_to_promise(
|
|
||||||
future
|
|
||||||
.map(|()| JsValue::undefined())
|
|
||||||
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
104
crates/futures/src/wait_async_polyfill.rs
Normal file
104
crates/futures/src/wait_async_polyfill.rs
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
//!
|
||||||
|
//! The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async
|
||||||
|
//! and ported to Rust
|
||||||
|
//!
|
||||||
|
|
||||||
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* Author: Lars T Hansen, lhansen@mozilla.com
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* Polyfill for Atomics.waitAsync() for web browsers.
|
||||||
|
*
|
||||||
|
* Any kind of agent that is able to create a new Worker can use this polyfill.
|
||||||
|
*
|
||||||
|
* Load this file in all agents that will use Atomics.waitAsync.
|
||||||
|
*
|
||||||
|
* Agents that don't call Atomics.waitAsync need do nothing special.
|
||||||
|
*
|
||||||
|
* Any kind of agent can wake another agent that is sleeping in
|
||||||
|
* Atomics.waitAsync by just calling Atomics.wake for the location being slept
|
||||||
|
* on, as normal.
|
||||||
|
*
|
||||||
|
* The implementation is not completely faithful to the proposed semantics: in
|
||||||
|
* the case where an agent first asyncWaits and then waits on the same location:
|
||||||
|
* when it is woken, the two waits will be woken in order, while in the real
|
||||||
|
* semantics, the sync wait will be woken first.
|
||||||
|
*
|
||||||
|
* In this polyfill Atomics.waitAsync is not very fast.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* Implementation:
|
||||||
|
*
|
||||||
|
* For every wait we fork off a Worker to perform the wait. Workers are reused
|
||||||
|
* when possible. The worker communicates with its parent using postMessage.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use js_sys::{encode_uri_component, Array, Promise};
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::sync::atomic::AtomicI32;
|
||||||
|
use wasm_bindgen::prelude::*;
|
||||||
|
use wasm_bindgen::JsCast;
|
||||||
|
use web_sys::{MessageEvent, Worker};
|
||||||
|
|
||||||
|
const HELPER_CODE: &'static str = "
|
||||||
|
onmessage = function (ev) {
|
||||||
|
let [ia, index, value] = ev.data;
|
||||||
|
ia = new Int32Array(ia.buffer);
|
||||||
|
let result = Atomics.wait(ia, index, value);
|
||||||
|
postMessage(result);
|
||||||
|
};
|
||||||
|
";
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static HELPERS: RefCell<Vec<Worker>> = RefCell::new(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn alloc_helper() -> Worker {
|
||||||
|
HELPERS.with(|helpers| {
|
||||||
|
if let Some(helper) = helpers.borrow_mut().pop() {
|
||||||
|
return helper;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut initialization_string = "data:application/javascript,".to_owned();
|
||||||
|
let encoded: String = encode_uri_component(HELPER_CODE).into();
|
||||||
|
initialization_string.push_str(&encoded);
|
||||||
|
|
||||||
|
Worker::new(&initialization_string).unwrap_or_else(|js| wasm_bindgen::throw_val(js))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn free_helper(helper: Worker) {
|
||||||
|
HELPERS.with(move |helpers| {
|
||||||
|
let mut helpers = helpers.borrow_mut();
|
||||||
|
helpers.push(helper.clone());
|
||||||
|
helpers.truncate(10); // random arbitrary limit chosen here
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wait_async(ptr: &AtomicI32, value: i32) -> Promise {
|
||||||
|
Promise::new(&mut |resolve, _reject| {
|
||||||
|
let helper = alloc_helper();
|
||||||
|
let helper_ref = helper.clone();
|
||||||
|
|
||||||
|
let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| {
|
||||||
|
// Our helper is done waiting so it's available to wait on a
|
||||||
|
// different location, so return it to the free list.
|
||||||
|
free_helper(helper_ref);
|
||||||
|
drop(resolve.call1(&JsValue::NULL, &e.data()));
|
||||||
|
});
|
||||||
|
helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
|
||||||
|
|
||||||
|
let data = Array::of3(
|
||||||
|
&wasm_bindgen::memory(),
|
||||||
|
&JsValue::from(ptr as *const AtomicI32 as i32 / 4),
|
||||||
|
&JsValue::from(value),
|
||||||
|
);
|
||||||
|
|
||||||
|
helper
|
||||||
|
.post_message(&data)
|
||||||
|
.unwrap_or_else(|js| wasm_bindgen::throw_val(js));
|
||||||
|
})
|
||||||
|
}
|
@ -6,6 +6,8 @@ extern crate wasm_bindgen;
|
|||||||
extern crate wasm_bindgen_futures;
|
extern crate wasm_bindgen_futures;
|
||||||
extern crate wasm_bindgen_test;
|
extern crate wasm_bindgen_test;
|
||||||
|
|
||||||
|
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||||
|
|
||||||
use futures::unsync::oneshot;
|
use futures::unsync::oneshot;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
|
@ -598,10 +598,19 @@ pub mod Atomics {
|
|||||||
/// The static `Atomics.notify()` method notifies up some agents that
|
/// The static `Atomics.notify()` method notifies up some agents that
|
||||||
/// are sleeping in the wait queue.
|
/// are sleeping in the wait queue.
|
||||||
/// Note: This operation works with a shared `Int32Array` only.
|
/// Note: This operation works with a shared `Int32Array` only.
|
||||||
|
/// If `count` is not provided, notifies all the agents in the queue.
|
||||||
///
|
///
|
||||||
/// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify)
|
/// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify)
|
||||||
#[wasm_bindgen(js_namespace = Atomics, catch)]
|
#[wasm_bindgen(js_namespace = Atomics, catch)]
|
||||||
pub fn notify(typed_array: &Int32Array, index: u32, count: u32) -> Result<u32, JsValue>;
|
pub fn notify(typed_array: &Int32Array, index: u32) -> Result<u32, JsValue>;
|
||||||
|
|
||||||
|
/// Notifies up to `count` agents in the wait queue.
|
||||||
|
#[wasm_bindgen(js_namespace = Atomics, catch, js_name = notify)]
|
||||||
|
pub fn notify_with_count(
|
||||||
|
typed_array: &Int32Array,
|
||||||
|
index: u32,
|
||||||
|
count: u32,
|
||||||
|
) -> Result<u32, JsValue>;
|
||||||
|
|
||||||
/// The static `Atomics.or()` method computes a bitwise OR with a given value
|
/// The static `Atomics.or()` method computes a bitwise OR with a given value
|
||||||
/// at a given position in the array, and returns the old value at that position.
|
/// at a given position in the array, and returns the old value at that position.
|
||||||
|
@ -18,7 +18,7 @@ wasm-bindgen = { version = "0.2.48", features = ['serde-serialize'] }
|
|||||||
wasm-bindgen-futures = "0.3.25"
|
wasm-bindgen-futures = "0.3.25"
|
||||||
|
|
||||||
[dependencies.web-sys]
|
[dependencies.web-sys]
|
||||||
version = "0.3.4"
|
version = "0.3.23"
|
||||||
features = [
|
features = [
|
||||||
'CanvasRenderingContext2d',
|
'CanvasRenderingContext2d',
|
||||||
'ErrorEvent',
|
'ErrorEvent',
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use futures::sync::oneshot;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use js_sys::{Promise, Uint8ClampedArray, WebAssembly};
|
use js_sys::{Promise, Uint8ClampedArray, WebAssembly};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
@ -69,27 +70,28 @@ impl Scene {
|
|||||||
// threads so we don't lock up the main thread, so we ship off a thread
|
// threads so we don't lock up the main thread, so we ship off a thread
|
||||||
// which actually does the whole rayon business. When our returned
|
// which actually does the whole rayon business. When our returned
|
||||||
// future is resolved we can pull out the final version of the image.
|
// future is resolved we can pull out the final version of the image.
|
||||||
let done = pool
|
let (tx, rx) = oneshot::channel();
|
||||||
.run_notify(move || {
|
pool.run(move || {
|
||||||
thread_pool.install(|| {
|
thread_pool.install(|| {
|
||||||
rgb_data
|
|
||||||
.par_chunks_mut(4)
|
|
||||||
.enumerate()
|
|
||||||
.for_each(|(i, chunk)| {
|
|
||||||
let i = i as u32;
|
|
||||||
let x = i % width;
|
|
||||||
let y = i / width;
|
|
||||||
let ray = raytracer::Ray::create_prime(x, y, &scene);
|
|
||||||
let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba();
|
|
||||||
chunk[0] = result.data[0];
|
|
||||||
chunk[1] = result.data[1];
|
|
||||||
chunk[2] = result.data[2];
|
|
||||||
chunk[3] = result.data[3];
|
|
||||||
});
|
|
||||||
});
|
|
||||||
rgb_data
|
rgb_data
|
||||||
})?
|
.par_chunks_mut(4)
|
||||||
.map(move |_data| image_data(base, len, width, height).into());
|
.enumerate()
|
||||||
|
.for_each(|(i, chunk)| {
|
||||||
|
let i = i as u32;
|
||||||
|
let x = i % width;
|
||||||
|
let y = i / width;
|
||||||
|
let ray = raytracer::Ray::create_prime(x, y, &scene);
|
||||||
|
let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba();
|
||||||
|
chunk[0] = result.data[0];
|
||||||
|
chunk[1] = result.data[1];
|
||||||
|
chunk[2] = result.data[2];
|
||||||
|
chunk[3] = result.data[3];
|
||||||
|
});
|
||||||
|
});
|
||||||
|
drop(tx.send(rgb_data));
|
||||||
|
})?;
|
||||||
|
let done = rx.map(move |_data| image_data(base, len, width, height).into())
|
||||||
|
.map_err(|_| JsValue::undefined());
|
||||||
|
|
||||||
Ok(RenderingScene {
|
Ok(RenderingScene {
|
||||||
promise: wasm_bindgen_futures::future_to_promise(done),
|
promise: wasm_bindgen_futures::future_to_promise(done),
|
||||||
|
@ -1,13 +1,8 @@
|
|||||||
//! A small module that's intended to provide an example of creating a pool of
|
//! A small module that's intended to provide an example of creating a pool of
|
||||||
//! web workers which can be used to execute `rayon`-style work.
|
//! web workers which can be used to execute `rayon`-style work.
|
||||||
|
|
||||||
use futures::sync::oneshot;
|
use std::cell::RefCell;
|
||||||
use futures::Future;
|
|
||||||
use std::cell::{RefCell, UnsafeCell};
|
|
||||||
use std::mem;
|
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
use wasm_bindgen::JsCast;
|
use wasm_bindgen::JsCast;
|
||||||
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
|
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
|
||||||
@ -141,12 +136,11 @@ impl WorkerPool {
|
|||||||
/// whatn it's done the worker is ready to execute more work. This method is
|
/// whatn it's done the worker is ready to execute more work. This method is
|
||||||
/// used for all spawned workers to ensure that when the work is finished
|
/// used for all spawned workers to ensure that when the work is finished
|
||||||
/// the worker is reclaimed back into this pool.
|
/// the worker is reclaimed back into this pool.
|
||||||
fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) {
|
fn reclaim_on_message(&self, worker: Worker) {
|
||||||
let state = Rc::downgrade(&self.state);
|
let state = Rc::downgrade(&self.state);
|
||||||
let worker2 = worker.clone();
|
let worker2 = worker.clone();
|
||||||
let reclaim_slot = Rc::new(RefCell::new(None));
|
let reclaim_slot = Rc::new(RefCell::new(None));
|
||||||
let slot2 = reclaim_slot.clone();
|
let slot2 = reclaim_slot.clone();
|
||||||
let mut on_finish = Some(on_finish);
|
|
||||||
let reclaim = Closure::wrap(Box::new(move |event: Event| {
|
let reclaim = Closure::wrap(Box::new(move |event: Event| {
|
||||||
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
|
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
|
||||||
console_log!("error in worker: {}", error.message());
|
console_log!("error in worker: {}", error.message());
|
||||||
@ -155,11 +149,9 @@ impl WorkerPool {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is a completion event then we can execute our `on_finish`
|
// If this is a completion event then can deallocate our own
|
||||||
// callback and we can also deallocate our own callback by clearing
|
// callback by clearing out `slot2` which contains our own closure.
|
||||||
// out `slot2` which contains our own closure.
|
|
||||||
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
|
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
|
||||||
on_finish.take().unwrap()();
|
|
||||||
if let Some(state) = state.upgrade() {
|
if let Some(state) = state.upgrade() {
|
||||||
state.push(worker2.clone());
|
state.push(worker2.clone());
|
||||||
}
|
}
|
||||||
@ -193,80 +185,9 @@ impl WorkerPool {
|
|||||||
/// a web worker, that error is returned.
|
/// a web worker, that error is returned.
|
||||||
pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
|
pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
|
||||||
let worker = self.execute(f)?;
|
let worker = self.execute(f)?;
|
||||||
self.reclaim_on_message(worker, || {});
|
self.reclaim_on_message(worker);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes the closure `f` in a web worker, returning a future of the
|
|
||||||
/// value that `f` produces.
|
|
||||||
///
|
|
||||||
/// This method is the same as `run` execept that it allows recovering the
|
|
||||||
/// return value of the closure `f` in a nonblocking fashion with the future
|
|
||||||
/// returned.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
///
|
|
||||||
/// If an error happens while spawning a web worker or sending a message to
|
|
||||||
/// a web worker, that error is returned.
|
|
||||||
pub fn run_notify<T>(
|
|
||||||
&self,
|
|
||||||
f: impl FnOnce() -> T + Send + 'static,
|
|
||||||
) -> Result<impl Future<Item = T, Error = JsValue> + 'static, JsValue>
|
|
||||||
where
|
|
||||||
T: Send + 'static,
|
|
||||||
{
|
|
||||||
// FIXME(#1379) we should just use the `oneshot` directly as the future,
|
|
||||||
// but we have to use JS callbacks to ensure we don't have futures cross
|
|
||||||
// threads as that's currently not safe to do so.
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
let storage = Arc::new(AtomicValue::new(None));
|
|
||||||
let storage2 = storage.clone();
|
|
||||||
let worker = self.execute(move || {
|
|
||||||
assert!(storage2.replace(Some(f())).is_ok());
|
|
||||||
})?;
|
|
||||||
self.reclaim_on_message(worker, move || match storage.replace(None) {
|
|
||||||
Ok(Some(val)) => drop(tx.send(val)),
|
|
||||||
_ => unreachable!(),
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(rx.map_err(|_| JsValue::undefined()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A small helper struct representing atomic access to an internal value `T`
|
|
||||||
///
|
|
||||||
/// This struct only supports one API, `replace`, which will either succeed and
|
|
||||||
/// replace the internal value with another (returning the previous one), or it
|
|
||||||
/// will fail returning the value passed in. Failure happens when two threads
|
|
||||||
/// try to `replace` at the same time.
|
|
||||||
///
|
|
||||||
/// This is only really intended to help safely transfer information between
|
|
||||||
/// threads, it doesn't provide any synchronization capabilities itself other
|
|
||||||
/// than a guaranteed safe API.
|
|
||||||
struct AtomicValue<T> {
|
|
||||||
modifying: AtomicBool,
|
|
||||||
slot: UnsafeCell<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<T: Send> Send for AtomicValue<T> {}
|
|
||||||
unsafe impl<T: Send> Sync for AtomicValue<T> {}
|
|
||||||
|
|
||||||
impl<T> AtomicValue<T> {
|
|
||||||
fn new(val: T) -> AtomicValue<T> {
|
|
||||||
AtomicValue {
|
|
||||||
modifying: AtomicBool::new(false),
|
|
||||||
slot: UnsafeCell::new(val),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn replace(&self, val: T) -> Result<T, T> {
|
|
||||||
if self.modifying.swap(true, SeqCst) {
|
|
||||||
return Err(val);
|
|
||||||
}
|
|
||||||
let ret = unsafe { mem::replace(&mut *self.slot.get(), val) };
|
|
||||||
self.modifying.store(false, SeqCst);
|
|
||||||
Ok(ret)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PoolState {
|
impl PoolState {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user