Skip to content

Park trait, ParkThread implementation and using it in LocalPool #1668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions futures-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,13 @@ pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder};
mod enter;
#[cfg(feature = "std")]
pub use crate::enter::{enter, Enter, EnterError};

#[cfg(feature = "std")]
mod park;
#[cfg(feature = "std")]
pub use crate::park::{Park, ParkDuration};

#[cfg(feature = "std")]
mod park_thread;
#[cfg(feature = "std")]
pub use crate::park_thread::{ParkThread};
266 changes: 162 additions & 104 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,59 @@
use crate::enter;
use crate::park::{Park, ParkDuration};
use crate::park_thread::ParkThread;
use futures_core::future::{Future, FutureObj, LocalFutureObj};
use futures_core::stream::{Stream};
use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError};
use futures_util::task::{waker_ref, ArcWake};
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use futures_util::pin_mut;
use std::cell::{RefCell};
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::thread::{self, Thread};

// state outside park
#[derive(Debug)]
struct State {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>,
}

impl State {
// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// state for the FuturesUnordered, which will never be used
loop {
let ret = self.poll_pool_once(cx);

// we queued up some new tasks; add them and poll again
if !self.incoming.borrow().is_empty() {
continue;
}

// no queued tasks; we may be done
match ret {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
}
}
}

// Try make minimal progress on the pool of spawned tasks
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
// empty the incoming queue of newly-spawned tasks
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}
}

// try to execute the next ready future
self.pool.poll_next_unpin(cx)
}
}

/// A single-threaded task pool for polling futures to completion.
///
Expand All @@ -24,9 +67,9 @@ use std::thread::{self, Thread};
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](futures_core::task::LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>,
pub struct LocalPool<P = ParkThread> {
park: P,
state: State,
}

/// A handle to a [`LocalPool`](LocalPool) that implements
Expand All @@ -38,66 +81,44 @@ pub struct LocalSpawner {

type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;

pub(crate) struct ThreadNotify {
thread: Thread
}

thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
thread: thread::current(),
});
}

impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.thread.unpark();
impl LocalPool<ParkThread> {
/// Create a new, empty pool of tasks.
pub fn new() -> Self
{
Self::new_with_park(ParkThread::default())
}
}

// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
thread::park();
}
})
}

fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
f(&mut cx)
})
}

impl LocalPool {
impl<P> LocalPool<P>
where
P: Park,
P::Error: std::fmt::Debug,
{
/// Create a new, empty pool of tasks.
pub fn new() -> LocalPool {
pub fn new_with_park(park: P) -> Self {
LocalPool {
pool: FuturesUnordered::new(),
incoming: Default::default(),
park,
state: State {
pool: FuturesUnordered::new(),
incoming: Default::default(),
},
}
}

/// Returns a reference to the underlying Park instance.
pub fn get_park(&self) -> &P {
&self.park
}

/// Returns a mutable reference to the underlying Park instance.
pub fn get_park_mut(&mut self) -> &mut P {
&mut self.park
}

/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawner {
LocalSpawner {
incoming: Rc::downgrade(&self.incoming)
incoming: Rc::downgrade(&self.state.incoming)
}
}

Expand All @@ -121,7 +142,7 @@ impl LocalPool {
/// The function will block the calling thread until *all* tasks in the pool
/// are complete, including any spawned while running existing tasks.
pub fn run(&mut self) {
run_executor(|cx| self.poll_pool(cx))
self.run_executor(|state, cx| state.poll_pool(cx))
}

/// Runs all the tasks in the pool until the given future completes.
Expand Down Expand Up @@ -149,7 +170,7 @@ impl LocalPool {
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
pin_mut!(future);

run_executor(|cx| {
self.run_executor(|state, cx| {
{
// if our main task is done, so are we
let result = future.as_mut().poll(cx);
Expand All @@ -158,7 +179,7 @@ impl LocalPool {
}
}

let _ = self.poll_pool(cx);
let _ = state.poll_pool(cx);
Poll::Pending
})
}
Expand Down Expand Up @@ -192,15 +213,35 @@ impl LocalPool {
/// further use of one of the pool's run or poll methods.
/// Though only one task will be completed, progress may be made on multiple tasks.
pub fn try_run_one(&mut self) -> bool {
poll_executor(|ctx| {
let ret = self.poll_pool_once(ctx);
let mut enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

{
// first round
let waker = self.park.waker();
let mut cx = Context::from_waker(&waker);

let result = self.state.poll_pool_once(&mut cx);

// return if we really have executed a future
match ret {
Poll::Ready(Some(_)) => true,
_ => false
if let Poll::Ready(Some(_)) = result {
return true;
}
})
}

self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");

let waker = self.park.waker();
let mut cx = Context::from_waker(&waker);

let result = self.state.poll_pool_once(&mut cx);

// return whether we really have executed a future
match result {
Poll::Ready(Some(_)) => true,
_ => false
}
}

/// Runs all tasks in the pool and returns if no more progress can be made
Expand Down Expand Up @@ -229,58 +270,61 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
loop {
let result = self.poll_pool_once(ctx);

// if there are no more ready futures exit
match result {
Poll::Pending | Poll::Ready(None) => return,
_ => continue
}
}
})
}
let mut enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

{
// first round
let waker = self.park.waker();
let mut cx = Context::from_waker(&waker);

// ignore first result, we need to run park anyway and try again
let _ = self.state.poll_pool_once(&mut cx);
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// state for the FuturesUnordered, which will never be used
loop {
let ret = self.poll_pool_once(cx);
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");

// we queued up some new tasks; add them and poll again
if !self.incoming.borrow().is_empty() {
continue;
}
let waker = self.park.waker();
let mut cx = Context::from_waker(&waker);

// no queued tasks; we may be done
match ret {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
let result = self.state.poll_pool_once(&mut cx);

// if there are no more ready futures exit
match result {
Poll::Pending | Poll::Ready(None) => return,
_ => continue
}
}
}

// Try make minimal progress on the pool of spawned tasks
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
// empty the incoming queue of newly-spawned tasks
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&mut State, &mut Context<'_>) -> Poll<T>>(&mut self, mut f: F) -> T {
let mut enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

loop {
let waker = self.park.waker();
let mut cx = Context::from_waker(&waker);

if let Poll::Ready(t) = f(&mut self.state, &mut cx) {
return t;
}
self.park.park(&mut enter, ParkDuration::Block).expect("park failed");
}

// try to execute the next ready future
self.pool.poll_next_unpin(cx)
}
}

impl Default for LocalPool {
impl<P> Default for LocalPool<P>
where
P: Default + Park,
P::Error: std::fmt::Debug,
{
fn default() -> Self {
Self::new()
Self::new_with_park(P::default())
}
}

Expand All @@ -292,7 +336,21 @@ impl Default for LocalPool {
/// spawned tasks.
pub fn block_on<F: Future>(f: F) -> F::Output {
pin_mut!(f);
run_executor(|cx| f.as_mut().poll(cx))

let mut enter = enter()
.expect("cannot execute `block_on` executor from within \
another executor");

let mut park = ParkThread::new();
let waker = park.waker().clone();
let mut cx = Context::from_waker(&waker);

loop {
if let Poll::Ready(t) = f.as_mut().poll(&mut cx) {
return t;
}
park.park(&mut enter, ParkDuration::Block).unwrap_or_else(|i| match i {});
}
}

/// Turn a stream into a blocking iterator.
Expand Down Expand Up @@ -331,7 +389,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
type Item = S::Item;

fn next(&mut self) -> Option<Self::Item> {
LocalPool::new().run_until(self.stream.next())
LocalPool::<ParkThread>::new().run_until(self.stream.next())
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
Loading