diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index bde0e3ce91..6751069d75 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -30,3 +30,13 @@ pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder}; mod enter; #[cfg(feature = "std")] pub use crate::enter::{enter, Enter, EnterError}; + +#[cfg(feature = "std")] +mod park; +#[cfg(feature = "std")] +pub use crate::park::{Park, ParkDuration}; + +#[cfg(feature = "std")] +mod park_thread; +#[cfg(feature = "std")] +pub use crate::park_thread::{ParkThread}; diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 9985dca5b9..66a6145c5a 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -1,16 +1,59 @@ use crate::enter; +use crate::park::{Park, ParkDuration}; +use crate::park_thread::ParkThread; use futures_core::future::{Future, FutureObj, LocalFutureObj}; use futures_core::stream::{Stream}; use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError}; -use futures_util::task::{waker_ref, ArcWake}; use futures_util::stream::FuturesUnordered; use futures_util::stream::StreamExt; use futures_util::pin_mut; use std::cell::{RefCell}; use std::ops::{Deref, DerefMut}; use std::rc::{Rc, Weak}; -use std::sync::Arc; -use std::thread::{self, Thread}; + +// state outside park +#[derive(Debug)] +struct State { + pool: FuturesUnordered>, + incoming: Rc, +} + +impl State { + // Make maximal progress on the entire pool of spawned task, returning `Ready` + // if the pool is empty and `Pending` if no further progress can be made. + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // state for the FuturesUnordered, which will never be used + loop { + let ret = self.poll_pool_once(cx); + + // we queued up some new tasks; add them and poll again + if !self.incoming.borrow().is_empty() { + continue; + } + + // no queued tasks; we may be done + match ret { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + _ => {} + } + } + } + + // Try make minimal progress on the pool of spawned tasks + fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll> { + // empty the incoming queue of newly-spawned tasks + { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) + } + } + + // try to execute the next ready future + self.pool.poll_next_unpin(cx) + } +} /// A single-threaded task pool for polling futures to completion. /// @@ -24,9 +67,9 @@ use std::thread::{self, Thread}; /// single-threaded, it supports a special form of task spawning for non-`Send` /// futures, via [`spawn_local_obj`](futures_core::task::LocalSpawn::spawn_local_obj). #[derive(Debug)] -pub struct LocalPool { - pool: FuturesUnordered>, - incoming: Rc, +pub struct LocalPool

{ + park: P, + state: State, } /// A handle to a [`LocalPool`](LocalPool) that implements @@ -38,66 +81,44 @@ pub struct LocalSpawner { type Incoming = RefCell>>; -pub(crate) struct ThreadNotify { - thread: Thread -} - -thread_local! { - static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { - thread: thread::current(), - }); -} - -impl ArcWake for ThreadNotify { - fn wake_by_ref(arc_self: &Arc) { - arc_self.thread.unpark(); +impl LocalPool { + /// Create a new, empty pool of tasks. + pub fn new() -> Self + { + Self::new_with_park(ParkThread::default()) } } -// Set up and run a basic single-threaded spawner loop, invoking `f` on each -// turn. -fn run_executor) -> Poll>(mut f: F) -> T { - let _enter = enter() - .expect("cannot execute `LocalPool` executor from within \ - another executor"); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - loop { - if let Poll::Ready(t) = f(&mut cx) { - return t; - } - thread::park(); - } - }) -} - -fn poll_executor) -> T>(mut f: F) -> T { - let _enter = enter() - .expect("cannot execute `LocalPool` executor from within \ - another executor"); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - f(&mut cx) - }) -} - -impl LocalPool { +impl

LocalPool

+where + P: Park, + P::Error: std::fmt::Debug, +{ /// Create a new, empty pool of tasks. - pub fn new() -> LocalPool { + pub fn new_with_park(park: P) -> Self { LocalPool { - pool: FuturesUnordered::new(), - incoming: Default::default(), + park, + state: State { + pool: FuturesUnordered::new(), + incoming: Default::default(), + }, } } + /// Returns a reference to the underlying Park instance. + pub fn get_park(&self) -> &P { + &self.park + } + + /// Returns a mutable reference to the underlying Park instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.park + } + /// Get a clonable handle to the pool as a [`Spawn`]. pub fn spawner(&self) -> LocalSpawner { LocalSpawner { - incoming: Rc::downgrade(&self.incoming) + incoming: Rc::downgrade(&self.state.incoming) } } @@ -121,7 +142,7 @@ impl LocalPool { /// The function will block the calling thread until *all* tasks in the pool /// are complete, including any spawned while running existing tasks. pub fn run(&mut self) { - run_executor(|cx| self.poll_pool(cx)) + self.run_executor(|state, cx| state.poll_pool(cx)) } /// Runs all the tasks in the pool until the given future completes. @@ -149,7 +170,7 @@ impl LocalPool { pub fn run_until(&mut self, future: F) -> F::Output { pin_mut!(future); - run_executor(|cx| { + self.run_executor(|state, cx| { { // if our main task is done, so are we let result = future.as_mut().poll(cx); @@ -158,7 +179,7 @@ impl LocalPool { } } - let _ = self.poll_pool(cx); + let _ = state.poll_pool(cx); Poll::Pending }) } @@ -192,15 +213,35 @@ impl LocalPool { /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { - poll_executor(|ctx| { - let ret = self.poll_pool_once(ctx); + let mut enter = enter() + .expect("cannot execute `LocalPool` executor from within \ + another executor"); + + { + // first round + let waker = self.park.waker(); + let mut cx = Context::from_waker(&waker); + + let result = self.state.poll_pool_once(&mut cx); // return if we really have executed a future - match ret { - Poll::Ready(Some(_)) => true, - _ => false + if let Poll::Ready(Some(_)) = result { + return true; } - }) + } + + self.park.park(&mut enter, ParkDuration::Poll).expect("park failed"); + + let waker = self.park.waker(); + let mut cx = Context::from_waker(&waker); + + let result = self.state.poll_pool_once(&mut cx); + + // return whether we really have executed a future + match result { + Poll::Ready(Some(_)) => true, + _ => false + } } /// Runs all tasks in the pool and returns if no more progress can be made @@ -229,58 +270,61 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { - loop { - let result = self.poll_pool_once(ctx); - - // if there are no more ready futures exit - match result { - Poll::Pending | Poll::Ready(None) => return, - _ => continue - } - } - }) - } + let mut enter = enter() + .expect("cannot execute `LocalPool` executor from within \ + another executor"); + + { + // first round + let waker = self.park.waker(); + let mut cx = Context::from_waker(&waker); + + // ignore first result, we need to run park anyway and try again + let _ = self.state.poll_pool_once(&mut cx); + } - // Make maximal progress on the entire pool of spawned task, returning `Ready` - // if the pool is empty and `Pending` if no further progress can be made. - fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // state for the FuturesUnordered, which will never be used loop { - let ret = self.poll_pool_once(cx); + self.park.park(&mut enter, ParkDuration::Poll).expect("park failed"); - // we queued up some new tasks; add them and poll again - if !self.incoming.borrow().is_empty() { - continue; - } + let waker = self.park.waker(); + let mut cx = Context::from_waker(&waker); - // no queued tasks; we may be done - match ret { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(()), - _ => {} + let result = self.state.poll_pool_once(&mut cx); + + // if there are no more ready futures exit + match result { + Poll::Pending | Poll::Ready(None) => return, + _ => continue } } } - // Try make minimal progress on the pool of spawned tasks - fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll> { - // empty the incoming queue of newly-spawned tasks - { - let mut incoming = self.incoming.borrow_mut(); - for task in incoming.drain(..) { - self.pool.push(task) + // Set up and run a basic single-threaded spawner loop, invoking `f` on each + // turn. + fn run_executor) -> Poll>(&mut self, mut f: F) -> T { + let mut enter = enter() + .expect("cannot execute `LocalPool` executor from within \ + another executor"); + + loop { + let waker = self.park.waker(); + let mut cx = Context::from_waker(&waker); + + if let Poll::Ready(t) = f(&mut self.state, &mut cx) { + return t; } + self.park.park(&mut enter, ParkDuration::Block).expect("park failed"); } - - // try to execute the next ready future - self.pool.poll_next_unpin(cx) } } -impl Default for LocalPool { +impl

Default for LocalPool

+where + P: Default + Park, + P::Error: std::fmt::Debug, +{ fn default() -> Self { - Self::new() + Self::new_with_park(P::default()) } } @@ -292,7 +336,21 @@ impl Default for LocalPool { /// spawned tasks. pub fn block_on(f: F) -> F::Output { pin_mut!(f); - run_executor(|cx| f.as_mut().poll(cx)) + + let mut enter = enter() + .expect("cannot execute `block_on` executor from within \ + another executor"); + + let mut park = ParkThread::new(); + let waker = park.waker().clone(); + let mut cx = Context::from_waker(&waker); + + loop { + if let Poll::Ready(t) = f.as_mut().poll(&mut cx) { + return t; + } + park.park(&mut enter, ParkDuration::Block).unwrap_or_else(|i| match i {}); + } } /// Turn a stream into a blocking iterator. @@ -331,7 +389,7 @@ impl Iterator for BlockingStream { type Item = S::Item; fn next(&mut self) -> Option { - LocalPool::new().run_until(self.stream.next()) + LocalPool::::new().run_until(self.stream.next()) } fn size_hint(&self) -> (usize, Option) { diff --git a/futures-executor/src/park.rs b/futures-executor/src/park.rs new file mode 100644 index 0000000000..48299ad8d6 --- /dev/null +++ b/futures-executor/src/park.rs @@ -0,0 +1,86 @@ +use crate::enter::Enter; +use std::time::Duration; +use futures_util::task::WakerRef; + +/// Determines how long `park` will block +#[derive(Clone, Copy, Debug)] +pub enum ParkDuration { + /// Don't block at all + Poll, + /// Block until explicit (possibly spurious) unpark + Block, + /// Block at most for given Duration; might get rounded up to some + /// minimum "sleepable" value by `Park` implementation if timeout is + /// too small (including zero). + Timeout(Duration), +} + +impl ParkDuration { + /// Create a new duration specification which doesn't block longer + /// than the passed limit. + pub fn limit(self, max_duration: Duration) -> Self { + match self { + ParkDuration::Poll => ParkDuration::Poll, + ParkDuration::Block => ParkDuration::Timeout(max_duration), + ParkDuration::Timeout(d) => ParkDuration::Timeout(std::cmp::min(d, max_duration)), + } + } +} + +/// Convert zero duration to `Poll`; other durations are wrapped in +/// `Timeout(..)`. +impl From for ParkDuration { + fn from(duration: Duration) -> Self { + if duration == Duration::from_secs(0) { + ParkDuration::Poll + } else { + ParkDuration::Timeout(duration) + } + } +} + +/// Convert `None` to `Block`, zero durations to `Poll` and other +/// durations are wrapped in `Timeout(..)`. +impl From> for ParkDuration { + fn from(duration: Option) -> Self { + match duration { + Some(duration) => ParkDuration::from(duration), + None => ParkDuration::Block, + } + } +} + +/// Block the current thread. +pub trait Park { + /// Error returned by `park` + type Error; + + /// Get a new `Waker` associated with this `Park` instance. + fn waker(&self) -> WakerRef<'_>; + + /// Block the current thread unless or until the token is available; + /// `duration` determines for how long. + /// + /// A call to `park` does not guarantee that the thread will remain + /// blocked forever, and callers should be prepared for this + /// possibility. This function may wakeup spuriously for any reason. + /// + /// # Panics + /// + /// This function **should** not panic, but ultimately, panics are + /// left as an implementation detail. Refer to the documentation for + /// the specific `Park` implementation + fn park(&mut self, enter: &mut Enter, duration: ParkDuration) -> Result<(), Self::Error>; +} + +impl Park for &'_ mut P { + type Error = P::Error; + + fn waker(&self) -> WakerRef<'_> { + (**self).waker() + } + + fn park(&mut self, enter: &mut Enter, duration: ParkDuration) -> Result<(), Self::Error> { + (**self).park(enter, duration) + } +} diff --git a/futures-executor/src/park_thread.rs b/futures-executor/src/park_thread.rs new file mode 100644 index 0000000000..add679615a --- /dev/null +++ b/futures-executor/src/park_thread.rs @@ -0,0 +1,67 @@ +use crate::enter::Enter; +use crate::park::{Park, ParkDuration}; +use std::sync::Arc; +use std::thread; +use futures_util::task::{waker_ref, ArcWake, WakerRef}; + +/// Implements [`Park`][p] using [`thread::park`] to put the thread to +/// sleep. +/// +/// [`thread::park`]: https://doc.rust-lang.org/std/thread/fn.park.html +/// [p]: ../park/trait.Park.html +#[derive(Debug)] +pub struct ParkThread { + // store a copy of TLS data here to `waker()` below can return a + // reference to it + notify: Arc +} + +impl ParkThread { + /// Create new `ParkThread` instance. + pub fn new() -> Self { + ParkThread { + notify: CURRENT_THREAD_NOTIFY.with(Arc::clone), + } + } +} + +impl Default for ParkThread { + fn default() -> Self { + ParkThread::new() + } +} + +impl Park for ParkThread { + type Error = std::convert::Infallible; + + fn waker(&self) -> WakerRef<'_> { + waker_ref(&self.notify) + } + + fn park(&mut self, _enter: &mut Enter, duration: ParkDuration) -> Result<(), Self::Error> { + match duration { + ParkDuration::Poll => (), + ParkDuration::Block => thread::park(), + ParkDuration::Timeout(duration) => thread::park_timeout(duration), + } + Ok(()) + } +} + +thread_local! { + // allocate only once per thread + static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { + thread: thread::current(), + }); +} + +#[derive(Debug)] +struct ThreadNotify { + thread: thread::Thread, +} + +impl ArcWake for ThreadNotify { + fn wake_by_ref(arc_self: &Arc) { + arc_self.thread.unpark(); + } +}