Update futures implementation to not destroy callbacks

JS engines guarantee that at least one of our `then` callbacks are
invoked, so that means if we destroy them prematurely they're guaranteed
to log an exception to the console! Instead to prevent exceptions from
happening tweak how the completion callbacks for JS futures are managed
and ensure that the closures stay alive until they're invoked later.

Closes #1637
This commit is contained in:
Alex Crichton 2019-07-08 09:32:25 -07:00
parent 604c036111
commit d32b6a916c
2 changed files with 78 additions and 68 deletions

View File

@ -3,6 +3,7 @@ use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -24,10 +25,7 @@ use wasm_bindgen::prelude::*;
/// ///
/// Currently this type is constructed with `JsFuture::from`. /// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture { pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>, rx: oneshot::Receiver<Result<JsValue, JsValue>>,
rejected: oneshot::Receiver<JsValue>,
_cb_resolve: Closure<dyn FnMut(JsValue)>,
_cb_reject: Closure<dyn FnMut(JsValue)>,
} }
impl fmt::Debug for JsFuture { impl fmt::Debug for JsFuture {
@ -38,31 +36,37 @@ impl fmt::Debug for JsFuture {
impl From<Promise> for JsFuture { impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture { fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the // See comments in `src/lib.rs` for why we're using one self-contained
// resolved value and one for the rejected value. These two callbacks // callback here.
// will be connected to oneshot channels which feed back into our let (tx, rx) = oneshot::channel();
// future. let state = Rc::new(RefCell::new(None));
// let state2 = state.clone();
// This may not be the speediest option today but it should work! let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
let (tx1, rx1) = oneshot::channel(); let state2 = state.clone();
let reject = Closure::once(move |val| finish(&state2, Err(val)));
let cb_resolve = Closure::once(move |val| { js.then2(&resolve, &reject);
tx1.send(val).unwrap_throw(); *state.borrow_mut() = Some((tx, resolve, reject));
});
let (tx2, rx2) = oneshot::channel(); return JsFuture { rx };
let cb_reject = Closure::once(move |val| { fn finish(
tx2.send(val).unwrap_throw(); state: &RefCell<
}); Option<(
oneshot::Sender<Result<JsValue, JsValue>>,
js.then2(&cb_resolve, &cb_reject); Closure<dyn FnMut(JsValue)>,
Closure<dyn FnMut(JsValue)>,
JsFuture { )>,
resolved: rx1, >,
rejected: rx2, val: Result<JsValue, JsValue>,
_cb_resolve: cb_resolve, ) {
_cb_reject: cb_reject, match state.borrow_mut().take() {
// We don't have any guarantee that anyone's still listening at this
// point (the Rust `JsFuture` could have been dropped) so simply
// ignore any errors here.
Some((tx, _, _)) => drop(tx.send(val)),
None => wasm_bindgen::throw_str("cannot finish twice"),
}
} }
} }
} }
@ -71,16 +75,11 @@ impl Future for JsFuture {
type Output = Result<JsValue, JsValue>; type Output = Result<JsValue, JsValue>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Test if either our resolved or rejected side is finished yet. match self.rx.poll_unpin(cx) {
if let Poll::Ready(val) = self.resolved.poll_unpin(cx) { Poll::Pending => Poll::Pending,
return Poll::Ready(Ok(val.unwrap_throw())); Poll::Ready(Ok(val)) => Poll::Ready(val),
Poll::Ready(Err(_)) => wasm_bindgen::throw_str("cannot cancel"),
} }
if let Poll::Ready(val) = self.rejected.poll_unpin(cx) {
return Poll::Ready(Err(val.unwrap_throw()));
}
Poll::Pending
} }
} }

View File

@ -128,9 +128,7 @@ use wasm_bindgen::prelude::*;
/// ///
/// Currently this type is constructed with `JsFuture::from`. /// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture { pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>, rx: oneshot::Receiver<Result<JsValue, JsValue>>,
rejected: oneshot::Receiver<JsValue>,
callbacks: Option<(Closure<dyn FnMut(JsValue)>, Closure<dyn FnMut(JsValue)>)>,
} }
impl fmt::Debug for JsFuture { impl fmt::Debug for JsFuture {
@ -142,28 +140,49 @@ impl fmt::Debug for JsFuture {
impl From<Promise> for JsFuture { impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture { fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the // Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. These two callbacks // resolved value and one for the rejected value. We're currently
// will be connected to oneshot channels which feed back into our // assuming that JS engines will unconditionally invoke precisely one of
// future. // these callbacks, no matter what.
// //
// This may not be the speediest option today but it should work! // Ideally we'd have a way to cancel the callbacks getting invoked and
let (tx1, rx1) = oneshot::channel(); // free up state ourselves when this `JsFuture` is dropped. We don't
let (tx2, rx2) = oneshot::channel(); // have that, though, and one of the callbacks is likely always going to
let mut tx1 = Some(tx1); // be invoked.
let resolve = Closure::wrap(Box::new(move |val| { //
drop(tx1.take().unwrap().send(val)); // As a result we need to make sure that no matter when the callbacks
}) as Box<dyn FnMut(_)>); // are invoked they are valid to be called at any time, which means they
let mut tx2 = Some(tx2); // have to be self-contained. Through the `Closure::once` and some
let reject = Closure::wrap(Box::new(move |val| { // `Rc`-trickery we can arrange for both instances of `Closure`, and the
drop(tx2.take().unwrap().send(val)); // `Rc`, to all be destroyed once the first one is called.
}) as Box<dyn FnMut(_)>); let (tx, rx) = oneshot::channel();
let state = Rc::new(RefCell::new(None));
let state2 = state.clone();
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
let state2 = state.clone();
let reject = Closure::once(move |val| finish(&state2, Err(val)));
js.then2(&resolve, &reject); js.then2(&resolve, &reject);
*state.borrow_mut() = Some((tx, resolve, reject));
JsFuture { return JsFuture { rx };
resolved: rx1,
rejected: rx2, fn finish(
callbacks: Some((resolve, reject)), state: &RefCell<
Option<(
oneshot::Sender<Result<JsValue, JsValue>>,
Closure<dyn FnMut(JsValue)>,
Closure<dyn FnMut(JsValue)>,
)>,
>,
val: Result<JsValue, JsValue>,
) {
match state.borrow_mut().take() {
// We don't have any guarantee that anyone's still listening at this
// point (the Rust `JsFuture` could have been dropped) so simply
// ignore any errors here.
Some((tx, _, _)) => drop(tx.send(val)),
None => wasm_bindgen::throw_str("cannot finish twice"),
}
} }
} }
} }
@ -173,19 +192,11 @@ impl Future for JsFuture {
type Error = JsValue; type Error = JsValue;
fn poll(&mut self) -> Poll<JsValue, JsValue> { fn poll(&mut self) -> Poll<JsValue, JsValue> {
// Test if either our resolved or rejected side is finished yet. Note match self.rx.poll() {
// that they will return errors if they're disconnected which can't Ok(Async::Ready(val)) => val.map(Async::Ready),
// happen until we drop the `callbacks` field, which doesn't happen Ok(Async::NotReady) => Ok(Async::NotReady),
// till we're done, so we dont need to handle that. Err(_) => wasm_bindgen::throw_str("cannot cancel"),
if let Ok(Async::Ready(val)) = self.resolved.poll() {
drop(self.callbacks.take());
return Ok(val.into());
} }
if let Ok(Async::Ready(val)) = self.rejected.poll() {
drop(self.callbacks.take());
return Err(val);
}
Ok(Async::NotReady)
} }
} }