From f45fb41d4cb6a567f4299594fa654824d6bae6f4 Mon Sep 17 00:00:00 2001 From: Ashley Date: Tue, 17 Dec 2019 13:00:30 +0100 Subject: [PATCH] Reduce to just Instant and SystemTime --- Cargo.toml | 18 +- src/lib.rs | 3 - src/timer.rs | 625 ------------------------------------ src/timer/arc_list.rs | 155 --------- src/timer/delay.rs | 208 ------------ src/timer/ext.rs | 203 ------------ src/timer/global.rs | 8 - src/timer/global/desktop.rs | 106 ------ src/timer/global/wasm.rs | 77 ----- src/timer/heap.rs | 350 -------------------- src/timer/interval.rs | 191 ----------- 11 files changed, 4 insertions(+), 1940 deletions(-) delete mode 100644 src/timer.rs delete mode 100644 src/timer/arc_list.rs delete mode 100644 src/timer/delay.rs delete mode 100644 src/timer/ext.rs delete mode 100644 src/timer/global.rs delete mode 100644 src/timer/global/desktop.rs delete mode 100644 src/timer/global/wasm.rs delete mode 100644 src/timer/heap.rs delete mode 100644 src/timer/interval.rs diff --git a/Cargo.toml b/Cargo.toml index 857c00f..20e693e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,23 +1,13 @@ [package] name = "wasm-timer" edition = "2018" -description = "Abstraction over std::time::Instant and futures-timer that works on WASM" -version = "0.2.4" +description = "Abstraction over std::time::Instant and std::time::SystemTime that works on WASM" +version = "0.3.0" authors = ["Pierre Krieger "] license = "MIT" repository = "https://github.com/tomaka/wasm-timer" -[dependencies] -futures = "0.3.1" -parking_lot = "0.9" -pin-utils = "0.1.0-alpha.4" - [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] -js-sys = "0.3.31" -send_wrapper = "0.2" -wasm-bindgen = "0.2.37" -wasm-bindgen-futures = "0.4.4" -web-sys = { version = "0.3.31", features = ["Performance", "Window"] } +js-sys = "0.3.32" +web-sys = { version = "0.3.32", features = ["Performance", "Window"] } -[dev-dependencies] -async-std = "1.0" diff --git a/src/lib.rs b/src/lib.rs index 4c46791..b94c7bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,13 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -pub use timer::*; - #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] pub use std::time::{Instant, SystemTime, UNIX_EPOCH}; #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] pub use wasm::*; -mod timer; #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] mod wasm; diff --git a/src/timer.rs b/src/timer.rs deleted file mode 100644 index d0cec62..0000000 --- a/src/timer.rs +++ /dev/null @@ -1,625 +0,0 @@ -// The `timer` module is a copy-paste from the code of `futures-timer`, but -// adjusted for WASM. -// -// Copyright (c) 2014 Alex Crichton -// -// Permission is hereby granted, free of charge, to any -// person obtaining a copy of this software and associated -// documentation files (the "Software"), to deal in the -// Software without restriction, including without -// limitation the rights to use, copy, modify, merge, -// publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software -// is furnished to do so, subject to the following -// conditions: -// -// The above copyright notice and this permission notice -// shall be included in all copies or substantial portions -// of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. -// - // Apache License - // Version 2.0, January 2004 - // http://www.apache.org/licenses/ -// -// TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION -// -// 1. Definitions. -// - // "License" shall mean the terms and conditions for use, reproduction, - // and distribution as defined by Sections 1 through 9 of this document. -// - // "Licensor" shall mean the copyright owner or entity authorized by - // the copyright owner that is granting the License. -// - // "Legal Entity" shall mean the union of the acting entity and all - // other entities that control, are controlled by, or are under common - // control with that entity. For the purposes of this definition, - // "control" means (i) the power, direct or indirect, to cause the - // direction or management of such entity, whether by contract or - // otherwise, or (ii) ownership of fifty percent (50%) or more of the - // outstanding shares, or (iii) beneficial ownership of such entity. -// - // "You" (or "Your") shall mean an individual or Legal Entity - // exercising permissions granted by this License. -// - // "Source" form shall mean the preferred form for making modifications, - // including but not limited to software source code, documentation - // source, and configuration files. -// - // "Object" form shall mean any form resulting from mechanical - // transformation or translation of a Source form, including but - // not limited to compiled object code, generated documentation, - // and conversions to other media types. -// - // "Work" shall mean the work of authorship, whether in Source or - // Object form, made available under the License, as indicated by a - // copyright notice that is included in or attached to the work - // (an example is provided in the Appendix below). -// - // "Derivative Works" shall mean any work, whether in Source or Object - // form, that is based on (or derived from) the Work and for which the - // editorial revisions, annotations, elaborations, or other modifications - // represent, as a whole, an original work of authorship. For the purposes - // of this License, Derivative Works shall not include works that remain - // separable from, or merely link (or bind by name) to the interfaces of, - // the Work and Derivative Works thereof. -// - // "Contribution" shall mean any work of authorship, including - // the original version of the Work and any modifications or additions - // to that Work or Derivative Works thereof, that is intentionally - // submitted to Licensor for inclusion in the Work by the copyright owner - // or by an individual or Legal Entity authorized to submit on behalf of - // the copyright owner. For the purposes of this definition, "submitted" - // means any form of electronic, verbal, or written communication sent - // to the Licensor or its representatives, including but not limited to - // communication on electronic mailing lists, source code control systems, - // and issue tracking systems that are managed by, or on behalf of, the - // Licensor for the purpose of discussing and improving the Work, but - // excluding communication that is conspicuously marked or otherwise - // designated in writing by the copyright owner as "Not a Contribution." -// - // "Contributor" shall mean Licensor and any individual or Legal Entity - // on behalf of whom a Contribution has been received by Licensor and - // subsequently incorporated within the Work. -// -// 2. Grant of Copyright License. Subject to the terms and conditions of - // this License, each Contributor hereby grants to You a perpetual, - // worldwide, non-exclusive, no-charge, royalty-free, irrevocable - // copyright license to reproduce, prepare Derivative Works of, - // publicly display, publicly perform, sublicense, and distribute the - // Work and such Derivative Works in Source or Object form. -// -// 3. Grant of Patent License. Subject to the terms and conditions of - // this License, each Contributor hereby grants to You a perpetual, - // worldwide, non-exclusive, no-charge, royalty-free, irrevocable - // (except as stated in this section) patent license to make, have made, - // use, offer to sell, sell, import, and otherwise transfer the Work, - // where such license applies only to those patent claims licensable - // by such Contributor that are necessarily infringed by their - // Contribution(s) alone or by combination of their Contribution(s) - // with the Work to which such Contribution(s) was submitted. If You - // institute patent litigation against any entity (including a - // cross-claim or counterclaim in a lawsuit) alleging that the Work - // or a Contribution incorporated within the Work constitutes direct - // or contributory patent infringement, then any patent licenses - // granted to You under this License for that Work shall terminate - // as of the date such litigation is filed. -// -// 4. Redistribution. You may reproduce and distribute copies of the - // Work or Derivative Works thereof in any medium, with or without - // modifications, and in Source or Object form, provided that You - // meet the following conditions: -// - // (a) You must give any other recipients of the Work or - // Derivative Works a copy of this License; and -// - // (b) You must cause any modified files to carry prominent notices - // stating that You changed the files; and -// - // (c) You must retain, in the Source form of any Derivative Works - // that You distribute, all copyright, patent, trademark, and - // attribution notices from the Source form of the Work, - // excluding those notices that do not pertain to any part of - // the Derivative Works; and -// - // (d) If the Work includes a "NOTICE" text file as part of its - // distribution, then any Derivative Works that You distribute must - // include a readable copy of the attribution notices contained - // within such NOTICE file, excluding those notices that do not - // pertain to any part of the Derivative Works, in at least one - // of the following places: within a NOTICE text file distributed - // as part of the Derivative Works; within the Source form or - // documentation, if provided along with the Derivative Works; or, - // within a display generated by the Derivative Works, if and - // wherever such third-party notices normally appear. The contents - // of the NOTICE file are for informational purposes only and - // do not modify the License. You may add Your own attribution - // notices within Derivative Works that You distribute, alongside - // or as an addendum to the NOTICE text from the Work, provided - // that such additional attribution notices cannot be construed - // as modifying the License. -// - // You may add Your own copyright statement to Your modifications and - // may provide additional or different license terms and conditions - // for use, reproduction, or distribution of Your modifications, or - // for any such Derivative Works as a whole, provided Your use, - // reproduction, and distribution of the Work otherwise complies with - // the conditions stated in this License. -// -// 5. Submission of Contributions. Unless You explicitly state otherwise, - // any Contribution intentionally submitted for inclusion in the Work - // by You to the Licensor shall be under the terms and conditions of - // this License, without any additional terms or conditions. - // Notwithstanding the above, nothing herein shall supersede or modify - // the terms of any separate license agreement you may have executed - // with Licensor regarding such Contributions. -// -// 6. Trademarks. This License does not grant permission to use the trade - // names, trademarks, service marks, or product names of the Licensor, - // except as required for reasonable and customary use in describing the - // origin of the Work and reproducing the content of the NOTICE file. -// -// 7. Disclaimer of Warranty. Unless required by applicable law or - // agreed to in writing, Licensor provides the Work (and each - // Contributor provides its Contributions) on an "AS IS" BASIS, - // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - // implied, including, without limitation, any warranties or conditions - // of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - // PARTICULAR PURPOSE. You are solely responsible for determining the - // appropriateness of using or redistributing the Work and assume any - // risks associated with Your exercise of permissions under this License. -// -// 8. Limitation of Liability. In no event and under no legal theory, - // whether in tort (including negligence), contract, or otherwise, - // unless required by applicable law (such as deliberate and grossly - // negligent acts) or agreed to in writing, shall any Contributor be - // liable to You for damages, including any direct, indirect, special, - // incidental, or consequential damages of any character arising as a - // result of this License or out of the use or inability to use the - // Work (including but not limited to damages for loss of goodwill, - // work stoppage, computer failure or malfunction, or any and all - // other commercial damages or losses), even if such Contributor - // has been advised of the possibility of such damages. -// -// 9. Accepting Warranty or Additional Liability. While redistributing - // the Work or Derivative Works thereof, You may choose to offer, - // and charge a fee for, acceptance of support, warranty, indemnity, - // or other liability obligations and/or rights consistent with this - // License. However, in accepting such obligations, You may act only - // on Your own behalf and on Your sole responsibility, not on behalf - // of any other Contributor, and only if You agree to indemnify, - // defend, and hold each Contributor harmless for any liability - // incurred by, or claims asserted against, such Contributor by reason - // of your accepting any such warranty or additional liability. -// -// END OF TERMS AND CONDITIONS -// -// APPENDIX: How to apply the Apache License to your work. -// - // To apply the Apache License to your work, attach the following - // boilerplate notice, with the fields enclosed by brackets "[]" - // replaced with your own identifying information. (Don't include - // the brackets!) The text should be enclosed in the appropriate - // comment syntax for the file format. We also recommend that a - // file or class name and description of purpose be included on the - // same "printed page" as the copyright notice for easier - // identification within third-party archives. -// -// Copyright [yyyy] [name of copyright owner] -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// - // http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::Instant; -use std::cmp::Ordering; -use std::mem; -use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::{Arc, Mutex, Weak}; -use std::task::{Context, Poll}; -use std::fmt; - -use futures::prelude::*; -use futures::task::AtomicWaker; - -use arc_list::{ArcList, Node}; -use heap::{Heap, Slot}; - -mod arc_list; -mod global; -mod heap; - -pub mod ext; -pub use ext::{TryFutureExt, TryStreamExt}; - -/// A "timer heap" used to power separately owned instances of `Delay` and -/// `Interval`. -/// -/// This timer is implemented as a priority queued-based heap. Each `Timer` -/// contains a few primary methods which which to drive it: -/// -/// * `next_wake` indicates how long the ambient system needs to sleep until it -/// invokes further processing on a `Timer` -/// * `advance_to` is what actually fires timers on the `Timer`, and should be -/// called essentially every iteration of the event loop, or when the time -/// specified by `next_wake` has elapsed. -/// * The `Future` implementation for `Timer` is used to process incoming timer -/// updates and requests. This is used to schedule new timeouts, update -/// existing ones, or delete existing timeouts. The `Future` implementation -/// will never resolve, but it'll schedule notifications of when to wake up -/// and process more messages. -/// -/// Note that if you're using this crate you probably don't need to use a -/// `Timer` as there is a global one already available for you run on a helper -/// thread. If this isn't desirable, though, then the -/// `TimerHandle::set_fallback` method can be used instead! -pub struct Timer { - inner: Arc, - timer_heap: Heap, -} - -/// A handle to a `Timer` which is used to create instances of a `Delay`. -#[derive(Clone)] -pub struct TimerHandle { - inner: Weak, -} - -mod delay; -mod interval; -pub use self::delay::Delay; -pub use self::interval::Interval; - -struct Inner { - /// List of updates the `Timer` needs to process - list: ArcList, - - /// The blocked `Timer` task to receive notifications to the `list` above. - waker: AtomicWaker, -} - -/// Shared state between the `Timer` and a `Delay`. -struct ScheduledTimer { - waker: AtomicWaker, - - // The lowest bit here is whether the timer has fired or not, the second - // lowest bit is whether the timer has been invalidated, and all the other - // bits are the "generation" of the timer which is reset during the `reset` - // function. Only timers for a matching generation are fired. - state: AtomicUsize, - - inner: Weak, - at: Mutex>, - - // TODO: this is only accessed by the timer thread, should have a more - // lightweight protection than a `Mutex` - slot: Mutex>, -} - -/// Entries in the timer heap, sorted by the instant they're firing at and then -/// also containing some payload data. -struct HeapTimer { - at: Instant, - gen: usize, - node: Arc>, -} - -impl Timer { - /// Creates a new timer heap ready to create new timers. - pub fn new() -> Timer { - Timer { - inner: Arc::new(Inner { - list: ArcList::new(), - waker: AtomicWaker::new(), - }), - timer_heap: Heap::new(), - } - } - - /// Returns a handle to this timer heap, used to create new timeouts. - pub fn handle(&self) -> TimerHandle { - TimerHandle { - inner: Arc::downgrade(&self.inner), - } - } - - /// Returns the time at which this timer next needs to be invoked with - /// `advance_to`. - /// - /// Event loops or threads typically want to sleep until the specified - /// instant. - pub fn next_event(&self) -> Option { - self.timer_heap.peek().map(|t| t.at) - } - - /// Proces any timers which are supposed to fire at or before the current - /// instant. - /// - /// This method is equivalent to `self.advance_to(Instant::now())`. - pub fn advance(&mut self) { - self.advance_to(Instant::now()) - } - - /// Proces any timers which are supposed to fire before `now` specified. - /// - /// This method should be called on `Timer` periodically to advance the - /// internal state and process any pending timers which need to fire. - pub fn advance_to(&mut self, now: Instant) { - loop { - match self.timer_heap.peek() { - Some(head) if head.at <= now => {} - Some(_) => break, - None => break, - }; - - // Flag the timer as fired and then notify its task, if any, that's - // blocked. - let heap_timer = self.timer_heap.pop().unwrap(); - *heap_timer.node.slot.lock().unwrap() = None; - let bits = heap_timer.gen << 2; - match heap_timer - .node - .state - .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst) - { - Ok(_) => heap_timer.node.waker.wake(), - Err(_b) => {} - } - } - } - - /// Either updates the timer at slot `idx` to fire at `at`, or adds a new - /// timer at `idx` and sets it to fire at `at`. - fn update_or_add(&mut self, at: Instant, node: Arc>) { - // TODO: avoid remove + push and instead just do one sift of the heap? - // In theory we could update it in place and then do the percolation - // as necessary - let gen = node.state.load(SeqCst) >> 2; - let mut slot = node.slot.lock().unwrap(); - if let Some(heap_slot) = slot.take() { - self.timer_heap.remove(heap_slot); - } - *slot = Some(self.timer_heap.push(HeapTimer { - at: at, - gen: gen, - node: node.clone(), - })); - } - - fn remove(&mut self, node: Arc>) { - // If this `idx` is still around and it's still got a registered timer, - // then we jettison it form the timer heap. - let mut slot = node.slot.lock().unwrap(); - let heap_slot = match slot.take() { - Some(slot) => slot, - None => return, - }; - self.timer_heap.remove(heap_slot); - } - - fn invalidate(&mut self, node: Arc>) { - node.state.fetch_or(0b10, SeqCst); - node.waker.wake(); - } -} - -impl Future for Timer { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.inner).waker.register(cx.waker()); - let mut list = self.inner.list.take(); - while let Some(node) = list.pop() { - let at = *node.at.lock().unwrap(); - match at { - Some(at) => self.update_or_add(at, node), - None => self.remove(node), - } - } - Poll::Pending - } -} - -impl Drop for Timer { - fn drop(&mut self) { - // Seal off our list to prevent any more updates from getting pushed on. - // Any timer which sees an error from the push will immediately become - // inert. - let mut list = self.inner.list.take_and_seal(); - - // Now that we'll never receive another timer, drain the list of all - // updates and also drain our heap of all active timers, invalidating - // everything. - while let Some(t) = list.pop() { - self.invalidate(t); - } - while let Some(t) = self.timer_heap.pop() { - self.invalidate(t.node); - } - } -} - -impl fmt::Debug for Timer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Timer").field("heap", &"...").finish() - } -} - -impl PartialEq for HeapTimer { - fn eq(&self, other: &HeapTimer) -> bool { - self.at == other.at - } -} - -impl Eq for HeapTimer {} - -impl PartialOrd for HeapTimer { - fn partial_cmp(&self, other: &HeapTimer) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for HeapTimer { - fn cmp(&self, other: &HeapTimer) -> Ordering { - self.at.cmp(&other.at) - } -} - -static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0); - -/// Error returned from `TimerHandle::set_fallback`. -#[derive(Clone, Debug)] -pub struct SetDefaultError(()); - -impl TimerHandle { - /// Configures this timer handle to be the one returned by - /// `TimerHandle::default`. - /// - /// By default a global thread is initialized on the first call to - /// `TimerHandle::default`. This first call can happen transitively through - /// `Delay::new`. If, however, that hasn't happened yet then the global - /// default timer handle can be configured through this method. - /// - /// This method can be used to prevent the global helper thread from - /// spawning. If this method is successful then the global helper thread - /// will never get spun up. - /// - /// On success this timer handle will have installed itself globally to be - /// used as the return value for `TimerHandle::default` unless otherwise - /// specified. - /// - /// # Errors - /// - /// If another thread has already called `set_as_global_fallback` or this - /// thread otherwise loses a race to call this method then it will fail - /// returning an error. Once a call to `set_as_global_fallback` is - /// successful then no future calls may succeed. - pub fn set_as_global_fallback(self) -> Result<(), SetDefaultError> { - unsafe { - let val = self.into_usize(); - match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { - Ok(_) => Ok(()), - Err(_) => { - drop(TimerHandle::from_usize(val)); - Err(SetDefaultError(())) - } - } - } - } - - fn into_usize(self) -> usize { - unsafe { mem::transmute::, usize>(self.inner) } - } - - unsafe fn from_usize(val: usize) -> TimerHandle { - let inner = mem::transmute::>(val); - TimerHandle { inner } - } -} - -impl Default for TimerHandle { - #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - fn default() -> TimerHandle { - let mut fallback = HANDLE_FALLBACK.load(SeqCst); - - // If the fallback hasn't been previously initialized then let's spin - // up a helper thread and try to initialize with that. If we can't - // actually create a helper thread then we'll just return a "defunkt" - // handle which will return errors when timer objects are attempted to - // be associated. - if fallback == 0 { - let helper = match global::HelperThread::new() { - Ok(helper) => helper, - Err(_) => return TimerHandle { inner: Weak::new() }, - }; - - // If we successfully set ourselves as the actual fallback then we - // want to `forget` the helper thread to ensure that it persists - // globally. If we fail to set ourselves as the fallback that means - // that someone was racing with this call to - // `TimerHandle::default`. They ended up winning so we'll destroy - // our helper thread (which shuts down the thread) and reload the - // fallback. - if helper.handle().set_as_global_fallback().is_ok() { - let ret = helper.handle(); - helper.forget(); - return ret; - } - fallback = HANDLE_FALLBACK.load(SeqCst); - } - - // At this point our fallback handle global was configured so we use - // its value to reify a handle, clone it, and then forget our reified - // handle as we don't actually have an owning reference to it. - assert!(fallback != 0); - unsafe { - let handle = TimerHandle::from_usize(fallback); - let ret = handle.clone(); - drop(handle.into_usize()); - return ret; - } - } - - #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - fn default() -> TimerHandle { - let mut fallback = HANDLE_FALLBACK.load(SeqCst); - - // If the fallback hasn't been previously initialized then let's spin - // up a helper thread and try to initialize with that. If we can't - // actually create a helper thread then we'll just return a "defunkt" - // handle which will return errors when timer objects are attempted to - // be associated. - if fallback == 0 { - let handle = global::run(); - - // If we successfully set ourselves as the actual fallback then we - // want to `forget` the helper thread to ensure that it persists - // globally. If we fail to set ourselves as the fallback that means - // that someone was racing with this call to - // `TimerHandle::default`. They ended up winning so we'll destroy - // our helper thread (which shuts down the thread) and reload the - // fallback. - if handle.clone().set_as_global_fallback().is_ok() { - return handle; - } - fallback = HANDLE_FALLBACK.load(SeqCst); - } - - // At this point our fallback handle global was configured so we use - // its value to reify a handle, clone it, and then forget our reified - // handle as we don't actually have an owning reference to it. - assert!(fallback != 0); - unsafe { - let handle = TimerHandle::from_usize(fallback); - let ret = handle.clone(); - drop(handle.into_usize()); - return ret; - } - } -} - -impl fmt::Debug for TimerHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("TimerHandle").field("inner", &"...").finish() - } -} - diff --git a/src/timer/arc_list.rs b/src/timer/arc_list.rs deleted file mode 100644 index 4388052..0000000 --- a/src/timer/arc_list.rs +++ /dev/null @@ -1,155 +0,0 @@ -//! An atomically managed intrusive linked list of `Arc` nodes - -use std::marker; -use std::ops::Deref; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::{AtomicBool, AtomicUsize}; -use std::sync::Arc; - -pub struct ArcList { - list: AtomicUsize, - _marker: marker::PhantomData, -} - -impl ArcList { - pub fn new() -> ArcList { - ArcList { - list: AtomicUsize::new(0), - _marker: marker::PhantomData, - } - } - - /// Pushes the `data` provided onto this list if it's not already enqueued - /// in this list. - /// - /// If `data` is already enqueued in this list then this is a noop, - /// otherwise, the `data` here is pushed on the end of the list. - pub fn push(&self, data: &Arc>) -> Result<(), ()> { - if data.enqueued.swap(true, SeqCst) { - // note that even if our list is sealed off then the other end is - // still guaranteed to see us because we were previously enqueued. - return Ok(()); - } - let mut head = self.list.load(SeqCst); - let node = Arc::into_raw(data.clone()) as usize; - loop { - // If we've been sealed off, abort and return an error - if head == 1 { - unsafe { - drop(Arc::from_raw(node as *mut Node)); - } - return Err(()); - } - - // Otherwise attempt to push this node - data.next.store(head, SeqCst); - match self.list.compare_exchange(head, node, SeqCst, SeqCst) { - Ok(_) => break Ok(()), - Err(new_head) => head = new_head, - } - } - } - - /// Atomically empties this list, returning a new owned copy which can be - /// used to iterate over the entries. - pub fn take(&self) -> ArcList { - let mut list = self.list.load(SeqCst); - loop { - if list == 1 { - break; - } - match self.list.compare_exchange(list, 0, SeqCst, SeqCst) { - Ok(_) => break, - Err(l) => list = l, - } - } - ArcList { - list: AtomicUsize::new(list), - _marker: marker::PhantomData, - } - } - - /// Atomically empties this list and prevents further successful calls to - /// `push`. - pub fn take_and_seal(&self) -> ArcList { - ArcList { - list: AtomicUsize::new(self.list.swap(1, SeqCst)), - _marker: marker::PhantomData, - } - } - - /// Removes the head of the list of nodes, returning `None` if this is an - /// empty list. - pub fn pop(&mut self) -> Option>> { - let head = *self.list.get_mut(); - if head == 0 || head == 1 { - return None; - } - let head = unsafe { Arc::from_raw(head as *const Node) }; - *self.list.get_mut() = head.next.load(SeqCst); - // At this point, the node is out of the list, so store `false` so we - // can enqueue it again and see further changes. - assert!(head.enqueued.swap(false, SeqCst)); - Some(head) - } -} - -impl Drop for ArcList { - fn drop(&mut self) { - while let Some(_) = self.pop() { - // ... - } - } -} - -pub struct Node { - next: AtomicUsize, - enqueued: AtomicBool, - data: T, -} - -impl Node { - pub fn new(data: T) -> Node { - Node { - next: AtomicUsize::new(0), - enqueued: AtomicBool::new(false), - data: data, - } - } -} - -impl Deref for Node { - type Target = T; - - fn deref(&self) -> &T { - &self.data - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn smoke() { - let a = ArcList::new(); - let n = Arc::new(Node::new(1)); - assert!(a.push(&n).is_ok()); - - let mut l = a.take(); - assert_eq!(**l.pop().unwrap(), 1); - assert!(l.pop().is_none()); - } - - #[test] - fn seal() { - let a = ArcList::new(); - let n = Arc::new(Node::new(1)); - let mut l = a.take_and_seal(); - assert!(l.pop().is_none()); - assert!(a.push(&n).is_err()); - - assert!(a.take().pop().is_none()); - assert!(a.take_and_seal().pop().is_none()); - } -} diff --git a/src/timer/delay.rs b/src/timer/delay.rs deleted file mode 100644 index 17cd334..0000000 --- a/src/timer/delay.rs +++ /dev/null @@ -1,208 +0,0 @@ -//! Support for creating futures that represent timeouts. -//! -//! This module contains the `Delay` type which is a future that will resolve -//! at a particular point in the future. - -use std::fmt; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; -use std::time::Duration; - -use futures::task::AtomicWaker; - -use crate::Instant; -use crate::timer::arc_list::Node; -use crate::timer::{ScheduledTimer, TimerHandle}; - -/// A future representing the notification that an elapsed duration has -/// occurred. -/// -/// This is created through the `Delay::new` or `Delay::new_at` methods -/// indicating when the future should fire at. Note that these futures are not -/// intended for high resolution timers, but rather they will likely fire some -/// granularity after the exact instant that they're otherwise indicated to -/// fire at. -pub struct Delay { - state: Option>>, - when: Instant, -} - -impl Delay { - /// Creates a new future which will fire at `dur` time into the future. - /// - /// The returned object will be bound to the default timer for this thread. - /// The default timer will be spun up in a helper thread on first use. - #[inline] - pub fn new(dur: Duration) -> Delay { - Delay::new_at(Instant::now() + dur) - } - - /// Creates a new future which will fire at the time specified by `at`. - /// - /// The returned object will be bound to the default timer for this thread. - /// The default timer will be spun up in a helper thread on first use. - #[inline] - pub fn new_at(at: Instant) -> Delay { - Delay::new_handle(at, Default::default()) - } - - /// Creates a new future which will fire at the time specified by `at`. - /// - /// The returned instance of `Delay` will be bound to the timer specified by - /// the `handle` argument. - pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay { - let inner = match handle.inner.upgrade() { - Some(i) => i, - None => { - return Delay { - state: None, - when: at, - } - } - }; - let state = Arc::new(Node::new(ScheduledTimer { - at: Mutex::new(Some(at)), - state: AtomicUsize::new(0), - waker: AtomicWaker::new(), - inner: handle.inner, - slot: Mutex::new(None), - })); - - // If we fail to actually push our node then we've become an inert - // timer, meaning that we'll want to immediately return an error from - // `poll`. - if inner.list.push(&state).is_err() { - return Delay { - state: None, - when: at, - }; - } - - inner.waker.wake(); - Delay { - state: Some(state), - when: at, - } - } - - /// Resets this timeout to an new timeout which will fire at the time - /// specified by `dur`. - /// - /// This is equivalent to calling `reset_at` with `Instant::now() + dur` - #[inline] - pub fn reset(&mut self, dur: Duration) { - self.reset_at(Instant::now() + dur) - } - - /// Resets this timeout to an new timeout which will fire at the time - /// specified by `at`. - /// - /// This method is usable even of this instance of `Delay` has "already - /// fired". That is, if this future has resovled, calling this method means - /// that the future will still re-resolve at the specified instant. - /// - /// If `at` is in the past then this future will immediately be resolved - /// (when `poll` is called). - /// - /// Note that if any task is currently blocked on this future then that task - /// will be dropped. It is required to call `poll` again after this method - /// has been called to ensure tha ta task is blocked on this future. - #[inline] - pub fn reset_at(&mut self, at: Instant) { - self.when = at; - if self._reset(at).is_err() { - self.state = None - } - } - - fn _reset(&mut self, at: Instant) -> Result<(), ()> { - let state = match self.state { - Some(ref state) => state, - None => return Err(()), - }; - if let Some(timeouts) = state.inner.upgrade() { - let mut bits = state.state.load(SeqCst); - loop { - // If we've been invalidated, cancel this reset - if bits & 0b10 != 0 { - return Err(()); - } - let new = bits.wrapping_add(0b100) & !0b11; - match state.state.compare_exchange(bits, new, SeqCst, SeqCst) { - Ok(_) => break, - Err(s) => bits = s, - } - } - *state.at.lock().unwrap() = Some(at); - // If we fail to push our node then we've become an inert timer, so - // we'll want to clear our `state` field accordingly - timeouts.list.push(state)?; - timeouts.waker.wake(); - } - - Ok(()) - } -} - -#[inline] -pub fn fires_at(timeout: &Delay) -> Instant { - timeout.when -} - -impl Future for Delay { - type Output = io::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let state = match self.state { - Some(ref state) => state, - None => { - let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away")); - return Poll::Ready(err); - } - }; - - if state.state.load(SeqCst) & 1 != 0 { - return Poll::Ready(Ok(())); - } - - state.waker.register(&cx.waker()); - - // Now that we've registered, do the full check of our own internal - // state. If we've fired the first bit is set, and if we've been - // invalidated the second bit is set. - match state.state.load(SeqCst) { - n if n & 0b01 != 0 => Poll::Ready(Ok(())), - n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "timer has gone away", - ))), - _ => Poll::Pending, - } - } -} - -impl Drop for Delay { - fn drop(&mut self) { - let state = match self.state { - Some(ref s) => s, - None => return, - }; - if let Some(timeouts) = state.inner.upgrade() { - *state.at.lock().unwrap() = None; - if timeouts.list.push(state).is_ok() { - timeouts.waker.wake(); - } - } - } -} - -impl fmt::Debug for Delay { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Delay").field("when", &self.when).finish() - } -} diff --git a/src/timer/ext.rs b/src/timer/ext.rs deleted file mode 100644 index 060ae83..0000000 --- a/src/timer/ext.rs +++ /dev/null @@ -1,203 +0,0 @@ -//! Extension traits for the standard `Stream` and `Future` traits. - -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use futures::prelude::*; -use pin_utils::unsafe_pinned; - -use crate::{Delay, Instant}; - -/// An extension trait for futures which provides convenient accessors for -/// timing out execution and such. -pub trait TryFutureExt: TryFuture + Sized { - /// Creates a new future which will take at most `dur` time to resolve from - /// the point at which this method is called. - /// - /// This combinator creates a new future which wraps the receiving future - /// in a timeout. The future returned will resolve in at most `dur` time - /// specified (relative to when this function is called). - /// - /// If the future completes before `dur` elapses then the future will - /// resolve with that item. Otherwise the future will resolve to an error - /// once `dur` has elapsed. - /// - /// # Examples - /// - /// ```no_run - /// use std::time::Duration; - /// use futures::prelude::*; - /// use wasm_timer::TryFutureExt; - /// - /// # fn long_future() -> impl TryFuture { - /// # futures::future::ok(()) - /// # } - /// # - /// fn main() { - /// let future = long_future(); - /// let timed_out = future.timeout(Duration::from_secs(1)); - /// - /// async_std::task::block_on(async { - /// match timed_out.await { - /// Ok(item) => println!("got {:?} within enough time!", item), - /// Err(_) => println!("took too long to produce the item"), - /// } - /// }) - /// } - /// ``` - fn timeout(self, dur: Duration) -> Timeout - where - Self::Error: From, - { - Timeout { - timeout: Delay::new(dur), - future: self, - } - } - - /// Creates a new future which will resolve no later than `at` specified. - /// - /// This method is otherwise equivalent to the `timeout` method except that - /// it tweaks the moment at when the timeout elapsed to being specified with - /// an absolute value rather than a relative one. For more documentation see - /// the `timeout` method. - fn timeout_at(self, at: Instant) -> Timeout - where - Self::Error: From, - { - Timeout { - timeout: Delay::new_at(at), - future: self, - } - } -} - -impl TryFutureExt for F {} - -/// Future returned by the `FutureExt::timeout` method. -#[derive(Debug)] -pub struct Timeout -where - F: TryFuture, - F::Error: From, -{ - future: F, - timeout: Delay, -} - -impl Timeout -where - F: TryFuture, - F::Error: From, -{ - unsafe_pinned!(future: F); - unsafe_pinned!(timeout: Delay); -} - -impl Future for Timeout -where - F: TryFuture, - F::Error: From, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().future().try_poll(cx) { - Poll::Pending => {} - other => return other, - } - - if self.timeout().poll(cx).is_ready() { - let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); - Poll::Ready(err) - } else { - Poll::Pending - } - } -} - -/// An extension trait for streams which provides convenient accessors for -/// timing out execution and such. -pub trait TryStreamExt: TryStream + Sized { - /// Creates a new stream which will take at most `dur` time to yield each - /// item of the stream. - /// - /// This combinator creates a new stream which wraps the receiving stream - /// in a timeout-per-item. The stream returned will resolve in at most - /// `dur` time for each item yielded from the stream. The first item's timer - /// starts when this method is called. - /// - /// If a stream's item completes before `dur` elapses then the timer will be - /// reset for the next item. If the timeout elapses, however, then an error - /// will be yielded on the stream and the timer will be reset. - fn timeout(self, dur: Duration) -> TimeoutStream - where - Self::Error: From, - { - TimeoutStream { - timeout: Delay::new(dur), - dur, - stream: self, - } - } -} - -impl TryStreamExt for S {} - -/// Stream returned by the `StreamExt::timeout` method. -#[derive(Debug)] -pub struct TimeoutStream -where - S: TryStream, - S::Error: From, -{ - timeout: Delay, - dur: Duration, - stream: S, -} - -impl TimeoutStream -where - S: TryStream, - S::Error: From, -{ - unsafe_pinned!(timeout: Delay); - unsafe_pinned!(stream: S); -} - -impl Stream for TimeoutStream -where - S: TryStream, - S::Error: From, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let dur = self.dur; - - let r = self.as_mut().stream().try_poll_next(cx); - match r { - Poll::Pending => {} - other => { - self.as_mut().timeout().reset(dur); - return other; - } - } - - if self.as_mut().timeout().poll(cx).is_ready() { - self.as_mut().timeout().reset(dur); - Poll::Ready(Some(Err(io::Error::new( - io::ErrorKind::TimedOut, - "stream item timed out", - ) - .into()))) - } else { - Poll::Pending - } - } -} diff --git a/src/timer/global.rs b/src/timer/global.rs deleted file mode 100644 index 6f70c5c..0000000 --- a/src/timer/global.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub(crate) use self::platform::*; - -#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] -#[path = "global/desktop.rs"] -mod platform; -#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] -#[path = "global/wasm.rs"] -mod platform; diff --git a/src/timer/global/desktop.rs b/src/timer/global/desktop.rs deleted file mode 100644 index 072daac..0000000 --- a/src/timer/global/desktop.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::future::Future; -use std::io; -use std::mem::{self, ManuallyDrop}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::task::{Context, RawWaker, RawWakerVTable, Waker}; -use std::thread; -use std::thread::Thread; -use std::time::Instant; - -use pin_utils::pin_mut; - -use crate::{Timer, TimerHandle}; - -pub struct HelperThread { - thread: Option>, - timer: TimerHandle, - done: Arc, -} - -impl HelperThread { - pub fn new() -> io::Result { - let timer = Timer::new(); - let timer_handle = timer.handle(); - let done = Arc::new(AtomicBool::new(false)); - let done2 = done.clone(); - let thread = thread::Builder::new().spawn(move || run(timer, done2))?; - - Ok(HelperThread { - thread: Some(thread), - done, - timer: timer_handle, - }) - } - - pub fn handle(&self) -> TimerHandle { - self.timer.clone() - } - - pub fn forget(mut self) { - self.thread.take(); - } -} - -impl Drop for HelperThread { - fn drop(&mut self) { - let thread = match self.thread.take() { - Some(thread) => thread, - None => return, - }; - self.done.store(true, Ordering::SeqCst); - thread.thread().unpark(); - drop(thread.join()); - } -} - -fn run(timer: Timer, done: Arc) { - let mut waker = current_thread_waker(); - let mut cx = Context::from_waker(&mut waker); - - pin_mut!(timer); - while !done.load(Ordering::SeqCst) { - drop(timer.as_mut().poll(&mut cx)); - - timer.advance(); - match timer.next_event() { - // Ok, block for the specified time - Some(when) => { - let now = Instant::now(); - if now < when { - thread::park_timeout(when - now) - } else { - // .. continue... - } - } - - // Just wait for one of our futures to wake up - None => thread::park(), - } - } -} - -static VTABLE: RawWakerVTable = RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); - -fn raw_clone(ptr: *const ()) -> RawWaker { - let me = ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) }); - mem::forget(me.clone()); - RawWaker::new(ptr, &VTABLE) -} - -fn raw_wake(ptr: *const ()) { - unsafe { Arc::from_raw(ptr as *const Thread) }.unpark() -} - -fn raw_wake_by_ref(ptr: *const ()) { - ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) }).unpark() -} - -fn raw_drop(ptr: *const ()) { - unsafe { Arc::from_raw(ptr as *const Thread) }; -} - -fn current_thread_waker() -> Waker { - let thread = Arc::new(thread::current()); - unsafe { Waker::from_raw(raw_clone(Arc::into_raw(thread) as *const ())) } -} diff --git a/src/timer/global/wasm.rs b/src/timer/global/wasm.rs deleted file mode 100644 index d04b0e2..0000000 --- a/src/timer/global/wasm.rs +++ /dev/null @@ -1,77 +0,0 @@ -use futures::task::{self, ArcWake}; -use parking_lot::Mutex; -use std::convert::TryFrom; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Context; -use std::time::Duration; -use wasm_bindgen::{JsCast, closure::Closure}; - -use crate::{Instant, Timer, TimerHandle}; - -/// Starts a background task, creates a `Timer`, and returns a handle to it. -/// -/// > **Note**: Contrary to the original `futures-timer` crate, we don't have -/// > any `forget()` method, as the task is automatically considered -/// > as "forgotten". -pub(crate) fn run() -> TimerHandle { - let timer = Timer::new(); - let handle = timer.handle(); - schedule_callback(Arc::new(Mutex::new(timer)), Duration::new(0, 0)); - handle -} - -/// Calls `Window::setTimeout` with the given `Duration`. The callback wakes up the timer and -/// processes everything. -fn schedule_callback(timer: Arc>, when: Duration) { - let window = web_sys::window().expect("Unable to access Window"); - let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0( - &Closure::once_into_js(move || { - let mut timer_lock = timer.lock(); - - // We start by polling the timer. If any new `Delay` is created, the waker will be used - // to wake up this task pre-emptively. As such, we pass a `Waker` that calls - // `schedule_callback` with a delay of `0`. - let waker = task::waker(Arc::new(Waker { timer: timer.clone() })); - let _ = Future::poll(Pin::new(&mut *timer_lock), &mut Context::from_waker(&waker)); - - // Notify the timers that are ready. - let now = Instant::now(); - timer_lock.advance_to(now); - - // Each call to `schedule_callback` calls `schedule_callback` again, but also leaves - // the possibility for `schedule_callback` to be called in parallel. Since we don't - // want too many useless callbacks, we... - // TODO: ugh, that's a hack - if Arc::strong_count(&timer) > 20 { - return; - } - - // We call `schedule_callback` again for the next event. - let sleep_dur = timer_lock.next_event() - .map(|next_event| { - if next_event > now { - next_event - now - } else { - Duration::new(0, 0) - } - }) - .unwrap_or(Duration::from_secs(5)); - drop(timer_lock); - schedule_callback(timer, sleep_dur); - - }).unchecked_ref(), - i32::try_from(when.as_millis()).unwrap_or(0) - ).unwrap(); -} - -struct Waker { - timer: Arc>, -} - -impl ArcWake for Waker { - fn wake_by_ref(arc_self: &Arc) { - schedule_callback(arc_self.timer.clone(), Duration::new(0, 0)); - } -} diff --git a/src/timer/heap.rs b/src/timer/heap.rs deleted file mode 100644 index f0f9882..0000000 --- a/src/timer/heap.rs +++ /dev/null @@ -1,350 +0,0 @@ -//! A simple binary heap with support for removal of arbitrary elements -//! -//! This heap is used to manage timer state in the event loop. All timeouts go -//! into this heap and we also cancel timeouts from this heap. The crucial -//! feature of this heap over the standard library's `BinaryHeap` is the ability -//! to remove arbitrary elements. (e.g. when a timer is canceled) -//! -//! Note that this heap is not at all optimized right now, it should hopefully -//! just work. - -use std::mem; - -pub struct Heap { - // Binary heap of items, plus the slab index indicating what position in the - // list they're in. - items: Vec<(T, usize)>, - - // A map from a slab index (assigned to an item above) to the actual index - // in the array the item appears at. - index: Vec>, - next_index: usize, -} - -enum SlabSlot { - Empty { next: usize }, - Full { value: T }, -} - -pub struct Slot { - idx: usize, -} - -impl Heap { - pub fn new() -> Heap { - Heap { - items: Vec::new(), - index: Vec::new(), - next_index: 0, - } - } - - /// Pushes an element onto this heap, returning a slot token indicating - /// where it was pushed on to. - /// - /// The slot can later get passed to `remove` to remove the element from the - /// heap, but only if the element was previously not removed from the heap. - pub fn push(&mut self, t: T) -> Slot { - self.assert_consistent(); - let len = self.items.len(); - let slot = SlabSlot::Full { value: len }; - let slot_idx = if self.next_index == self.index.len() { - self.next_index += 1; - self.index.push(slot); - self.index.len() - 1 - } else { - match mem::replace(&mut self.index[self.next_index], slot) { - SlabSlot::Empty { next } => mem::replace(&mut self.next_index, next), - SlabSlot::Full { .. } => panic!(), - } - }; - self.items.push((t, slot_idx)); - self.percolate_up(len); - self.assert_consistent(); - Slot { idx: slot_idx } - } - - pub fn peek(&self) -> Option<&T> { - self.assert_consistent(); - self.items.get(0).map(|i| &i.0) - } - - pub fn pop(&mut self) -> Option { - self.assert_consistent(); - if self.items.len() == 0 { - return None; - } - let slot = Slot { - idx: self.items[0].1, - }; - Some(self.remove(slot)) - } - - pub fn remove(&mut self, slot: Slot) -> T { - self.assert_consistent(); - let empty = SlabSlot::Empty { - next: self.next_index, - }; - let idx = match mem::replace(&mut self.index[slot.idx], empty) { - SlabSlot::Full { value } => value, - SlabSlot::Empty { .. } => panic!(), - }; - self.next_index = slot.idx; - let (item, slot_idx) = self.items.swap_remove(idx); - debug_assert_eq!(slot.idx, slot_idx); - if idx < self.items.len() { - set_index(&mut self.index, self.items[idx].1, idx); - if self.items[idx].0 < item { - self.percolate_up(idx); - } else { - self.percolate_down(idx); - } - } - self.assert_consistent(); - return item; - } - - fn percolate_up(&mut self, mut idx: usize) -> usize { - while idx > 0 { - let parent = (idx - 1) / 2; - if self.items[idx].0 >= self.items[parent].0 { - break; - } - let (a, b) = self.items.split_at_mut(idx); - mem::swap(&mut a[parent], &mut b[0]); - set_index(&mut self.index, a[parent].1, parent); - set_index(&mut self.index, b[0].1, idx); - idx = parent; - } - return idx; - } - - fn percolate_down(&mut self, mut idx: usize) -> usize { - loop { - let left = 2 * idx + 1; - let right = 2 * idx + 2; - - let mut swap_left = true; - match (self.items.get(left), self.items.get(right)) { - (Some(left), None) => { - if left.0 >= self.items[idx].0 { - break; - } - } - (Some(left), Some(right)) => { - if left.0 < self.items[idx].0 { - if right.0 < left.0 { - swap_left = false; - } - } else if right.0 < self.items[idx].0 { - swap_left = false; - } else { - break; - } - } - - (None, None) => break, - (None, Some(_right)) => panic!("not possible"), - } - - let (a, b) = if swap_left { - self.items.split_at_mut(left) - } else { - self.items.split_at_mut(right) - }; - mem::swap(&mut a[idx], &mut b[0]); - set_index(&mut self.index, a[idx].1, idx); - set_index(&mut self.index, b[0].1, a.len()); - idx = a.len(); - } - return idx; - } - - fn assert_consistent(&self) { - if !cfg!(assert_timer_heap_consistent) { - return; - } - - assert_eq!( - self.items.len(), - self.index - .iter() - .filter(|slot| { - match **slot { - SlabSlot::Full { .. } => true, - SlabSlot::Empty { .. } => false, - } - }) - .count() - ); - - for (i, &(_, j)) in self.items.iter().enumerate() { - let index = match self.index[j] { - SlabSlot::Full { value } => value, - SlabSlot::Empty { .. } => panic!(), - }; - if index != i { - panic!( - "self.index[j] != i : i={} j={} self.index[j]={}", - i, j, index - ); - } - } - - for (i, &(ref item, _)) in self.items.iter().enumerate() { - if i > 0 { - assert!(*item >= self.items[(i - 1) / 2].0, "bad at index: {}", i); - } - if let Some(left) = self.items.get(2 * i + 1) { - assert!(*item <= left.0, "bad left at index: {}", i); - } - if let Some(right) = self.items.get(2 * i + 2) { - assert!(*item <= right.0, "bad right at index: {}", i); - } - } - } -} - -fn set_index(slab: &mut Vec>, slab_slot: usize, val: T) { - match slab[slab_slot] { - SlabSlot::Full { ref mut value } => *value = val, - SlabSlot::Empty { .. } => panic!(), - } -} - -#[cfg(test)] -mod tests { - use super::Heap; - - #[test] - fn simple() { - let mut h = Heap::new(); - h.push(1); - h.push(2); - h.push(8); - h.push(4); - assert_eq!(h.pop(), Some(1)); - assert_eq!(h.pop(), Some(2)); - assert_eq!(h.pop(), Some(4)); - assert_eq!(h.pop(), Some(8)); - assert_eq!(h.pop(), None); - assert_eq!(h.pop(), None); - } - - #[test] - fn simple2() { - let mut h = Heap::new(); - h.push(5); - h.push(4); - h.push(3); - h.push(2); - h.push(1); - assert_eq!(h.pop(), Some(1)); - h.push(8); - assert_eq!(h.pop(), Some(2)); - h.push(1); - assert_eq!(h.pop(), Some(1)); - assert_eq!(h.pop(), Some(3)); - assert_eq!(h.pop(), Some(4)); - h.push(5); - assert_eq!(h.pop(), Some(5)); - assert_eq!(h.pop(), Some(5)); - assert_eq!(h.pop(), Some(8)); - } - - #[test] - fn remove() { - let mut h = Heap::new(); - h.push(5); - h.push(4); - h.push(3); - let two = h.push(2); - h.push(1); - assert_eq!(h.pop(), Some(1)); - assert_eq!(h.remove(two), 2); - h.push(1); - assert_eq!(h.pop(), Some(1)); - assert_eq!(h.pop(), Some(3)); - } - - fn vec2heap(v: Vec) -> Heap { - let mut h = Heap::new(); - for t in v { - h.push(t); - } - return h; - } - - #[test] - fn test_peek_and_pop() { - let data = vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]; - let mut sorted = data.clone(); - sorted.sort(); - let mut heap = vec2heap(data); - while heap.peek().is_some() { - assert_eq!(heap.peek().unwrap(), sorted.first().unwrap()); - assert_eq!(heap.pop().unwrap(), sorted.remove(0)); - } - } - - #[test] - fn test_push() { - let mut heap = Heap::new(); - heap.push(-2); - heap.push(-4); - heap.push(-9); - assert!(*heap.peek().unwrap() == -9); - heap.push(-11); - assert!(*heap.peek().unwrap() == -11); - heap.push(-5); - assert!(*heap.peek().unwrap() == -11); - heap.push(-27); - assert!(*heap.peek().unwrap() == -27); - heap.push(-3); - assert!(*heap.peek().unwrap() == -27); - heap.push(-103); - assert!(*heap.peek().unwrap() == -103); - } - - fn check_to_vec(mut data: Vec) { - let mut heap = Heap::new(); - for data in data.iter() { - heap.push(*data); - } - data.sort(); - let mut v = Vec::new(); - while let Some(i) = heap.pop() { - v.push(i); - } - assert_eq!(v, data); - } - - #[test] - fn test_to_vec() { - check_to_vec(vec![]); - check_to_vec(vec![5]); - check_to_vec(vec![3, 2]); - check_to_vec(vec![2, 3]); - check_to_vec(vec![5, 1, 2]); - check_to_vec(vec![1, 100, 2, 3]); - check_to_vec(vec![1, 3, 5, 7, 9, 2, 4, 6, 8, 0]); - check_to_vec(vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]); - check_to_vec(vec![9, 11, 9, 9, 9, 9, 11, 2, 3, 4, 11, 9, 0, 0, 0, 0]); - check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); - check_to_vec(vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]); - check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 1, 2]); - check_to_vec(vec![5, 4, 3, 2, 1, 5, 4, 3, 2, 1, 5, 4, 3, 2, 1]); - } - - #[test] - fn test_empty_pop() { - let mut heap = Heap::::new(); - assert!(heap.pop().is_none()); - } - - #[test] - fn test_empty_peek() { - let empty = Heap::::new(); - assert!(empty.peek().is_none()); - } -} diff --git a/src/timer/interval.rs b/src/timer/interval.rs deleted file mode 100644 index de201a1..0000000 --- a/src/timer/interval.rs +++ /dev/null @@ -1,191 +0,0 @@ -use pin_utils::unsafe_pinned; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use futures::prelude::*; - -use crate::timer::delay; -use crate::{Delay, Instant, TimerHandle}; - -/// A stream representing notifications at fixed interval -/// -/// Intervals are created through the `Interval::new` or -/// `Interval::new_at` methods indicating when a first notification -/// should be triggered and when it will be repeated. -/// -/// Note that intervals are not intended for high resolution timers, but rather -/// they will likely fire some granularity after the exact instant that they're -/// otherwise indicated to fire at. -#[derive(Debug)] -pub struct Interval { - delay: Delay, - interval: Duration, -} - -impl Interval { - unsafe_pinned!(delay: Delay); - - /// Creates a new interval which will fire at `dur` time into the future, - /// and will repeat every `dur` interval after - /// - /// The returned object will be bound to the default timer for this thread. - /// The default timer will be spun up in a helper thread on first use. - pub fn new(dur: Duration) -> Interval { - Interval::new_at(Instant::now() + dur, dur) - } - - /// Creates a new interval which will fire at the time specified by `at`, - /// and then will repeat every `dur` interval after - /// - /// The returned object will be bound to the default timer for this thread. - /// The default timer will be spun up in a helper thread on first use. - pub fn new_at(at: Instant, dur: Duration) -> Interval { - Interval { - delay: Delay::new_at(at), - interval: dur, - } - } - - /// Creates a new interval which will fire at the time specified by `at`, - /// and then will repeat every `dur` interval after - /// - /// The returned object will be bound to the timer specified by `handle`. - pub fn new_handle(at: Instant, dur: Duration, handle: TimerHandle) -> Interval { - Interval { - delay: Delay::new_handle(at, handle), - interval: dur, - } - } -} - -impl Stream for Interval { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if Pin::new(&mut *self).delay().poll(cx).is_pending() { - return Poll::Pending; - } - let next = next_interval(delay::fires_at(&self.delay), Instant::now(), self.interval); - self.delay.reset_at(next); - Poll::Ready(Some(())) - } -} - -/// Converts Duration object to raw nanoseconds if possible -/// -/// This is useful to divide intervals. -/// -/// While technically for large duration it's impossible to represent any -/// duration as nanoseconds, the largest duration we can represent is about -/// 427_000 years. Large enough for any interval we would use or calculate in -/// tokio. -fn duration_to_nanos(dur: Duration) -> Option { - dur.as_secs() - .checked_mul(1_000_000_000) - .and_then(|v| v.checked_add(dur.subsec_nanos() as u64)) -} - -fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { - let new = prev + interval; - if new > now { - return new; - } else { - let spent_ns = - duration_to_nanos(now.duration_since(prev)).expect("interval should be expired"); - let interval_ns = - duration_to_nanos(interval).expect("interval is less that 427 thousand years"); - let mult = spent_ns / interval_ns + 1; - assert!( - mult < (1 << 32), - "can't skip more than 4 billion intervals of {:?} \ - (trying to skip {})", - interval, - mult - ); - return prev + interval * (mult as u32); - } -} - -#[cfg(test)] -mod test { - use super::next_interval; - use std::time::{Duration, Instant}; - - struct Timeline(Instant); - - impl Timeline { - fn new() -> Timeline { - Timeline(Instant::now()) - } - fn at(&self, millis: u64) -> Instant { - self.0 + Duration::from_millis(millis) - } - fn at_ns(&self, sec: u64, nanos: u32) -> Instant { - self.0 + Duration::new(sec, nanos) - } - } - - fn dur(millis: u64) -> Duration { - Duration::from_millis(millis) - } - - // The math around Instant/Duration isn't 100% precise due to rounding - // errors, see #249 for more info - fn almost_eq(a: Instant, b: Instant) -> bool { - if a == b { - true - } else if a > b { - a - b < Duration::from_millis(1) - } else { - b - a < Duration::from_millis(1) - } - } - - #[test] - fn norm_next() { - let tm = Timeline::new(); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(2), dur(10)), - tm.at(11) - )); - assert!(almost_eq( - next_interval(tm.at(7777), tm.at(7788), dur(100)), - tm.at(7877) - )); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(1000), dur(2100)), - tm.at(2101) - )); - } - - #[test] - fn fast_forward() { - let tm = Timeline::new(); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(1000), dur(10)), - tm.at(1001) - )); - assert!(almost_eq( - next_interval(tm.at(7777), tm.at(8888), dur(100)), - tm.at(8977) - )); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(10000), dur(2100)), - tm.at(10501) - )); - } - - /// TODO: this test actually should be successful, but since we can't - /// multiply Duration on anything larger than u32 easily we decided - /// to allow it to fail for now - #[test] - #[should_panic(expected = "can't skip more than 4 billion intervals")] - fn large_skip() { - let tm = Timeline::new(); - assert_eq!( - next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), - tm.at_ns(25, 1) - ); - } -}