Skip to content

Commit 8f20722

Browse files
khollbachtaiki-e
authored andcommitted
Make run_until_stalled handle self-waking futures (#2593)
LocalPool::try_run_one and run_until_stalled now correctly re-try when a future "yields" by calling wake and returning Pending.
1 parent 3207f4f commit 8f20722

File tree

2 files changed

+113
-45
lines changed

2 files changed

+113
-45
lines changed

futures-executor/src/local_pool.rs

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,9 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
106106
})
107107
}
108108

109-
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
110-
let _enter = enter().expect(
111-
"cannot execute `LocalPool` executor from within \
112-
another executor",
113-
);
114-
115-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
116-
let waker = waker_ref(thread_notify);
117-
let mut cx = Context::from_waker(&waker);
118-
f(&mut cx)
119-
})
109+
/// Check for a wakeup, but don't consume it.
110+
fn woken() -> bool {
111+
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst))
120112
}
121113

122114
impl LocalPool {
@@ -212,20 +204,26 @@ impl LocalPool {
212204
/// further use of one of the pool's run or poll methods.
213205
/// Though only one task will be completed, progress may be made on multiple tasks.
214206
pub fn try_run_one(&mut self) -> bool {
215-
poll_executor(|ctx| {
207+
run_executor(|cx| {
216208
loop {
217-
let ret = self.poll_pool_once(ctx);
218-
219-
// return if we have executed a future
220-
if let Poll::Ready(Some(_)) = ret {
221-
return true;
209+
self.drain_incoming();
210+
211+
match self.pool.poll_next_unpin(cx) {
212+
// Success!
213+
Poll::Ready(Some(())) => return Poll::Ready(true),
214+
// The pool was empty.
215+
Poll::Ready(None) => return Poll::Ready(false),
216+
Poll::Pending => (),
222217
}
223218

224-
// if there are no new incoming futures
225-
// then there is no feature that can make progress
226-
// and we can return without having completed a single future
227-
if self.incoming.borrow().is_empty() {
228-
return false;
219+
if !self.incoming.borrow().is_empty() {
220+
// New tasks were spawned; try again.
221+
continue;
222+
} else if woken() {
223+
// The pool yielded to us, but there's more progress to be made.
224+
return Poll::Pending;
225+
} else {
226+
return Poll::Ready(false);
229227
}
230228
}
231229
})
@@ -257,44 +255,52 @@ impl LocalPool {
257255
/// of the pool's run or poll methods. While the function is running, all tasks
258256
/// in the pool will try to make progress.
259257
pub fn run_until_stalled(&mut self) {
260-
poll_executor(|ctx| {
261-
let _ = self.poll_pool(ctx);
258+
run_executor(|cx| match self.poll_pool(cx) {
259+
// The pool is empty.
260+
Poll::Ready(()) => Poll::Ready(()),
261+
Poll::Pending => {
262+
if woken() {
263+
Poll::Pending
264+
} else {
265+
// We're stalled for now.
266+
Poll::Ready(())
267+
}
268+
}
262269
});
263270
}
264271

265-
// Make maximal progress on the entire pool of spawned task, returning `Ready`
266-
// if the pool is empty and `Pending` if no further progress can be made.
272+
/// Poll `self.pool`, re-filling it with any newly-spawned tasks.
273+
/// Repeat until either the pool is empty, or it returns `Pending`.
274+
///
275+
/// Returns `Ready` if the pool was empty, and `Pending` otherwise.
276+
///
277+
/// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
278+
/// mean that the pool can't make progress.
267279
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
268-
// state for the FuturesUnordered, which will never be used
269280
loop {
270-
let ret = self.poll_pool_once(cx);
281+
self.drain_incoming();
271282

272-
// we queued up some new tasks; add them and poll again
283+
let pool_ret = self.pool.poll_next_unpin(cx);
284+
285+
// We queued up some new tasks; add them and poll again.
273286
if !self.incoming.borrow().is_empty() {
274287
continue;
275288
}
276289

277-
// no queued tasks; we may be done
278-
match ret {
279-
Poll::Pending => return Poll::Pending,
290+
match pool_ret {
291+
Poll::Ready(Some(())) => continue,
280292
Poll::Ready(None) => return Poll::Ready(()),
281-
_ => {}
293+
Poll::Pending => return Poll::Pending,
282294
}
283295
}
284296
}
285297

286-
// Try make minimal progress on the pool of spawned tasks
287-
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
288-
// empty the incoming queue of newly-spawned tasks
289-
{
290-
let mut incoming = self.incoming.borrow_mut();
291-
for task in incoming.drain(..) {
292-
self.pool.push(task)
293-
}
298+
/// Empty the incoming queue of newly-spawned tasks.
299+
fn drain_incoming(&mut self) {
300+
let mut incoming = self.incoming.borrow_mut();
301+
for task in incoming.drain(..) {
302+
self.pool.push(task)
294303
}
295-
296-
// try to execute the next ready future
297-
self.pool.poll_next_unpin(cx)
298304
}
299305
}
300306

futures-executor/tests/local_pool.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use futures::channel::oneshot;
22
use futures::executor::LocalPool;
33
use futures::future::{self, lazy, poll_fn, Future};
4-
use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
4+
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
55
use std::cell::{Cell, RefCell};
66
use std::pin::Pin;
77
use std::rc::Rc;
@@ -435,3 +435,65 @@ fn park_unpark_independence() {
435435

436436
futures::executor::block_on(future)
437437
}
438+
439+
struct SelfWaking {
440+
wakeups_remaining: Rc<RefCell<usize>>,
441+
}
442+
443+
impl Future for SelfWaking {
444+
type Output = ();
445+
446+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
447+
if *self.wakeups_remaining.borrow() != 0 {
448+
*self.wakeups_remaining.borrow_mut() -= 1;
449+
cx.waker().wake_by_ref();
450+
}
451+
452+
Poll::Pending
453+
}
454+
}
455+
456+
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
457+
///
458+
/// The issue was that self-waking futures could cause `run_until_stalled`
459+
/// to exit early, even when progress could still be made.
460+
#[test]
461+
fn self_waking_run_until_stalled() {
462+
let wakeups_remaining = Rc::new(RefCell::new(10));
463+
464+
let mut pool = LocalPool::new();
465+
let spawner = pool.spawner();
466+
for _ in 0..3 {
467+
let wakeups_remaining = Rc::clone(&wakeups_remaining);
468+
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
469+
}
470+
471+
// This should keep polling until there are no more wakeups.
472+
pool.run_until_stalled();
473+
474+
assert_eq!(*wakeups_remaining.borrow(), 0);
475+
}
476+
477+
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
478+
///
479+
/// The issue was that self-waking futures could cause `try_run_one`
480+
/// to exit early, even when progress could still be made.
481+
#[test]
482+
fn self_waking_try_run_one() {
483+
let wakeups_remaining = Rc::new(RefCell::new(10));
484+
485+
let mut pool = LocalPool::new();
486+
let spawner = pool.spawner();
487+
for _ in 0..3 {
488+
let wakeups_remaining = Rc::clone(&wakeups_remaining);
489+
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
490+
}
491+
492+
spawner.spawn(future::ready(())).unwrap();
493+
494+
// The `ready` future should complete.
495+
assert!(pool.try_run_one());
496+
497+
// The self-waking futures are each polled once.
498+
assert_eq!(*wakeups_remaining.borrow(), 7);
499+
}

0 commit comments

Comments
 (0)