mirror of
https://github.com/fluencelabs/wasm-bindgen
synced 2025-03-16 02:00:51 +00:00
Merge pull request #1649 from alexcrichton/fix-futures
Update futures implementation to not destroy callbacks
This commit is contained in:
commit
b64f5c0ad8
@ -3,6 +3,7 @@ use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -24,10 +25,7 @@ use wasm_bindgen::prelude::*;
|
||||
///
|
||||
/// Currently this type is constructed with `JsFuture::from`.
|
||||
pub struct JsFuture {
|
||||
resolved: oneshot::Receiver<JsValue>,
|
||||
rejected: oneshot::Receiver<JsValue>,
|
||||
_cb_resolve: Closure<dyn FnMut(JsValue)>,
|
||||
_cb_reject: Closure<dyn FnMut(JsValue)>,
|
||||
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for JsFuture {
|
||||
@ -38,31 +36,37 @@ impl fmt::Debug for JsFuture {
|
||||
|
||||
impl From<Promise> for JsFuture {
|
||||
fn from(js: Promise) -> JsFuture {
|
||||
// Use the `then` method to schedule two callbacks, one for the
|
||||
// resolved value and one for the rejected value. These two callbacks
|
||||
// will be connected to oneshot channels which feed back into our
|
||||
// future.
|
||||
//
|
||||
// This may not be the speediest option today but it should work!
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
// See comments in `src/lib.rs` for why we're using one self-contained
|
||||
// callback here.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let state = Rc::new(RefCell::new(None));
|
||||
let state2 = state.clone();
|
||||
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
|
||||
let state2 = state.clone();
|
||||
let reject = Closure::once(move |val| finish(&state2, Err(val)));
|
||||
|
||||
let cb_resolve = Closure::once(move |val| {
|
||||
tx1.send(val).unwrap_throw();
|
||||
});
|
||||
js.then2(&resolve, &reject);
|
||||
*state.borrow_mut() = Some((tx, resolve, reject));
|
||||
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
return JsFuture { rx };
|
||||
|
||||
let cb_reject = Closure::once(move |val| {
|
||||
tx2.send(val).unwrap_throw();
|
||||
});
|
||||
|
||||
js.then2(&cb_resolve, &cb_reject);
|
||||
|
||||
JsFuture {
|
||||
resolved: rx1,
|
||||
rejected: rx2,
|
||||
_cb_resolve: cb_resolve,
|
||||
_cb_reject: cb_reject,
|
||||
fn finish(
|
||||
state: &RefCell<
|
||||
Option<(
|
||||
oneshot::Sender<Result<JsValue, JsValue>>,
|
||||
Closure<dyn FnMut(JsValue)>,
|
||||
Closure<dyn FnMut(JsValue)>,
|
||||
)>,
|
||||
>,
|
||||
val: Result<JsValue, JsValue>,
|
||||
) {
|
||||
match state.borrow_mut().take() {
|
||||
// We don't have any guarantee that anyone's still listening at this
|
||||
// point (the Rust `JsFuture` could have been dropped) so simply
|
||||
// ignore any errors here.
|
||||
Some((tx, _, _)) => drop(tx.send(val)),
|
||||
None => wasm_bindgen::throw_str("cannot finish twice"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -71,16 +75,11 @@ impl Future for JsFuture {
|
||||
type Output = Result<JsValue, JsValue>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
// Test if either our resolved or rejected side is finished yet.
|
||||
if let Poll::Ready(val) = self.resolved.poll_unpin(cx) {
|
||||
return Poll::Ready(Ok(val.unwrap_throw()));
|
||||
match self.rx.poll_unpin(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Ok(val)) => Poll::Ready(val),
|
||||
Poll::Ready(Err(_)) => wasm_bindgen::throw_str("cannot cancel"),
|
||||
}
|
||||
|
||||
if let Poll::Ready(val) = self.rejected.poll_unpin(cx) {
|
||||
return Poll::Ready(Err(val.unwrap_throw()));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,9 +128,7 @@ use wasm_bindgen::prelude::*;
|
||||
///
|
||||
/// Currently this type is constructed with `JsFuture::from`.
|
||||
pub struct JsFuture {
|
||||
resolved: oneshot::Receiver<JsValue>,
|
||||
rejected: oneshot::Receiver<JsValue>,
|
||||
callbacks: Option<(Closure<dyn FnMut(JsValue)>, Closure<dyn FnMut(JsValue)>)>,
|
||||
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for JsFuture {
|
||||
@ -142,28 +140,49 @@ impl fmt::Debug for JsFuture {
|
||||
impl From<Promise> for JsFuture {
|
||||
fn from(js: Promise) -> JsFuture {
|
||||
// Use the `then` method to schedule two callbacks, one for the
|
||||
// resolved value and one for the rejected value. These two callbacks
|
||||
// will be connected to oneshot channels which feed back into our
|
||||
// future.
|
||||
// resolved value and one for the rejected value. We're currently
|
||||
// assuming that JS engines will unconditionally invoke precisely one of
|
||||
// these callbacks, no matter what.
|
||||
//
|
||||
// This may not be the speediest option today but it should work!
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
let mut tx1 = Some(tx1);
|
||||
let resolve = Closure::wrap(Box::new(move |val| {
|
||||
drop(tx1.take().unwrap().send(val));
|
||||
}) as Box<dyn FnMut(_)>);
|
||||
let mut tx2 = Some(tx2);
|
||||
let reject = Closure::wrap(Box::new(move |val| {
|
||||
drop(tx2.take().unwrap().send(val));
|
||||
}) as Box<dyn FnMut(_)>);
|
||||
// Ideally we'd have a way to cancel the callbacks getting invoked and
|
||||
// free up state ourselves when this `JsFuture` is dropped. We don't
|
||||
// have that, though, and one of the callbacks is likely always going to
|
||||
// be invoked.
|
||||
//
|
||||
// As a result we need to make sure that no matter when the callbacks
|
||||
// are invoked they are valid to be called at any time, which means they
|
||||
// have to be self-contained. Through the `Closure::once` and some
|
||||
// `Rc`-trickery we can arrange for both instances of `Closure`, and the
|
||||
// `Rc`, to all be destroyed once the first one is called.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let state = Rc::new(RefCell::new(None));
|
||||
let state2 = state.clone();
|
||||
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
|
||||
let state2 = state.clone();
|
||||
let reject = Closure::once(move |val| finish(&state2, Err(val)));
|
||||
|
||||
js.then2(&resolve, &reject);
|
||||
*state.borrow_mut() = Some((tx, resolve, reject));
|
||||
|
||||
JsFuture {
|
||||
resolved: rx1,
|
||||
rejected: rx2,
|
||||
callbacks: Some((resolve, reject)),
|
||||
return JsFuture { rx };
|
||||
|
||||
fn finish(
|
||||
state: &RefCell<
|
||||
Option<(
|
||||
oneshot::Sender<Result<JsValue, JsValue>>,
|
||||
Closure<dyn FnMut(JsValue)>,
|
||||
Closure<dyn FnMut(JsValue)>,
|
||||
)>,
|
||||
>,
|
||||
val: Result<JsValue, JsValue>,
|
||||
) {
|
||||
match state.borrow_mut().take() {
|
||||
// We don't have any guarantee that anyone's still listening at this
|
||||
// point (the Rust `JsFuture` could have been dropped) so simply
|
||||
// ignore any errors here.
|
||||
Some((tx, _, _)) => drop(tx.send(val)),
|
||||
None => wasm_bindgen::throw_str("cannot finish twice"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -173,19 +192,11 @@ impl Future for JsFuture {
|
||||
type Error = JsValue;
|
||||
|
||||
fn poll(&mut self) -> Poll<JsValue, JsValue> {
|
||||
// Test if either our resolved or rejected side is finished yet. Note
|
||||
// that they will return errors if they're disconnected which can't
|
||||
// happen until we drop the `callbacks` field, which doesn't happen
|
||||
// till we're done, so we dont need to handle that.
|
||||
if let Ok(Async::Ready(val)) = self.resolved.poll() {
|
||||
drop(self.callbacks.take());
|
||||
return Ok(val.into());
|
||||
match self.rx.poll() {
|
||||
Ok(Async::Ready(val)) => val.map(Async::Ready),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(_) => wasm_bindgen::throw_str("cannot cancel"),
|
||||
}
|
||||
if let Ok(Async::Ready(val)) = self.rejected.poll() {
|
||||
drop(self.callbacks.take());
|
||||
return Err(val);
|
||||
}
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user