Skip to content

Commit c807efd

Browse files
committed
split lib.rs
Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
1 parent bddfff3 commit c807efd

File tree

3 files changed

+357
-348
lines changed

3 files changed

+357
-348
lines changed

src/heap_timer.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::cmp::Ordering;
2+
use std::sync::Arc;
3+
use std::time::Instant;
4+
5+
use crate::{Node, ScheduledTimer};
6+
7+
/// Entries in the timer heap, sorted by the instant they're firing at and then
8+
/// also containing some payload data.
9+
pub(crate) struct HeapTimer {
10+
pub(crate) at: Instant,
11+
pub(crate) gen: usize,
12+
pub(crate) node: Arc<Node<ScheduledTimer>>,
13+
}
14+
15+
impl PartialEq for HeapTimer {
16+
fn eq(&self, other: &HeapTimer) -> bool {
17+
self.at == other.at
18+
}
19+
}
20+
21+
impl Eq for HeapTimer {}
22+
23+
impl PartialOrd for HeapTimer {
24+
fn partial_cmp(&self, other: &HeapTimer) -> Option<Ordering> {
25+
Some(self.cmp(other))
26+
}
27+
}
28+
29+
impl Ord for HeapTimer {
30+
fn cmp(&self, other: &HeapTimer) -> Ordering {
31+
self.at.cmp(&other.at)
32+
}
33+
}

src/lib.rs

Lines changed: 4 additions & 348 deletions
Original file line numberDiff line numberDiff line change
@@ -46,360 +46,16 @@
4646
#![deny(missing_docs)]
4747
#![warn(missing_debug_implementations)]
4848

49-
use std::cmp::Ordering;
50-
use std::fmt;
51-
use std::mem;
52-
use std::pin::Pin;
53-
use std::sync::atomic::AtomicUsize;
54-
use std::sync::atomic::Ordering::SeqCst;
55-
use std::sync::{Arc, Mutex, Weak};
56-
use std::task::{Context, Poll};
57-
use std::time::Instant;
58-
59-
use futures_core::future::Future;
60-
use futures_util::task::AtomicWaker;
61-
6249
use arc_list::{ArcList, Node};
6350
use heap::{Heap, Slot};
51+
use heap_timer::HeapTimer;
52+
use timer::{Timer, TimerHandle, ScheduledTimer};
6453

54+
mod heap_timer;
6555
mod arc_list;
6656
mod global;
6757
mod heap;
68-
69-
/// A "timer heap" used to power separately owned instances of `Delay`.
70-
///
71-
/// This timer is implemented as a priority queued-based heap. Each `Timer`
72-
/// contains a few primary methods with which to drive it:
73-
///
74-
/// * `next_wake` indicates how long the ambient system needs to sleep until it
75-
/// invokes further processing on a `Timer`
76-
/// * `advance_to` is what actually fires timers on the `Timer`, and should be
77-
/// called essentially every iteration of the event loop, or when the time
78-
/// specified by `next_wake` has elapsed.
79-
/// * The `Future` implementation for `Timer` is used to process incoming timer
80-
/// updates and requests. This is used to schedule new timeouts, update
81-
/// existing ones, or delete existing timeouts. The `Future` implementation
82-
/// will never resolve, but it'll schedule notifications of when to wake up
83-
/// and process more messages.
84-
///
85-
/// Note that if you're using this crate you probably don't need to use a
86-
/// `Timer` as there is a global one already available for you, run on a helper
87-
/// thread. If this isn't desirable, though, then the
88-
/// `TimerHandle::set_fallback` method can be used instead!
89-
pub struct Timer {
90-
inner: Arc<Inner>,
91-
timer_heap: Heap<HeapTimer>,
92-
}
93-
94-
/// A handle to a `Timer` which is used to create instances of a `Delay`.
95-
#[derive(Clone)]
96-
pub struct TimerHandle {
97-
inner: Weak<Inner>,
98-
}
58+
mod timer;
9959

10060
mod delay;
10161
pub use self::delay::Delay;
102-
103-
struct Inner {
104-
/// List of updates the `Timer` needs to process
105-
list: ArcList<ScheduledTimer>,
106-
107-
/// The blocked `Timer` task to receive notifications to the `list` above.
108-
waker: AtomicWaker,
109-
}
110-
111-
/// Shared state between the `Timer` and a `Delay`.
112-
struct ScheduledTimer {
113-
waker: AtomicWaker,
114-
115-
// The lowest bit here is whether the timer has fired or not, the second
116-
// lowest bit is whether the timer has been invalidated, and all the other
117-
// bits are the "generation" of the timer which is reset during the `reset`
118-
// function. Only timers for a matching generation are fired.
119-
state: AtomicUsize,
120-
121-
inner: Weak<Inner>,
122-
at: Mutex<Option<Instant>>,
123-
124-
// TODO: this is only accessed by the timer thread, should have a more
125-
// lightweight protection than a `Mutex`
126-
slot: Mutex<Option<Slot>>,
127-
}
128-
129-
/// Entries in the timer heap, sorted by the instant they're firing at and then
130-
/// also containing some payload data.
131-
struct HeapTimer {
132-
at: Instant,
133-
gen: usize,
134-
node: Arc<Node<ScheduledTimer>>,
135-
}
136-
137-
impl Timer {
138-
/// Creates a new timer heap ready to create new timers.
139-
pub fn new() -> Timer {
140-
Timer {
141-
inner: Arc::new(Inner {
142-
list: ArcList::new(),
143-
waker: AtomicWaker::new(),
144-
}),
145-
timer_heap: Heap::new(),
146-
}
147-
}
148-
149-
/// Returns a handle to this timer heap, used to create new timeouts.
150-
pub fn handle(&self) -> TimerHandle {
151-
TimerHandle {
152-
inner: Arc::downgrade(&self.inner),
153-
}
154-
}
155-
156-
/// Returns the time at which this timer next needs to be invoked with
157-
/// `advance_to`.
158-
///
159-
/// Event loops or threads typically want to sleep until the specified
160-
/// instant.
161-
pub fn next_event(&self) -> Option<Instant> {
162-
self.timer_heap.peek().map(|t| t.at)
163-
}
164-
165-
/// Process any timers which are supposed to fire at or before the current
166-
/// instant.
167-
///
168-
/// This method is equivalent to `self.advance_to(Instant::now())`.
169-
pub fn advance(&mut self) {
170-
self.advance_to(Instant::now())
171-
}
172-
173-
/// Process any timers which are supposed to fire before the specified `now`.
174-
///
175-
/// This method should be called on `Timer` periodically to advance the
176-
/// internal state and process any pending timers which need to fire.
177-
pub fn advance_to(&mut self, now: Instant) {
178-
loop {
179-
match self.timer_heap.peek() {
180-
Some(head) if head.at <= now => {}
181-
Some(_) => break,
182-
None => break,
183-
};
184-
185-
// Flag the timer as fired and then notify its task, if any, that's
186-
// blocked.
187-
let heap_timer = self.timer_heap.pop().unwrap();
188-
*heap_timer.node.slot.lock().unwrap() = None;
189-
let bits = heap_timer.gen << 2;
190-
match heap_timer
191-
.node
192-
.state
193-
.compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
194-
{
195-
Ok(_) => heap_timer.node.waker.wake(),
196-
Err(_b) => {}
197-
}
198-
}
199-
}
200-
201-
/// Either updates the timer at slot `idx` to fire at `at`, or adds a new
202-
/// timer at `idx` and sets it to fire at `at`.
203-
fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
204-
// TODO: avoid remove + push and instead just do one sift of the heap?
205-
// In theory we could update it in place and then do the percolation
206-
// as necessary
207-
let gen = node.state.load(SeqCst) >> 2;
208-
let mut slot = node.slot.lock().unwrap();
209-
if let Some(heap_slot) = slot.take() {
210-
self.timer_heap.remove(heap_slot);
211-
}
212-
*slot = Some(self.timer_heap.push(HeapTimer {
213-
at,
214-
gen,
215-
node: node.clone(),
216-
}));
217-
}
218-
219-
fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
220-
// If this `idx` is still around and it's still got a registered timer,
221-
// then we jettison it from the timer heap.
222-
let mut slot = node.slot.lock().unwrap();
223-
let heap_slot = match slot.take() {
224-
Some(slot) => slot,
225-
None => return,
226-
};
227-
self.timer_heap.remove(heap_slot);
228-
}
229-
230-
fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
231-
node.state.fetch_or(0b10, SeqCst);
232-
node.waker.wake();
233-
}
234-
}
235-
236-
impl Future for Timer {
237-
type Output = ();
238-
239-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
240-
Pin::new(&mut self.inner).waker.register(cx.waker());
241-
let mut list = self.inner.list.take();
242-
while let Some(node) = list.pop() {
243-
let at = *node.at.lock().unwrap();
244-
match at {
245-
Some(at) => self.update_or_add(at, node),
246-
None => self.remove(node),
247-
}
248-
}
249-
Poll::Pending
250-
}
251-
}
252-
253-
impl Drop for Timer {
254-
fn drop(&mut self) {
255-
// Seal off our list to prevent any more updates from getting pushed on.
256-
// Any timer which sees an error from the push will immediately become
257-
// inert.
258-
let mut list = self.inner.list.take_and_seal();
259-
260-
// Now that we'll never receive another timer, drain the list of all
261-
// updates and also drain our heap of all active timers, invalidating
262-
// everything.
263-
while let Some(t) = list.pop() {
264-
self.invalidate(t);
265-
}
266-
while let Some(t) = self.timer_heap.pop() {
267-
self.invalidate(t.node);
268-
}
269-
}
270-
}
271-
272-
impl fmt::Debug for Timer {
273-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
274-
f.debug_struct("Timer").field("heap", &"...").finish()
275-
}
276-
}
277-
278-
impl Default for Timer {
279-
fn default() -> Self {
280-
Self::new()
281-
}
282-
}
283-
284-
impl PartialEq for HeapTimer {
285-
fn eq(&self, other: &HeapTimer) -> bool {
286-
self.at == other.at
287-
}
288-
}
289-
290-
impl Eq for HeapTimer {}
291-
292-
impl PartialOrd for HeapTimer {
293-
fn partial_cmp(&self, other: &HeapTimer) -> Option<Ordering> {
294-
Some(self.cmp(other))
295-
}
296-
}
297-
298-
impl Ord for HeapTimer {
299-
fn cmp(&self, other: &HeapTimer) -> Ordering {
300-
self.at.cmp(&other.at)
301-
}
302-
}
303-
304-
static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
305-
306-
/// Error returned from `TimerHandle::set_fallback`.
307-
#[derive(Clone, Debug)]
308-
pub struct SetDefaultError(());
309-
310-
impl TimerHandle {
311-
/// Configures this timer handle to be the one returned by
312-
/// `TimerHandle::default`.
313-
///
314-
/// By default a global thread is initialized on the first call to
315-
/// `TimerHandle::default`. This first call can happen transitively through
316-
/// `Delay::new`. If, however, that hasn't happened yet then the global
317-
/// default timer handle can be configured through this method.
318-
///
319-
/// This method can be used to prevent the global helper thread from
320-
/// spawning. If this method is successful then the global helper thread
321-
/// will never get spun up.
322-
///
323-
/// On success this timer handle will have installed itself globally to be
324-
/// used as the return value for `TimerHandle::default` unless otherwise
325-
/// specified.
326-
///
327-
/// # Errors
328-
///
329-
/// If another thread has already called `set_as_global_fallback` or this
330-
/// thread otherwise loses a race to call this method then it will fail
331-
/// returning an error. Once a call to `set_as_global_fallback` is
332-
/// successful then no future calls may succeed.
333-
pub fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
334-
unsafe {
335-
let val = self.into_usize();
336-
match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
337-
Ok(_) => Ok(()),
338-
Err(_) => {
339-
drop(TimerHandle::from_usize(val));
340-
Err(SetDefaultError(()))
341-
}
342-
}
343-
}
344-
}
345-
346-
fn into_usize(self) -> usize {
347-
unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
348-
}
349-
350-
unsafe fn from_usize(val: usize) -> TimerHandle {
351-
let inner = mem::transmute::<usize, Weak<Inner>>(val);
352-
TimerHandle { inner }
353-
}
354-
}
355-
356-
impl Default for TimerHandle {
357-
fn default() -> TimerHandle {
358-
let mut fallback = HANDLE_FALLBACK.load(SeqCst);
359-
360-
// If the fallback hasn't been previously initialized then let's spin
361-
// up a helper thread and try to initialize with that. If we can't
362-
// actually create a helper thread then we'll just return a "defunct"
363-
// handle which will return errors when timer objects are attempted to
364-
// be associated.
365-
if fallback == 0 {
366-
let helper = match global::HelperThread::new() {
367-
Ok(helper) => helper,
368-
Err(_) => return TimerHandle { inner: Weak::new() },
369-
};
370-
371-
// If we successfully set ourselves as the actual fallback then we
372-
// want to `forget` the helper thread to ensure that it persists
373-
// globally. If we fail to set ourselves as the fallback that means
374-
// that someone was racing with this call to
375-
// `TimerHandle::default`. They ended up winning so we'll destroy
376-
// our helper thread (which shuts down the thread) and reload the
377-
// fallback.
378-
if helper.handle().set_as_global_fallback().is_ok() {
379-
let ret = helper.handle();
380-
helper.forget();
381-
return ret;
382-
}
383-
fallback = HANDLE_FALLBACK.load(SeqCst);
384-
}
385-
386-
// At this point our fallback handle global was configured so we use
387-
// its value to reify a handle, clone it, and then forget our reified
388-
// handle as we don't actually have an owning reference to it.
389-
assert!(fallback != 0);
390-
unsafe {
391-
let handle = TimerHandle::from_usize(fallback);
392-
let ret = handle.clone();
393-
let _ = handle.into_usize();
394-
ret
395-
}
396-
}
397-
}
398-
399-
impl fmt::Debug for TimerHandle {
400-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
401-
f.debug_struct("TimerHandle")
402-
.field("inner", &"...")
403-
.finish()
404-
}
405-
}

0 commit comments

Comments
 (0)