diff --git a/Cargo.toml b/Cargo.toml index c203d6a..2c01384 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,12 @@ repository = "https://github.com/tomaka/wasm-timer" [dependencies] futures-preview = "0.3.0-alpha" +parking_lot = "0.9" pin-utils = "0.1.0-alpha.4" [target.'cfg(any(target_arch = "wasm32"))'.dependencies] js-sys = "0.3.14" send_wrapper = "0.2" -wasm-bindgen = "0.2.37" +wasm-bindgen = { version = "0.2.37" } +wasm-bindgen-futures = { version = "0.3.25", features = ["futures_0_3"] } web-sys = { version = "0.3.14", features = ["Performance", "Window"] } diff --git a/src/timer.rs b/src/timer.rs new file mode 100644 index 0000000..7d9e45a --- /dev/null +++ b/src/timer.rs @@ -0,0 +1,625 @@ +// The `timer` module is a copy-paste from the code of `futures-timer`, but +// adjusted for WASM. +// +// Copyright (c) 2014 Alex Crichton +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// + // Apache License + // Version 2.0, January 2004 + // http://www.apache.org/licenses/ +// +// TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION +// +// 1. Definitions. +// + // "License" shall mean the terms and conditions for use, reproduction, + // and distribution as defined by Sections 1 through 9 of this document. +// + // "Licensor" shall mean the copyright owner or entity authorized by + // the copyright owner that is granting the License. +// + // "Legal Entity" shall mean the union of the acting entity and all + // other entities that control, are controlled by, or are under common + // control with that entity. For the purposes of this definition, + // "control" means (i) the power, direct or indirect, to cause the + // direction or management of such entity, whether by contract or + // otherwise, or (ii) ownership of fifty percent (50%) or more of the + // outstanding shares, or (iii) beneficial ownership of such entity. +// + // "You" (or "Your") shall mean an individual or Legal Entity + // exercising permissions granted by this License. +// + // "Source" form shall mean the preferred form for making modifications, + // including but not limited to software source code, documentation + // source, and configuration files. +// + // "Object" form shall mean any form resulting from mechanical + // transformation or translation of a Source form, including but + // not limited to compiled object code, generated documentation, + // and conversions to other media types. +// + // "Work" shall mean the work of authorship, whether in Source or + // Object form, made available under the License, as indicated by a + // copyright notice that is included in or attached to the work + // (an example is provided in the Appendix below). +// + // "Derivative Works" shall mean any work, whether in Source or Object + // form, that is based on (or derived from) the Work and for which the + // editorial revisions, annotations, elaborations, or other modifications + // represent, as a whole, an original work of authorship. For the purposes + // of this License, Derivative Works shall not include works that remain + // separable from, or merely link (or bind by name) to the interfaces of, + // the Work and Derivative Works thereof. +// + // "Contribution" shall mean any work of authorship, including + // the original version of the Work and any modifications or additions + // to that Work or Derivative Works thereof, that is intentionally + // submitted to Licensor for inclusion in the Work by the copyright owner + // or by an individual or Legal Entity authorized to submit on behalf of + // the copyright owner. For the purposes of this definition, "submitted" + // means any form of electronic, verbal, or written communication sent + // to the Licensor or its representatives, including but not limited to + // communication on electronic mailing lists, source code control systems, + // and issue tracking systems that are managed by, or on behalf of, the + // Licensor for the purpose of discussing and improving the Work, but + // excluding communication that is conspicuously marked or otherwise + // designated in writing by the copyright owner as "Not a Contribution." +// + // "Contributor" shall mean Licensor and any individual or Legal Entity + // on behalf of whom a Contribution has been received by Licensor and + // subsequently incorporated within the Work. +// +// 2. Grant of Copyright License. Subject to the terms and conditions of + // this License, each Contributor hereby grants to You a perpetual, + // worldwide, non-exclusive, no-charge, royalty-free, irrevocable + // copyright license to reproduce, prepare Derivative Works of, + // publicly display, publicly perform, sublicense, and distribute the + // Work and such Derivative Works in Source or Object form. +// +// 3. Grant of Patent License. Subject to the terms and conditions of + // this License, each Contributor hereby grants to You a perpetual, + // worldwide, non-exclusive, no-charge, royalty-free, irrevocable + // (except as stated in this section) patent license to make, have made, + // use, offer to sell, sell, import, and otherwise transfer the Work, + // where such license applies only to those patent claims licensable + // by such Contributor that are necessarily infringed by their + // Contribution(s) alone or by combination of their Contribution(s) + // with the Work to which such Contribution(s) was submitted. If You + // institute patent litigation against any entity (including a + // cross-claim or counterclaim in a lawsuit) alleging that the Work + // or a Contribution incorporated within the Work constitutes direct + // or contributory patent infringement, then any patent licenses + // granted to You under this License for that Work shall terminate + // as of the date such litigation is filed. +// +// 4. Redistribution. You may reproduce and distribute copies of the + // Work or Derivative Works thereof in any medium, with or without + // modifications, and in Source or Object form, provided that You + // meet the following conditions: +// + // (a) You must give any other recipients of the Work or + // Derivative Works a copy of this License; and +// + // (b) You must cause any modified files to carry prominent notices + // stating that You changed the files; and +// + // (c) You must retain, in the Source form of any Derivative Works + // that You distribute, all copyright, patent, trademark, and + // attribution notices from the Source form of the Work, + // excluding those notices that do not pertain to any part of + // the Derivative Works; and +// + // (d) If the Work includes a "NOTICE" text file as part of its + // distribution, then any Derivative Works that You distribute must + // include a readable copy of the attribution notices contained + // within such NOTICE file, excluding those notices that do not + // pertain to any part of the Derivative Works, in at least one + // of the following places: within a NOTICE text file distributed + // as part of the Derivative Works; within the Source form or + // documentation, if provided along with the Derivative Works; or, + // within a display generated by the Derivative Works, if and + // wherever such third-party notices normally appear. The contents + // of the NOTICE file are for informational purposes only and + // do not modify the License. You may add Your own attribution + // notices within Derivative Works that You distribute, alongside + // or as an addendum to the NOTICE text from the Work, provided + // that such additional attribution notices cannot be construed + // as modifying the License. +// + // You may add Your own copyright statement to Your modifications and + // may provide additional or different license terms and conditions + // for use, reproduction, or distribution of Your modifications, or + // for any such Derivative Works as a whole, provided Your use, + // reproduction, and distribution of the Work otherwise complies with + // the conditions stated in this License. +// +// 5. Submission of Contributions. Unless You explicitly state otherwise, + // any Contribution intentionally submitted for inclusion in the Work + // by You to the Licensor shall be under the terms and conditions of + // this License, without any additional terms or conditions. + // Notwithstanding the above, nothing herein shall supersede or modify + // the terms of any separate license agreement you may have executed + // with Licensor regarding such Contributions. +// +// 6. Trademarks. This License does not grant permission to use the trade + // names, trademarks, service marks, or product names of the Licensor, + // except as required for reasonable and customary use in describing the + // origin of the Work and reproducing the content of the NOTICE file. +// +// 7. Disclaimer of Warranty. Unless required by applicable law or + // agreed to in writing, Licensor provides the Work (and each + // Contributor provides its Contributions) on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + // implied, including, without limitation, any warranties or conditions + // of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + // PARTICULAR PURPOSE. You are solely responsible for determining the + // appropriateness of using or redistributing the Work and assume any + // risks associated with Your exercise of permissions under this License. +// +// 8. Limitation of Liability. In no event and under no legal theory, + // whether in tort (including negligence), contract, or otherwise, + // unless required by applicable law (such as deliberate and grossly + // negligent acts) or agreed to in writing, shall any Contributor be + // liable to You for damages, including any direct, indirect, special, + // incidental, or consequential damages of any character arising as a + // result of this License or out of the use or inability to use the + // Work (including but not limited to damages for loss of goodwill, + // work stoppage, computer failure or malfunction, or any and all + // other commercial damages or losses), even if such Contributor + // has been advised of the possibility of such damages. +// +// 9. Accepting Warranty or Additional Liability. While redistributing + // the Work or Derivative Works thereof, You may choose to offer, + // and charge a fee for, acceptance of support, warranty, indemnity, + // or other liability obligations and/or rights consistent with this + // License. However, in accepting such obligations, You may act only + // on Your own behalf and on Your sole responsibility, not on behalf + // of any other Contributor, and only if You agree to indemnify, + // defend, and hold each Contributor harmless for any liability + // incurred by, or claims asserted against, such Contributor by reason + // of your accepting any such warranty or additional liability. +// +// END OF TERMS AND CONDITIONS +// +// APPENDIX: How to apply the Apache License to your work. +// + // To apply the Apache License to your work, attach the following + // boilerplate notice, with the fields enclosed by brackets "[]" + // replaced with your own identifying information. (Don't include + // the brackets!) The text should be enclosed in the appropriate + // comment syntax for the file format. We also recommend that a + // file or class name and description of purpose be included on the + // same "printed page" as the copyright notice for easier + // identification within third-party archives. +// +// Copyright [yyyy] [name of copyright owner] +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// + // http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::Instant; +use std::cmp::Ordering; +use std::mem; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex, Weak}; +use std::task::{Context, Poll}; +use std::fmt; + +use futures::prelude::*; +use futures::task::AtomicWaker; + +use arc_list::{ArcList, Node}; +use heap::{Heap, Slot}; + +mod arc_list; +mod global; +mod heap; + +pub mod ext; +pub use ext::{TryFutureExt, TryStreamExt}; + +/// A "timer heap" used to power separately owned instances of `Delay` and +/// `Interval`. +/// +/// This timer is implemented as a priority queued-based heap. Each `Timer` +/// contains a few primary methods which which to drive it: +/// +/// * `next_wake` indicates how long the ambient system needs to sleep until it +/// invokes further processing on a `Timer` +/// * `advance_to` is what actually fires timers on the `Timer`, and should be +/// called essentially every iteration of the event loop, or when the time +/// specified by `next_wake` has elapsed. +/// * The `Future` implementation for `Timer` is used to process incoming timer +/// updates and requests. This is used to schedule new timeouts, update +/// existing ones, or delete existing timeouts. The `Future` implementation +/// will never resolve, but it'll schedule notifications of when to wake up +/// and process more messages. +/// +/// Note that if you're using this crate you probably don't need to use a +/// `Timer` as there is a global one already available for you run on a helper +/// thread. If this isn't desirable, though, then the +/// `TimerHandle::set_fallback` method can be used instead! +pub struct Timer { + inner: Arc, + timer_heap: Heap, +} + +/// A handle to a `Timer` which is used to create instances of a `Delay`. +#[derive(Clone)] +pub struct TimerHandle { + inner: Weak, +} + +mod delay; +mod interval; +pub use self::delay::Delay; +pub use self::interval::Interval; + +struct Inner { + /// List of updates the `Timer` needs to process + list: ArcList, + + /// The blocked `Timer` task to receive notifications to the `list` above. + waker: AtomicWaker, +} + +/// Shared state between the `Timer` and a `Delay`. +struct ScheduledTimer { + waker: AtomicWaker, + + // The lowest bit here is whether the timer has fired or not, the second + // lowest bit is whether the timer has been invalidated, and all the other + // bits are the "generation" of the timer which is reset during the `reset` + // function. Only timers for a matching generation are fired. + state: AtomicUsize, + + inner: Weak, + at: Mutex>, + + // TODO: this is only accessed by the timer thread, should have a more + // lightweight protection than a `Mutex` + slot: Mutex>, +} + +/// Entries in the timer heap, sorted by the instant they're firing at and then +/// also containing some payload data. +struct HeapTimer { + at: Instant, + gen: usize, + node: Arc>, +} + +impl Timer { + /// Creates a new timer heap ready to create new timers. + pub fn new() -> Timer { + Timer { + inner: Arc::new(Inner { + list: ArcList::new(), + waker: AtomicWaker::new(), + }), + timer_heap: Heap::new(), + } + } + + /// Returns a handle to this timer heap, used to create new timeouts. + pub fn handle(&self) -> TimerHandle { + TimerHandle { + inner: Arc::downgrade(&self.inner), + } + } + + /// Returns the time at which this timer next needs to be invoked with + /// `advance_to`. + /// + /// Event loops or threads typically want to sleep until the specified + /// instant. + pub fn next_event(&self) -> Option { + self.timer_heap.peek().map(|t| t.at) + } + + /// Proces any timers which are supposed to fire at or before the current + /// instant. + /// + /// This method is equivalent to `self.advance_to(Instant::now())`. + pub fn advance(&mut self) { + self.advance_to(Instant::now()) + } + + /// Proces any timers which are supposed to fire before `now` specified. + /// + /// This method should be called on `Timer` periodically to advance the + /// internal state and process any pending timers which need to fire. + pub fn advance_to(&mut self, now: Instant) { + loop { + match self.timer_heap.peek() { + Some(head) if head.at <= now => {} + Some(_) => break, + None => break, + }; + + // Flag the timer as fired and then notify its task, if any, that's + // blocked. + let heap_timer = self.timer_heap.pop().unwrap(); + *heap_timer.node.slot.lock().unwrap() = None; + let bits = heap_timer.gen << 2; + match heap_timer + .node + .state + .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst) + { + Ok(_) => heap_timer.node.waker.wake(), + Err(_b) => {} + } + } + } + + /// Either updates the timer at slot `idx` to fire at `at`, or adds a new + /// timer at `idx` and sets it to fire at `at`. + fn update_or_add(&mut self, at: Instant, node: Arc>) { + // TODO: avoid remove + push and instead just do one sift of the heap? + // In theory we could update it in place and then do the percolation + // as necessary + let gen = node.state.load(SeqCst) >> 2; + let mut slot = node.slot.lock().unwrap(); + if let Some(heap_slot) = slot.take() { + self.timer_heap.remove(heap_slot); + } + *slot = Some(self.timer_heap.push(HeapTimer { + at: at, + gen: gen, + node: node.clone(), + })); + } + + fn remove(&mut self, node: Arc>) { + // If this `idx` is still around and it's still got a registered timer, + // then we jettison it form the timer heap. + let mut slot = node.slot.lock().unwrap(); + let heap_slot = match slot.take() { + Some(slot) => slot, + None => return, + }; + self.timer_heap.remove(heap_slot); + } + + fn invalidate(&mut self, node: Arc>) { + node.state.fetch_or(0b10, SeqCst); + node.waker.wake(); + } +} + +impl Future for Timer { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner).waker.register(cx.waker()); + let mut list = self.inner.list.take(); + while let Some(node) = list.pop() { + let at = *node.at.lock().unwrap(); + match at { + Some(at) => self.update_or_add(at, node), + None => self.remove(node), + } + } + Poll::Pending + } +} + +impl Drop for Timer { + fn drop(&mut self) { + // Seal off our list to prevent any more updates from getting pushed on. + // Any timer which sees an error from the push will immediately become + // inert. + let mut list = self.inner.list.take_and_seal(); + + // Now that we'll never receive another timer, drain the list of all + // updates and also drain our heap of all active timers, invalidating + // everything. + while let Some(t) = list.pop() { + self.invalidate(t); + } + while let Some(t) = self.timer_heap.pop() { + self.invalidate(t.node); + } + } +} + +impl fmt::Debug for Timer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Timer").field("heap", &"...").finish() + } +} + +impl PartialEq for HeapTimer { + fn eq(&self, other: &HeapTimer) -> bool { + self.at == other.at + } +} + +impl Eq for HeapTimer {} + +impl PartialOrd for HeapTimer { + fn partial_cmp(&self, other: &HeapTimer) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for HeapTimer { + fn cmp(&self, other: &HeapTimer) -> Ordering { + self.at.cmp(&other.at) + } +} + +static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0); + +/// Error returned from `TimerHandle::set_fallback`. +#[derive(Clone, Debug)] +pub struct SetDefaultError(()); + +impl TimerHandle { + /// Configures this timer handle to be the one returned by + /// `TimerHandle::default`. + /// + /// By default a global thread is initialized on the first call to + /// `TimerHandle::default`. This first call can happen transitively through + /// `Delay::new`. If, however, that hasn't happened yet then the global + /// default timer handle can be configured through this method. + /// + /// This method can be used to prevent the global helper thread from + /// spawning. If this method is successful then the global helper thread + /// will never get spun up. + /// + /// On success this timer handle will have installed itself globally to be + /// used as the return value for `TimerHandle::default` unless otherwise + /// specified. + /// + /// # Errors + /// + /// If another thread has already called `set_as_global_fallback` or this + /// thread otherwise loses a race to call this method then it will fail + /// returning an error. Once a call to `set_as_global_fallback` is + /// successful then no future calls may succeed. + pub fn set_as_global_fallback(self) -> Result<(), SetDefaultError> { + unsafe { + let val = self.into_usize(); + match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { + Ok(_) => Ok(()), + Err(_) => { + drop(TimerHandle::from_usize(val)); + Err(SetDefaultError(())) + } + } + } + } + + fn into_usize(self) -> usize { + unsafe { mem::transmute::, usize>(self.inner) } + } + + unsafe fn from_usize(val: usize) -> TimerHandle { + let inner = mem::transmute::>(val);; + TimerHandle { inner } + } +} + +impl Default for TimerHandle { + #[cfg(not(target_arch = "wasm32"))] + fn default() -> TimerHandle { + let mut fallback = HANDLE_FALLBACK.load(SeqCst); + + // If the fallback hasn't been previously initialized then let's spin + // up a helper thread and try to initialize with that. If we can't + // actually create a helper thread then we'll just return a "defunkt" + // handle which will return errors when timer objects are attempted to + // be associated. + if fallback == 0 { + let helper = match global::HelperThread::new() { + Ok(helper) => helper, + Err(_) => return TimerHandle { inner: Weak::new() }, + }; + + // If we successfully set ourselves as the actual fallback then we + // want to `forget` the helper thread to ensure that it persists + // globally. If we fail to set ourselves as the fallback that means + // that someone was racing with this call to + // `TimerHandle::default`. They ended up winning so we'll destroy + // our helper thread (which shuts down the thread) and reload the + // fallback. + if helper.handle().set_as_global_fallback().is_ok() { + let ret = helper.handle(); + helper.forget(); + return ret; + } + fallback = HANDLE_FALLBACK.load(SeqCst); + } + + // At this point our fallback handle global was configured so we use + // its value to reify a handle, clone it, and then forget our reified + // handle as we don't actually have an owning reference to it. + assert!(fallback != 0); + unsafe { + let handle = TimerHandle::from_usize(fallback); + let ret = handle.clone(); + drop(handle.into_usize()); + return ret; + } + } + + #[cfg(target_arch = "wasm32")] + fn default() -> TimerHandle { + let mut fallback = HANDLE_FALLBACK.load(SeqCst); + + // If the fallback hasn't been previously initialized then let's spin + // up a helper thread and try to initialize with that. If we can't + // actually create a helper thread then we'll just return a "defunkt" + // handle which will return errors when timer objects are attempted to + // be associated. + if fallback == 0 { + let handle = global::run(); + + // If we successfully set ourselves as the actual fallback then we + // want to `forget` the helper thread to ensure that it persists + // globally. If we fail to set ourselves as the fallback that means + // that someone was racing with this call to + // `TimerHandle::default`. They ended up winning so we'll destroy + // our helper thread (which shuts down the thread) and reload the + // fallback. + if handle.clone().set_as_global_fallback().is_ok() { + return handle; + } + fallback = HANDLE_FALLBACK.load(SeqCst); + } + + // At this point our fallback handle global was configured so we use + // its value to reify a handle, clone it, and then forget our reified + // handle as we don't actually have an owning reference to it. + assert!(fallback != 0); + unsafe { + let handle = TimerHandle::from_usize(fallback); + let ret = handle.clone(); + drop(handle.into_usize()); + return ret; + } + } +} + +impl fmt::Debug for TimerHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("TimerHandle").field("inner", &"...").finish() + } +} + diff --git a/src/timer/arc_list.rs b/src/timer/arc_list.rs new file mode 100644 index 0000000..4388052 --- /dev/null +++ b/src/timer/arc_list.rs @@ -0,0 +1,155 @@ +//! An atomically managed intrusive linked list of `Arc` nodes + +use std::marker; +use std::ops::Deref; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::Arc; + +pub struct ArcList { + list: AtomicUsize, + _marker: marker::PhantomData, +} + +impl ArcList { + pub fn new() -> ArcList { + ArcList { + list: AtomicUsize::new(0), + _marker: marker::PhantomData, + } + } + + /// Pushes the `data` provided onto this list if it's not already enqueued + /// in this list. + /// + /// If `data` is already enqueued in this list then this is a noop, + /// otherwise, the `data` here is pushed on the end of the list. + pub fn push(&self, data: &Arc>) -> Result<(), ()> { + if data.enqueued.swap(true, SeqCst) { + // note that even if our list is sealed off then the other end is + // still guaranteed to see us because we were previously enqueued. + return Ok(()); + } + let mut head = self.list.load(SeqCst); + let node = Arc::into_raw(data.clone()) as usize; + loop { + // If we've been sealed off, abort and return an error + if head == 1 { + unsafe { + drop(Arc::from_raw(node as *mut Node)); + } + return Err(()); + } + + // Otherwise attempt to push this node + data.next.store(head, SeqCst); + match self.list.compare_exchange(head, node, SeqCst, SeqCst) { + Ok(_) => break Ok(()), + Err(new_head) => head = new_head, + } + } + } + + /// Atomically empties this list, returning a new owned copy which can be + /// used to iterate over the entries. + pub fn take(&self) -> ArcList { + let mut list = self.list.load(SeqCst); + loop { + if list == 1 { + break; + } + match self.list.compare_exchange(list, 0, SeqCst, SeqCst) { + Ok(_) => break, + Err(l) => list = l, + } + } + ArcList { + list: AtomicUsize::new(list), + _marker: marker::PhantomData, + } + } + + /// Atomically empties this list and prevents further successful calls to + /// `push`. + pub fn take_and_seal(&self) -> ArcList { + ArcList { + list: AtomicUsize::new(self.list.swap(1, SeqCst)), + _marker: marker::PhantomData, + } + } + + /// Removes the head of the list of nodes, returning `None` if this is an + /// empty list. + pub fn pop(&mut self) -> Option>> { + let head = *self.list.get_mut(); + if head == 0 || head == 1 { + return None; + } + let head = unsafe { Arc::from_raw(head as *const Node) }; + *self.list.get_mut() = head.next.load(SeqCst); + // At this point, the node is out of the list, so store `false` so we + // can enqueue it again and see further changes. + assert!(head.enqueued.swap(false, SeqCst)); + Some(head) + } +} + +impl Drop for ArcList { + fn drop(&mut self) { + while let Some(_) = self.pop() { + // ... + } + } +} + +pub struct Node { + next: AtomicUsize, + enqueued: AtomicBool, + data: T, +} + +impl Node { + pub fn new(data: T) -> Node { + Node { + next: AtomicUsize::new(0), + enqueued: AtomicBool::new(false), + data: data, + } + } +} + +impl Deref for Node { + type Target = T; + + fn deref(&self) -> &T { + &self.data + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn smoke() { + let a = ArcList::new(); + let n = Arc::new(Node::new(1)); + assert!(a.push(&n).is_ok()); + + let mut l = a.take(); + assert_eq!(**l.pop().unwrap(), 1); + assert!(l.pop().is_none()); + } + + #[test] + fn seal() { + let a = ArcList::new(); + let n = Arc::new(Node::new(1)); + let mut l = a.take_and_seal(); + assert!(l.pop().is_none()); + assert!(a.push(&n).is_err()); + + assert!(a.take().pop().is_none()); + assert!(a.take_and_seal().pop().is_none()); + } +} diff --git a/src/timer/delay.rs b/src/timer/delay.rs new file mode 100644 index 0000000..17cd334 --- /dev/null +++ b/src/timer/delay.rs @@ -0,0 +1,208 @@ +//! Support for creating futures that represent timeouts. +//! +//! This module contains the `Delay` type which is a future that will resolve +//! at a particular point in the future. + +use std::fmt; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures::task::AtomicWaker; + +use crate::Instant; +use crate::timer::arc_list::Node; +use crate::timer::{ScheduledTimer, TimerHandle}; + +/// A future representing the notification that an elapsed duration has +/// occurred. +/// +/// This is created through the `Delay::new` or `Delay::new_at` methods +/// indicating when the future should fire at. Note that these futures are not +/// intended for high resolution timers, but rather they will likely fire some +/// granularity after the exact instant that they're otherwise indicated to +/// fire at. +pub struct Delay { + state: Option>>, + when: Instant, +} + +impl Delay { + /// Creates a new future which will fire at `dur` time into the future. + /// + /// The returned object will be bound to the default timer for this thread. + /// The default timer will be spun up in a helper thread on first use. + #[inline] + pub fn new(dur: Duration) -> Delay { + Delay::new_at(Instant::now() + dur) + } + + /// Creates a new future which will fire at the time specified by `at`. + /// + /// The returned object will be bound to the default timer for this thread. + /// The default timer will be spun up in a helper thread on first use. + #[inline] + pub fn new_at(at: Instant) -> Delay { + Delay::new_handle(at, Default::default()) + } + + /// Creates a new future which will fire at the time specified by `at`. + /// + /// The returned instance of `Delay` will be bound to the timer specified by + /// the `handle` argument. + pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay { + let inner = match handle.inner.upgrade() { + Some(i) => i, + None => { + return Delay { + state: None, + when: at, + } + } + }; + let state = Arc::new(Node::new(ScheduledTimer { + at: Mutex::new(Some(at)), + state: AtomicUsize::new(0), + waker: AtomicWaker::new(), + inner: handle.inner, + slot: Mutex::new(None), + })); + + // If we fail to actually push our node then we've become an inert + // timer, meaning that we'll want to immediately return an error from + // `poll`. + if inner.list.push(&state).is_err() { + return Delay { + state: None, + when: at, + }; + } + + inner.waker.wake(); + Delay { + state: Some(state), + when: at, + } + } + + /// Resets this timeout to an new timeout which will fire at the time + /// specified by `dur`. + /// + /// This is equivalent to calling `reset_at` with `Instant::now() + dur` + #[inline] + pub fn reset(&mut self, dur: Duration) { + self.reset_at(Instant::now() + dur) + } + + /// Resets this timeout to an new timeout which will fire at the time + /// specified by `at`. + /// + /// This method is usable even of this instance of `Delay` has "already + /// fired". That is, if this future has resovled, calling this method means + /// that the future will still re-resolve at the specified instant. + /// + /// If `at` is in the past then this future will immediately be resolved + /// (when `poll` is called). + /// + /// Note that if any task is currently blocked on this future then that task + /// will be dropped. It is required to call `poll` again after this method + /// has been called to ensure tha ta task is blocked on this future. + #[inline] + pub fn reset_at(&mut self, at: Instant) { + self.when = at; + if self._reset(at).is_err() { + self.state = None + } + } + + fn _reset(&mut self, at: Instant) -> Result<(), ()> { + let state = match self.state { + Some(ref state) => state, + None => return Err(()), + }; + if let Some(timeouts) = state.inner.upgrade() { + let mut bits = state.state.load(SeqCst); + loop { + // If we've been invalidated, cancel this reset + if bits & 0b10 != 0 { + return Err(()); + } + let new = bits.wrapping_add(0b100) & !0b11; + match state.state.compare_exchange(bits, new, SeqCst, SeqCst) { + Ok(_) => break, + Err(s) => bits = s, + } + } + *state.at.lock().unwrap() = Some(at); + // If we fail to push our node then we've become an inert timer, so + // we'll want to clear our `state` field accordingly + timeouts.list.push(state)?; + timeouts.waker.wake(); + } + + Ok(()) + } +} + +#[inline] +pub fn fires_at(timeout: &Delay) -> Instant { + timeout.when +} + +impl Future for Delay { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let state = match self.state { + Some(ref state) => state, + None => { + let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away")); + return Poll::Ready(err); + } + }; + + if state.state.load(SeqCst) & 1 != 0 { + return Poll::Ready(Ok(())); + } + + state.waker.register(&cx.waker()); + + // Now that we've registered, do the full check of our own internal + // state. If we've fired the first bit is set, and if we've been + // invalidated the second bit is set. + match state.state.load(SeqCst) { + n if n & 0b01 != 0 => Poll::Ready(Ok(())), + n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "timer has gone away", + ))), + _ => Poll::Pending, + } + } +} + +impl Drop for Delay { + fn drop(&mut self) { + let state = match self.state { + Some(ref s) => s, + None => return, + }; + if let Some(timeouts) = state.inner.upgrade() { + *state.at.lock().unwrap() = None; + if timeouts.list.push(state).is_ok() { + timeouts.waker.wake(); + } + } + } +} + +impl fmt::Debug for Delay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Delay").field("when", &self.when).finish() + } +} diff --git a/src/timer/ext.rs b/src/timer/ext.rs new file mode 100644 index 0000000..0f1120a --- /dev/null +++ b/src/timer/ext.rs @@ -0,0 +1,204 @@ +//! Extension traits for the standard `Stream` and `Future` traits. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures::prelude::*; +use pin_utils::unsafe_pinned; + +use crate::{Delay, Instant}; + +/// An extension trait for futures which provides convenient accessors for +/// timing out execution and such. +pub trait TryFutureExt: TryFuture + Sized { + /// Creates a new future which will take at most `dur` time to resolve from + /// the point at which this method is called. + /// + /// This combinator creates a new future which wraps the receiving future + /// in a timeout. The future returned will resolve in at most `dur` time + /// specified (relative to when this function is called). + /// + /// If the future completes before `dur` elapses then the future will + /// resolve with that item. Otherwise the future will resolve to an error + /// once `dur` has elapsed. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// use std::time::Duration; + /// use futures::prelude::*; + /// use futures_timer::TryFutureExt; + /// + /// # fn long_future() -> impl TryFuture { + /// # futures::future::ok(()) + /// # } + /// # + /// #[runtime::main] + /// async fn main() { + /// let future = long_future(); + /// let timed_out = future.timeout(Duration::from_secs(1)); + /// + /// match timed_out.await { + /// Ok(item) => println!("got {:?} within enough time!", item), + /// Err(_) => println!("took too long to produce the item"), + /// } + /// } + /// ``` + fn timeout(self, dur: Duration) -> Timeout + where + Self::Error: From, + { + Timeout { + timeout: Delay::new(dur), + future: self, + } + } + + /// Creates a new future which will resolve no later than `at` specified. + /// + /// This method is otherwise equivalent to the `timeout` method except that + /// it tweaks the moment at when the timeout elapsed to being specified with + /// an absolute value rather than a relative one. For more documentation see + /// the `timeout` method. + fn timeout_at(self, at: Instant) -> Timeout + where + Self::Error: From, + { + Timeout { + timeout: Delay::new_at(at), + future: self, + } + } +} + +impl TryFutureExt for F {} + +/// Future returned by the `FutureExt::timeout` method. +#[derive(Debug)] +pub struct Timeout +where + F: TryFuture, + F::Error: From, +{ + future: F, + timeout: Delay, +} + +impl Timeout +where + F: TryFuture, + F::Error: From, +{ + unsafe_pinned!(future: F); + unsafe_pinned!(timeout: Delay); +} + +impl Future for Timeout +where + F: TryFuture, + F::Error: From, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future().try_poll(cx) { + Poll::Pending => {} + other => return other, + } + + if self.timeout().poll(cx).is_ready() { + let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); + Poll::Ready(err) + } else { + Poll::Pending + } + } +} + +/// An extension trait for streams which provides convenient accessors for +/// timing out execution and such. +pub trait TryStreamExt: TryStream + Sized { + /// Creates a new stream which will take at most `dur` time to yield each + /// item of the stream. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// in a timeout-per-item. The stream returned will resolve in at most + /// `dur` time for each item yielded from the stream. The first item's timer + /// starts when this method is called. + /// + /// If a stream's item completes before `dur` elapses then the timer will be + /// reset for the next item. If the timeout elapses, however, then an error + /// will be yielded on the stream and the timer will be reset. + fn timeout(self, dur: Duration) -> TimeoutStream + where + Self::Error: From, + { + TimeoutStream { + timeout: Delay::new(dur), + dur, + stream: self, + } + } +} + +impl TryStreamExt for S {} + +/// Stream returned by the `StreamExt::timeout` method. +#[derive(Debug)] +pub struct TimeoutStream +where + S: TryStream, + S::Error: From, +{ + timeout: Delay, + dur: Duration, + stream: S, +} + +impl TimeoutStream +where + S: TryStream, + S::Error: From, +{ + unsafe_pinned!(timeout: Delay); + unsafe_pinned!(stream: S); +} + +impl TryStream for TimeoutStream +where + S: TryStream, + S::Error: From, +{ + type Ok = S::Ok; + type Error = S::Error; + + fn try_poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let dur = self.dur; + + let r = self.as_mut().stream().try_poll_next(cx); + match r { + Poll::Pending => {} + other => { + self.as_mut().timeout().reset(dur); + return other; + } + } + + if self.as_mut().timeout().poll(cx).is_ready() { + self.as_mut().timeout().reset(dur); + Poll::Ready(Some(Err(io::Error::new( + io::ErrorKind::TimedOut, + "stream item timed out", + ) + .into()))) + } else { + Poll::Pending + } + } +} diff --git a/src/timer/global.rs b/src/timer/global.rs new file mode 100644 index 0000000..6854148 --- /dev/null +++ b/src/timer/global.rs @@ -0,0 +1,8 @@ +pub(crate) use self::platform::*; + +#[cfg(not(target_arch = "wasm32"))] +#[path = "global/desktop.rs"] +mod platform; +#[cfg(target_arch = "wasm32")] +#[path = "global/wasm.rs"] +mod platform; diff --git a/src/timer/global/desktop.rs b/src/timer/global/desktop.rs new file mode 100644 index 0000000..072daac --- /dev/null +++ b/src/timer/global/desktop.rs @@ -0,0 +1,106 @@ +use std::future::Future; +use std::io; +use std::mem::{self, ManuallyDrop}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::task::{Context, RawWaker, RawWakerVTable, Waker}; +use std::thread; +use std::thread::Thread; +use std::time::Instant; + +use pin_utils::pin_mut; + +use crate::{Timer, TimerHandle}; + +pub struct HelperThread { + thread: Option>, + timer: TimerHandle, + done: Arc, +} + +impl HelperThread { + pub fn new() -> io::Result { + let timer = Timer::new(); + let timer_handle = timer.handle(); + let done = Arc::new(AtomicBool::new(false)); + let done2 = done.clone(); + let thread = thread::Builder::new().spawn(move || run(timer, done2))?; + + Ok(HelperThread { + thread: Some(thread), + done, + timer: timer_handle, + }) + } + + pub fn handle(&self) -> TimerHandle { + self.timer.clone() + } + + pub fn forget(mut self) { + self.thread.take(); + } +} + +impl Drop for HelperThread { + fn drop(&mut self) { + let thread = match self.thread.take() { + Some(thread) => thread, + None => return, + }; + self.done.store(true, Ordering::SeqCst); + thread.thread().unpark(); + drop(thread.join()); + } +} + +fn run(timer: Timer, done: Arc) { + let mut waker = current_thread_waker(); + let mut cx = Context::from_waker(&mut waker); + + pin_mut!(timer); + while !done.load(Ordering::SeqCst) { + drop(timer.as_mut().poll(&mut cx)); + + timer.advance(); + match timer.next_event() { + // Ok, block for the specified time + Some(when) => { + let now = Instant::now(); + if now < when { + thread::park_timeout(when - now) + } else { + // .. continue... + } + } + + // Just wait for one of our futures to wake up + None => thread::park(), + } + } +} + +static VTABLE: RawWakerVTable = RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); + +fn raw_clone(ptr: *const ()) -> RawWaker { + let me = ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) }); + mem::forget(me.clone()); + RawWaker::new(ptr, &VTABLE) +} + +fn raw_wake(ptr: *const ()) { + unsafe { Arc::from_raw(ptr as *const Thread) }.unpark() +} + +fn raw_wake_by_ref(ptr: *const ()) { + ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) }).unpark() +} + +fn raw_drop(ptr: *const ()) { + unsafe { Arc::from_raw(ptr as *const Thread) }; +} + +fn current_thread_waker() -> Waker { + let thread = Arc::new(thread::current()); + unsafe { Waker::from_raw(raw_clone(Arc::into_raw(thread) as *const ())) } +} diff --git a/src/timer/global/wasm.rs b/src/timer/global/wasm.rs new file mode 100644 index 0000000..e8604df --- /dev/null +++ b/src/timer/global/wasm.rs @@ -0,0 +1,77 @@ +use futures::task::ArcWake; +use parking_lot::Mutex; +use std::convert::TryFrom; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::time::Duration; +use wasm_bindgen::{JsCast, closure::Closure}; + +use crate::{Instant, Timer, TimerHandle}; + +/// Starts a background task, creates a `Timer`, and returns a handle to it. +/// +/// > **Note**: Contrary to the original `futures-timer` crate, we don't have +/// > any `forget()` method, as the task is automatically considered +/// > as "forgotten". +pub(crate) fn run() -> TimerHandle { + let timer = Timer::new(); + let handle = timer.handle(); + schedule_callback(Arc::new(Mutex::new(timer)), Duration::new(0, 0)); + handle +} + +/// Calls `Window::setTimeout` with the given `Duration`. The callback wakes up the timer and +/// processes everything. +fn schedule_callback(timer: Arc>, when: Duration) { + let window = web_sys::window().expect("Unable to access Window"); + let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0( + &Closure::once_into_js(move || { + let mut timer_lock = timer.lock(); + + // We start by polling the timer. If any new `Delay` is created, the waker will be used + // to wake up this task pre-emptively. As such, we pass a `Waker` that calls + // `schedule_callback` with a delay of `0`. + let waker = Arc::new(Waker { timer: timer.clone() }).into_waker(); + let _ = Future::poll(Pin::new(&mut *timer_lock), &mut Context::from_waker(&waker)); + + // Notify the timers that are ready. + let now = Instant::now(); + timer_lock.advance_to(now); + + // Each call to `schedule_callback` calls `schedule_callback` again, but also leaves + // the possibility for `schedule_callback` to be called in parallel. Since we don't + // want too many useless callbacks, we... + // TODO: ugh, that's a hack + if Arc::strong_count(&timer) > 20 { + return; + } + + // We call `schedule_callback` again for the next event. + let sleep_dur = timer_lock.next_event() + .map(|next_event| { + if next_event > now { + next_event - now + } else { + Duration::new(0, 0) + } + }) + .unwrap_or(Duration::from_secs(5)); + drop(timer_lock); + schedule_callback(timer, sleep_dur); + + }).unchecked_ref(), + i32::try_from(when.as_millis()).unwrap_or(0) + ).unwrap(); +} + +struct Waker { + timer: Arc>, +} + +impl ArcWake for Waker { + fn wake_by_ref(arc_self: &Arc) { + schedule_callback(arc_self.timer.clone(), Duration::new(0, 0)); + } +} diff --git a/src/timer/heap.rs b/src/timer/heap.rs new file mode 100644 index 0000000..f0f9882 --- /dev/null +++ b/src/timer/heap.rs @@ -0,0 +1,350 @@ +//! A simple binary heap with support for removal of arbitrary elements +//! +//! This heap is used to manage timer state in the event loop. All timeouts go +//! into this heap and we also cancel timeouts from this heap. The crucial +//! feature of this heap over the standard library's `BinaryHeap` is the ability +//! to remove arbitrary elements. (e.g. when a timer is canceled) +//! +//! Note that this heap is not at all optimized right now, it should hopefully +//! just work. + +use std::mem; + +pub struct Heap { + // Binary heap of items, plus the slab index indicating what position in the + // list they're in. + items: Vec<(T, usize)>, + + // A map from a slab index (assigned to an item above) to the actual index + // in the array the item appears at. + index: Vec>, + next_index: usize, +} + +enum SlabSlot { + Empty { next: usize }, + Full { value: T }, +} + +pub struct Slot { + idx: usize, +} + +impl Heap { + pub fn new() -> Heap { + Heap { + items: Vec::new(), + index: Vec::new(), + next_index: 0, + } + } + + /// Pushes an element onto this heap, returning a slot token indicating + /// where it was pushed on to. + /// + /// The slot can later get passed to `remove` to remove the element from the + /// heap, but only if the element was previously not removed from the heap. + pub fn push(&mut self, t: T) -> Slot { + self.assert_consistent(); + let len = self.items.len(); + let slot = SlabSlot::Full { value: len }; + let slot_idx = if self.next_index == self.index.len() { + self.next_index += 1; + self.index.push(slot); + self.index.len() - 1 + } else { + match mem::replace(&mut self.index[self.next_index], slot) { + SlabSlot::Empty { next } => mem::replace(&mut self.next_index, next), + SlabSlot::Full { .. } => panic!(), + } + }; + self.items.push((t, slot_idx)); + self.percolate_up(len); + self.assert_consistent(); + Slot { idx: slot_idx } + } + + pub fn peek(&self) -> Option<&T> { + self.assert_consistent(); + self.items.get(0).map(|i| &i.0) + } + + pub fn pop(&mut self) -> Option { + self.assert_consistent(); + if self.items.len() == 0 { + return None; + } + let slot = Slot { + idx: self.items[0].1, + }; + Some(self.remove(slot)) + } + + pub fn remove(&mut self, slot: Slot) -> T { + self.assert_consistent(); + let empty = SlabSlot::Empty { + next: self.next_index, + }; + let idx = match mem::replace(&mut self.index[slot.idx], empty) { + SlabSlot::Full { value } => value, + SlabSlot::Empty { .. } => panic!(), + }; + self.next_index = slot.idx; + let (item, slot_idx) = self.items.swap_remove(idx); + debug_assert_eq!(slot.idx, slot_idx); + if idx < self.items.len() { + set_index(&mut self.index, self.items[idx].1, idx); + if self.items[idx].0 < item { + self.percolate_up(idx); + } else { + self.percolate_down(idx); + } + } + self.assert_consistent(); + return item; + } + + fn percolate_up(&mut self, mut idx: usize) -> usize { + while idx > 0 { + let parent = (idx - 1) / 2; + if self.items[idx].0 >= self.items[parent].0 { + break; + } + let (a, b) = self.items.split_at_mut(idx); + mem::swap(&mut a[parent], &mut b[0]); + set_index(&mut self.index, a[parent].1, parent); + set_index(&mut self.index, b[0].1, idx); + idx = parent; + } + return idx; + } + + fn percolate_down(&mut self, mut idx: usize) -> usize { + loop { + let left = 2 * idx + 1; + let right = 2 * idx + 2; + + let mut swap_left = true; + match (self.items.get(left), self.items.get(right)) { + (Some(left), None) => { + if left.0 >= self.items[idx].0 { + break; + } + } + (Some(left), Some(right)) => { + if left.0 < self.items[idx].0 { + if right.0 < left.0 { + swap_left = false; + } + } else if right.0 < self.items[idx].0 { + swap_left = false; + } else { + break; + } + } + + (None, None) => break, + (None, Some(_right)) => panic!("not possible"), + } + + let (a, b) = if swap_left { + self.items.split_at_mut(left) + } else { + self.items.split_at_mut(right) + }; + mem::swap(&mut a[idx], &mut b[0]); + set_index(&mut self.index, a[idx].1, idx); + set_index(&mut self.index, b[0].1, a.len()); + idx = a.len(); + } + return idx; + } + + fn assert_consistent(&self) { + if !cfg!(assert_timer_heap_consistent) { + return; + } + + assert_eq!( + self.items.len(), + self.index + .iter() + .filter(|slot| { + match **slot { + SlabSlot::Full { .. } => true, + SlabSlot::Empty { .. } => false, + } + }) + .count() + ); + + for (i, &(_, j)) in self.items.iter().enumerate() { + let index = match self.index[j] { + SlabSlot::Full { value } => value, + SlabSlot::Empty { .. } => panic!(), + }; + if index != i { + panic!( + "self.index[j] != i : i={} j={} self.index[j]={}", + i, j, index + ); + } + } + + for (i, &(ref item, _)) in self.items.iter().enumerate() { + if i > 0 { + assert!(*item >= self.items[(i - 1) / 2].0, "bad at index: {}", i); + } + if let Some(left) = self.items.get(2 * i + 1) { + assert!(*item <= left.0, "bad left at index: {}", i); + } + if let Some(right) = self.items.get(2 * i + 2) { + assert!(*item <= right.0, "bad right at index: {}", i); + } + } + } +} + +fn set_index(slab: &mut Vec>, slab_slot: usize, val: T) { + match slab[slab_slot] { + SlabSlot::Full { ref mut value } => *value = val, + SlabSlot::Empty { .. } => panic!(), + } +} + +#[cfg(test)] +mod tests { + use super::Heap; + + #[test] + fn simple() { + let mut h = Heap::new(); + h.push(1); + h.push(2); + h.push(8); + h.push(4); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.pop(), Some(2)); + assert_eq!(h.pop(), Some(4)); + assert_eq!(h.pop(), Some(8)); + assert_eq!(h.pop(), None); + assert_eq!(h.pop(), None); + } + + #[test] + fn simple2() { + let mut h = Heap::new(); + h.push(5); + h.push(4); + h.push(3); + h.push(2); + h.push(1); + assert_eq!(h.pop(), Some(1)); + h.push(8); + assert_eq!(h.pop(), Some(2)); + h.push(1); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.pop(), Some(3)); + assert_eq!(h.pop(), Some(4)); + h.push(5); + assert_eq!(h.pop(), Some(5)); + assert_eq!(h.pop(), Some(5)); + assert_eq!(h.pop(), Some(8)); + } + + #[test] + fn remove() { + let mut h = Heap::new(); + h.push(5); + h.push(4); + h.push(3); + let two = h.push(2); + h.push(1); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.remove(two), 2); + h.push(1); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.pop(), Some(3)); + } + + fn vec2heap(v: Vec) -> Heap { + let mut h = Heap::new(); + for t in v { + h.push(t); + } + return h; + } + + #[test] + fn test_peek_and_pop() { + let data = vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]; + let mut sorted = data.clone(); + sorted.sort(); + let mut heap = vec2heap(data); + while heap.peek().is_some() { + assert_eq!(heap.peek().unwrap(), sorted.first().unwrap()); + assert_eq!(heap.pop().unwrap(), sorted.remove(0)); + } + } + + #[test] + fn test_push() { + let mut heap = Heap::new(); + heap.push(-2); + heap.push(-4); + heap.push(-9); + assert!(*heap.peek().unwrap() == -9); + heap.push(-11); + assert!(*heap.peek().unwrap() == -11); + heap.push(-5); + assert!(*heap.peek().unwrap() == -11); + heap.push(-27); + assert!(*heap.peek().unwrap() == -27); + heap.push(-3); + assert!(*heap.peek().unwrap() == -27); + heap.push(-103); + assert!(*heap.peek().unwrap() == -103); + } + + fn check_to_vec(mut data: Vec) { + let mut heap = Heap::new(); + for data in data.iter() { + heap.push(*data); + } + data.sort(); + let mut v = Vec::new(); + while let Some(i) = heap.pop() { + v.push(i); + } + assert_eq!(v, data); + } + + #[test] + fn test_to_vec() { + check_to_vec(vec![]); + check_to_vec(vec![5]); + check_to_vec(vec![3, 2]); + check_to_vec(vec![2, 3]); + check_to_vec(vec![5, 1, 2]); + check_to_vec(vec![1, 100, 2, 3]); + check_to_vec(vec![1, 3, 5, 7, 9, 2, 4, 6, 8, 0]); + check_to_vec(vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]); + check_to_vec(vec![9, 11, 9, 9, 9, 9, 11, 2, 3, 4, 11, 9, 0, 0, 0, 0]); + check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + check_to_vec(vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]); + check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 1, 2]); + check_to_vec(vec![5, 4, 3, 2, 1, 5, 4, 3, 2, 1, 5, 4, 3, 2, 1]); + } + + #[test] + fn test_empty_pop() { + let mut heap = Heap::::new(); + assert!(heap.pop().is_none()); + } + + #[test] + fn test_empty_peek() { + let empty = Heap::::new(); + assert!(empty.peek().is_none()); + } +} diff --git a/src/timer/interval.rs b/src/timer/interval.rs new file mode 100644 index 0000000..de201a1 --- /dev/null +++ b/src/timer/interval.rs @@ -0,0 +1,191 @@ +use pin_utils::unsafe_pinned; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures::prelude::*; + +use crate::timer::delay; +use crate::{Delay, Instant, TimerHandle}; + +/// A stream representing notifications at fixed interval +/// +/// Intervals are created through the `Interval::new` or +/// `Interval::new_at` methods indicating when a first notification +/// should be triggered and when it will be repeated. +/// +/// Note that intervals are not intended for high resolution timers, but rather +/// they will likely fire some granularity after the exact instant that they're +/// otherwise indicated to fire at. +#[derive(Debug)] +pub struct Interval { + delay: Delay, + interval: Duration, +} + +impl Interval { + unsafe_pinned!(delay: Delay); + + /// Creates a new interval which will fire at `dur` time into the future, + /// and will repeat every `dur` interval after + /// + /// The returned object will be bound to the default timer for this thread. + /// The default timer will be spun up in a helper thread on first use. + pub fn new(dur: Duration) -> Interval { + Interval::new_at(Instant::now() + dur, dur) + } + + /// Creates a new interval which will fire at the time specified by `at`, + /// and then will repeat every `dur` interval after + /// + /// The returned object will be bound to the default timer for this thread. + /// The default timer will be spun up in a helper thread on first use. + pub fn new_at(at: Instant, dur: Duration) -> Interval { + Interval { + delay: Delay::new_at(at), + interval: dur, + } + } + + /// Creates a new interval which will fire at the time specified by `at`, + /// and then will repeat every `dur` interval after + /// + /// The returned object will be bound to the timer specified by `handle`. + pub fn new_handle(at: Instant, dur: Duration, handle: TimerHandle) -> Interval { + Interval { + delay: Delay::new_handle(at, handle), + interval: dur, + } + } +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if Pin::new(&mut *self).delay().poll(cx).is_pending() { + return Poll::Pending; + } + let next = next_interval(delay::fires_at(&self.delay), Instant::now(), self.interval); + self.delay.reset_at(next); + Poll::Ready(Some(())) + } +} + +/// Converts Duration object to raw nanoseconds if possible +/// +/// This is useful to divide intervals. +/// +/// While technically for large duration it's impossible to represent any +/// duration as nanoseconds, the largest duration we can represent is about +/// 427_000 years. Large enough for any interval we would use or calculate in +/// tokio. +fn duration_to_nanos(dur: Duration) -> Option { + dur.as_secs() + .checked_mul(1_000_000_000) + .and_then(|v| v.checked_add(dur.subsec_nanos() as u64)) +} + +fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { + let new = prev + interval; + if new > now { + return new; + } else { + let spent_ns = + duration_to_nanos(now.duration_since(prev)).expect("interval should be expired"); + let interval_ns = + duration_to_nanos(interval).expect("interval is less that 427 thousand years"); + let mult = spent_ns / interval_ns + 1; + assert!( + mult < (1 << 32), + "can't skip more than 4 billion intervals of {:?} \ + (trying to skip {})", + interval, + mult + ); + return prev + interval * (mult as u32); + } +} + +#[cfg(test)] +mod test { + use super::next_interval; + use std::time::{Duration, Instant}; + + struct Timeline(Instant); + + impl Timeline { + fn new() -> Timeline { + Timeline(Instant::now()) + } + fn at(&self, millis: u64) -> Instant { + self.0 + Duration::from_millis(millis) + } + fn at_ns(&self, sec: u64, nanos: u32) -> Instant { + self.0 + Duration::new(sec, nanos) + } + } + + fn dur(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + // The math around Instant/Duration isn't 100% precise due to rounding + // errors, see #249 for more info + fn almost_eq(a: Instant, b: Instant) -> bool { + if a == b { + true + } else if a > b { + a - b < Duration::from_millis(1) + } else { + b - a < Duration::from_millis(1) + } + } + + #[test] + fn norm_next() { + let tm = Timeline::new(); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(2), dur(10)), + tm.at(11) + )); + assert!(almost_eq( + next_interval(tm.at(7777), tm.at(7788), dur(100)), + tm.at(7877) + )); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(1000), dur(2100)), + tm.at(2101) + )); + } + + #[test] + fn fast_forward() { + let tm = Timeline::new(); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(1000), dur(10)), + tm.at(1001) + )); + assert!(almost_eq( + next_interval(tm.at(7777), tm.at(8888), dur(100)), + tm.at(8977) + )); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(10000), dur(2100)), + tm.at(10501) + )); + } + + /// TODO: this test actually should be successful, but since we can't + /// multiply Duration on anything larger than u32 easily we decided + /// to allow it to fail for now + #[test] + #[should_panic(expected = "can't skip more than 4 billion intervals")] + fn large_skip() { + let tm = Timeline::new(); + assert_eq!( + next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), + tm.at_ns(25, 1) + ); + } +} diff --git a/src/wasm.rs b/src/wasm.rs index a430f45..b52cfb1 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -21,13 +21,9 @@ #![cfg(target_arch = "wasm32")] -// TODO: global thing -use futures::{prelude::*, channel::oneshot}; -use std::{error, fmt}; use std::cmp::{Eq, PartialEq, Ord, PartialOrd, Ordering}; use std::ops::{Add, Sub, AddAssign, SubAssign}; use std::time::Duration; -use wasm_bindgen::{prelude::*, JsCast}; #[derive(Debug, Copy, Clone)] pub struct Instant {