diff --git a/Cargo.toml b/Cargo.toml index 822ab23..c203d6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,18 @@ [package] name = "wasm-timer" edition = "2018" -description = "Abstraction over std::time::Instant and tokio_timer that works on WASM" -version = "0.1.2" +description = "Abstraction over std::time::Instant and futures-timer that works on WASM" +version = "0.2.0" authors = ["Pierre Krieger "] license = "MIT" repository = "https://github.com/tomaka/wasm-timer" +[dependencies] +futures-preview = "0.3.0-alpha" +pin-utils = "0.1.0-alpha.4" + [target.'cfg(any(target_arch = "wasm32"))'.dependencies] -futures = "0.1" js-sys = "0.3.14" send_wrapper = "0.2" wasm-bindgen = "0.2.37" web-sys = { version = "0.3.14", features = ["Performance", "Window"] } - -[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies] -tokio-timer = "0.2" diff --git a/src/lib.rs b/src/lib.rs index 31aee1f..b7e86f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,12 +18,13 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +pub use timer::*; + #[cfg(not(target_arch = "wasm32"))] pub use std::time::{Instant, SystemTime, UNIX_EPOCH}; -#[cfg(not(target_arch = "wasm32"))] -pub use tokio_timer::*; #[cfg(target_arch = "wasm32")] pub use wasm::*; +mod timer; #[cfg(target_arch = "wasm32")] mod wasm; diff --git a/src/wasm.rs b/src/wasm.rs index 41c9404..a430f45 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -21,15 +21,14 @@ #![cfg(target_arch = "wasm32")] -use futures::{prelude::*, sync::oneshot, try_ready}; +// TODO: global thing +use futures::{prelude::*, channel::oneshot}; use std::{error, fmt}; use std::cmp::{Eq, PartialEq, Ord, PartialOrd, Ordering}; use std::ops::{Add, Sub, AddAssign, SubAssign}; use std::time::Duration; use wasm_bindgen::{prelude::*, JsCast}; -pub use self::timeout::Timeout; - #[derive(Debug, Copy, Clone)] pub struct Instant { /// Unit is milliseconds. @@ -193,411 +192,3 @@ impl SubAssign for SystemTime { *self = *self - rhs; } } - -#[derive(Debug)] -pub struct Error; - -impl error::Error for Error { -} - -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "Timer error") - } -} - -pub struct Delay { - handle: i32, - deadline: Instant, - triggered_rx: oneshot::Receiver<()>, - _cb: send_wrapper::SendWrapper>, -} - -// TODO: -unsafe impl Sync for Delay {} - -impl Delay { - pub fn new(deadline: Instant) -> Delay { - let now = Instant::now(); - if deadline > now { - let dur = deadline - now; - Delay::new_timeout(deadline, dur) - } else { - Delay::new_timeout(deadline, Duration::new(0, 0)) - } - } - - pub fn deadline(&self) -> Instant { - self.deadline - } - - fn new_timeout(deadline: Instant, duration: Duration) -> Delay { - let (tx, rx) = oneshot::channel(); - let mut tx = Some(tx); - - let cb = Closure::wrap(Box::new(move || { - let _ = tx.take().unwrap().send(()); - }) as Box); - - let handle = web_sys::window() - .expect("not in a browser") - .set_timeout_with_callback_and_timeout_and_arguments_0(cb.as_ref().unchecked_ref(), duration.as_millis() as i32) - .expect("failed to call set_timeout"); - - Delay { handle, triggered_rx: rx, deadline, _cb: send_wrapper::SendWrapper::new(cb) } - } - - fn reset_timeout(&mut self) { - // TODO: what does that do? - } - - pub fn reset(&mut self, deadline: Instant) { - *self = Delay::new(deadline); - } -} - -impl Drop for Delay { - fn drop(&mut self) { - web_sys::window().unwrap().clear_timeout_with_handle(self.handle); - } -} - -impl fmt::Debug for Delay { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("Delay").field(&self.deadline).finish() - } -} - -impl Future for Delay { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll { - self.triggered_rx.poll().map_err(|_| unreachable!()) - } -} - -/// A stream representing notifications at fixed interval -#[derive(Debug)] -pub struct Interval { - /// Future that completes the next time the `Interval` yields a value. - delay: Delay, - - /// The duration between values yielded by `Interval`. - duration: Duration, -} - -impl Interval { - /// Create a new `Interval` that starts at `at` and yields every `duration` - /// interval after that. - /// - /// Note that when it starts, it produces item too. - /// - /// The `duration` argument must be a non-zero duration. - /// - /// # Panics - /// - /// This function panics if `duration` is zero. - pub fn new(at: Instant, duration: Duration) -> Interval { - assert!( - duration > Duration::new(0, 0), - "`duration` must be non-zero." - ); - - Interval::new_with_delay(Delay::new(at), duration) - } - - /// Creates new `Interval` that yields with interval of `duration`. - /// - /// The function is shortcut for `Interval::new(Instant::now() + duration, duration)`. - /// - /// The `duration` argument must be a non-zero duration. - /// - /// # Panics - /// - /// This function panics if `duration` is zero. - pub fn new_interval(duration: Duration) -> Interval { - Interval::new(Instant::now() + duration, duration) - } - - pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval { - Interval { delay, duration } - } -} - -impl Stream for Interval { - type Item = Instant; - type Error = crate::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - // Wait for the delay to be done - let _ = try_ready!(self.delay.poll()); - - // Get the `now` by looking at the `delay` deadline - let now = self.delay.deadline(); - - // The next interval value is `duration` after the one that just - // yielded. - self.delay.reset(now + self.duration); - - // Return the current instant - Ok(Some(now).into()) - } -} - -pub mod timeout { - use super::{Delay, Instant}; - use futures::prelude::*; - use std::{error, fmt, time::Duration}; - - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - pub struct Timeout { - value: T, - delay: Delay, - } - - /// Error returned by `Timeout`. - #[derive(Debug)] - pub struct Error(Kind); - - /// Timeout error variants - #[derive(Debug)] - enum Kind { - /// Inner value returned an error - Inner(T), - - /// The timeout elapsed. - Elapsed, - - /// Timer returned an error. - Timer(crate::Error), - } - - impl Timeout { - /// Create a new `Timeout` that allows `value` to execute for a duration of - /// at most `timeout`. - /// - /// The exact behavior depends on if `value` is a `Future` or a `Stream`. - /// - /// See [type] level documentation for more details. - /// - /// [type]: # - /// - /// # Examples - /// - /// Create a new `Timeout` set to expire in 10 milliseconds. - /// - /// ```rust - /// # extern crate futures; - /// # extern crate tokio; - /// use tokio::timer::Timeout; - /// use futures::Future; - /// use futures::sync::oneshot; - /// use std::time::Duration; - /// - /// # fn main() { - /// let (tx, rx) = oneshot::channel(); - /// # tx.send(()).unwrap(); - /// - /// # tokio::runtime::current_thread::block_on_all( - /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. - /// Timeout::new(rx, Duration::from_millis(10)) - /// # ).unwrap(); - /// # } - /// ``` - pub fn new(value: T, timeout: Duration) -> Timeout { - let delay = Delay::new_timeout(Instant::now() + timeout, timeout); - - Timeout { - value, - delay, - } - } - - /// Gets a reference to the underlying value in this timeout. - pub fn get_ref(&self) -> &T { - &self.value - } - - /// Gets a mutable reference to the underlying value in this timeout. - pub fn get_mut(&mut self) -> &mut T { - &mut self.value - } - - /// Consumes this timeout, returning the underlying value. - pub fn into_inner(self) -> T { - self.value - } - } - - impl Timeout { - /// Create a new `Timeout` that completes when `future` completes or when - /// `deadline` is reached. - /// - /// This function differs from `new` in that: - /// - /// * It only accepts `Future` arguments. - /// * It sets an explicit `Instant` at which the timeout expires. - pub fn new_at(future: T, deadline: Instant) -> Timeout { - let delay = Delay::new(deadline); - - Timeout { - value: future, - delay, - } - } - } - - impl Future for Timeout - where T: Future, - { - type Item = T::Item; - type Error = Error; - - fn poll(&mut self) -> Poll { - // First, try polling the future - match self.value.poll() { - Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), - Ok(Async::NotReady) => {} - Err(e) => return Err(Error::inner(e)), - } - - // Now check the timer - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => { - Err(Error::elapsed()) - }, - Err(e) => Err(Error::timer(e)), - } - } - } - - impl Stream for Timeout - where T: Stream, - { - type Item = T::Item; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - // First, try polling the future - match self.value.poll() { - Ok(Async::Ready(v)) => { - if v.is_some() { - self.delay.reset_timeout(); - } - return Ok(Async::Ready(v)) - } - Ok(Async::NotReady) => {} - Err(e) => return Err(Error::inner(e)), - } - - // Now check the timer - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => { - self.delay.reset_timeout(); - Err(Error::elapsed()) - }, - Err(e) => Err(Error::timer(e)), - } - } - } - - impl Error { - /// Create a new `Error` representing the inner value completing with `Err`. - pub fn inner(err: T) -> Error { - Error(Kind::Inner(err)) - } - - /// Returns `true` if the error was caused by the inner value completing - /// with `Err`. - pub fn is_inner(&self) -> bool { - match self.0 { - Kind::Inner(_) => true, - _ => false, - } - } - - /// Consumes `self`, returning the inner future error. - pub fn into_inner(self) -> Option { - match self.0 { - Kind::Inner(err) => Some(err), - _ => None, - } - } - - /// Create a new `Error` representing the inner value not completing before - /// the deadline is reached. - pub fn elapsed() -> Error { - Error(Kind::Elapsed) - } - - /// Returns `true` if the error was caused by the inner value not completing - /// before the deadline is reached. - pub fn is_elapsed(&self) -> bool { - match self.0 { - Kind::Elapsed => true, - _ => false, - } - } - - /// Creates a new `Error` representing an error encountered by the timer - /// implementation - pub fn timer(err: crate::Error) -> Error { - Error(Kind::Timer(err)) - } - - /// Returns `true` if the error was caused by the timer. - pub fn is_timer(&self) -> bool { - match self.0 { - Kind::Timer(_) => true, - _ => false, - } - } - - /// Consumes `self`, returning the error raised by the timer implementation. - pub fn into_timer(self) -> Option { - match self.0 { - Kind::Timer(err) => Some(err), - _ => None, - } - } - } - - impl error::Error for Error { - fn description(&self) -> &str { - use self::Kind::*; - - match self.0 { - Inner(ref e) => e.description(), - Elapsed => "deadline has elapsed", - Timer(ref e) => e.description(), - } - } - } - - impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::Kind::*; - - match self.0 { - Inner(ref e) => e.fmt(fmt), - Elapsed => "deadline has elapsed".fmt(fmt), - Timer(ref e) => e.fmt(fmt), - } - } - } -} - -#[cfg(test)] -mod tests { - use crate::Delay; - - #[test] - fn test_send_sync() { - fn req() {} - req::(); - } -}