From 4a70fb97e948d8fa60248001efc2019ada14ea39 Mon Sep 17 00:00:00 2001 From: Kevan Hollbach Date: Thu, 21 Apr 2022 16:48:41 -0400 Subject: [PATCH 1/5] Make run_until_stalled handle self-waking futures LocalPool::try_run_one and run_until_stalled now correctly re-try when a future "yields" by calling wake and returning Pending. --- futures-executor/src/local_pool.rs | 92 ++++++++++++++++-------------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index bee96d8db9..06ce8ba80c 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -106,17 +106,9 @@ fn run_executor) -> Poll>(mut f: F) -> T { }) } -fn poll_executor) -> T>(mut f: F) -> T { - let _enter = enter().expect( - "cannot execute `LocalPool` executor from within \ - another executor", - ); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - f(&mut cx) - }) +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst)) } impl LocalPool { @@ -212,20 +204,26 @@ impl LocalPool { /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { - poll_executor(|ctx| { + run_executor(|cx| { loop { - let ret = self.poll_pool_once(ctx); - - // return if we have executed a future - if let Poll::Ready(Some(_)) = ret { - return true; + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), } - // if there are no new incoming futures - // then there is no feature that can make progress - // and we can return without having completed a single future - if self.incoming.borrow().is_empty() { - return false; + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); } } }) @@ -257,44 +255,50 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { - let _ = self.poll_pool(ctx); + run_executor(|cx| match self.poll_pool(cx) { + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + Poll::Ready(()) + } + } }); } - // Make maximal progress on the entire pool of spawned task, returning `Ready` - // if the pool is empty and `Pending` if no further progress can be made. + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // state for the FuturesUnordered, which will never be used loop { - let ret = self.poll_pool_once(cx); + self.drain_incoming(); - // we queued up some new tasks; add them and poll again + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. if !self.incoming.borrow().is_empty() { continue; } - // no queued tasks; we may be done - match ret { - Poll::Pending => return Poll::Pending, + match pool_ret { + Poll::Ready(Some(())) => continue, Poll::Ready(None) => return Poll::Ready(()), - _ => {} + Poll::Pending => return Poll::Pending, } } } - // Try make minimal progress on the pool of spawned tasks - fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll> { - // empty the incoming queue of newly-spawned tasks - { - let mut incoming = self.incoming.borrow_mut(); - for task in incoming.drain(..) { - self.pool.push(task) - } + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) } - - // try to execute the next ready future - self.pool.poll_next_unpin(cx) } } From bed05b84548b2d6063e7e858ce07eabdf8ae4b75 Mon Sep 17 00:00:00 2001 From: Kevan Hollbach Date: Thu, 21 Apr 2022 18:12:13 -0400 Subject: [PATCH 2/5] Add clarifying comments to run_until_stalled --- futures-executor/src/local_pool.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 06ce8ba80c..9691060725 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -256,11 +256,13 @@ impl LocalPool { /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. Poll::Ready(()) => Poll::Ready(()), Poll::Pending => { if woken() { Poll::Pending } else { + // We're stalled for now. Poll::Ready(()) } } From 99dd3c8f220c787589750b5a0787be5fc029234a Mon Sep 17 00:00:00 2001 From: Kevan Hollbach Date: Wed, 11 May 2022 14:41:57 -0400 Subject: [PATCH 3/5] Add regression tests --- futures-executor/tests/local_pool.rs | 64 +++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 8e5e27981d..d00894dbc5 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,7 +1,7 @@ use futures::channel::oneshot; use futures::executor::LocalPool; use futures::future::{self, lazy, poll_fn, Future}; -use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker}; +use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; @@ -435,3 +435,65 @@ fn park_unpark_independence() { futures::executor::block_on(future) } + +struct SelfWaking { + wakeups_remaining: Arc>, +} + +impl Future for SelfWaking { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if *self.wakeups_remaining.borrow() != 0 { + *self.wakeups_remaining.borrow_mut() -= 1; + cx.waker().wake_by_ref(); + } + + Poll::Pending + } +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `run_until_stalled` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_run_until_stalled() { + let wakeups_remaining = Arc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Arc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + // This should keep polling until there are no more wakeups. + pool.run_until_stalled(); + + assert_eq!(*wakeups_remaining.borrow(), 0); +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `try_run_one` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_try_run_one() { + let wakeups_remaining = Arc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Arc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + spawner.spawn(future::ready(())).unwrap(); + + // The `ready` future should complete. + assert!(pool.try_run_one()); + + // The self-waking futures are each polled once. + assert_eq!(*wakeups_remaining.borrow(), 7); +} From 26db1e99866ab81dffd6778ce25c9d2da625fcea Mon Sep 17 00:00:00 2001 From: khollbach Date: Wed, 11 May 2022 16:23:59 -0400 Subject: [PATCH 4/5] Update futures-executor/tests/local_pool.rs Co-authored-by: Taiki Endo --- futures-executor/tests/local_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index d00894dbc5..6305e02d64 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -437,7 +437,7 @@ fn park_unpark_independence() { } struct SelfWaking { - wakeups_remaining: Arc>, + wakeups_remaining: Rc>, } impl Future for SelfWaking { From 45045c9d5f3d4bd9218378be5a06e4647cc63b77 Mon Sep 17 00:00:00 2001 From: Kevan Hollbach Date: Wed, 11 May 2022 16:29:08 -0400 Subject: [PATCH 5/5] Arc -> Rc --- futures-executor/tests/local_pool.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 6305e02d64..6e908d2444 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -459,12 +459,12 @@ impl Future for SelfWaking { /// to exit early, even when progress could still be made. #[test] fn self_waking_run_until_stalled() { - let wakeups_remaining = Arc::new(RefCell::new(10)); + let wakeups_remaining = Rc::new(RefCell::new(10)); let mut pool = LocalPool::new(); let spawner = pool.spawner(); for _ in 0..3 { - let wakeups_remaining = Arc::clone(&wakeups_remaining); + let wakeups_remaining = Rc::clone(&wakeups_remaining); spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); } @@ -480,12 +480,12 @@ fn self_waking_run_until_stalled() { /// to exit early, even when progress could still be made. #[test] fn self_waking_try_run_one() { - let wakeups_remaining = Arc::new(RefCell::new(10)); + let wakeups_remaining = Rc::new(RefCell::new(10)); let mut pool = LocalPool::new(); let spawner = pool.spawner(); for _ in 0..3 { - let wakeups_remaining = Arc::clone(&wakeups_remaining); + let wakeups_remaining = Rc::clone(&wakeups_remaining); spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); }