Skip to content

Commit d4eebb4

Browse files
committed
FuturesUnordered: Respect yielding from future
1 parent ca04ac6 commit d4eebb4

File tree

3 files changed

+28
-37
lines changed

3 files changed

+28
-37
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
use crate::task::AtomicWaker;
77
use alloc::sync::{Arc, Weak};
88
use core::cell::UnsafeCell;
9-
use core::cmp;
109
use core::fmt::{self, Debug};
1110
use core::iter::FromIterator;
1211
use core::marker::PhantomData;
@@ -31,33 +30,6 @@ use self::task::Task;
3130
mod ready_to_run_queue;
3231
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
3332

34-
/// Constant used for a `FuturesUnordered` to determine how many times it is
35-
/// allowed to poll underlying futures without yielding.
36-
///
37-
/// A single call to `poll_next` may potentially do a lot of work before
38-
/// yielding. This happens in particular if the underlying futures are awoken
39-
/// frequently but continue to return `Pending`. This is problematic if other
40-
/// tasks are waiting on the executor, since they do not get to run. This value
41-
/// caps the number of calls to `poll` on underlying futures a single call to
42-
/// `poll_next` is allowed to make.
43-
///
44-
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
45-
/// that amortize wakeup and scheduling costs, but low enough that we do not
46-
/// starve other tasks for long.
47-
///
48-
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
49-
///
50-
/// Note that using the length of the `FuturesUnordered` instead of this value
51-
/// may cause problems if the number of futures is large.
52-
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
53-
///
54-
/// Additionally, polling the same future twice per iteration may cause another
55-
/// problem. So, when using this value, it is necessary to limit the max value
56-
/// based on the length of the `FuturesUnordered`.
57-
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
58-
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
59-
const YIELD_EVERY: usize = 32;
60-
6133
/// A set of futures which may complete in any order.
6234
///
6335
/// This structure is optimized to manage a large number of futures.
@@ -149,6 +121,7 @@ impl<Fut> FuturesUnordered<Fut> {
149121
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
150122
queued: AtomicBool::new(true),
151123
ready_to_run_queue: Weak::new(),
124+
woken: AtomicBool::new(false),
152125
});
153126
let stub_ptr = Arc::as_ptr(&stub);
154127
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
@@ -195,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> {
195168
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
196169
queued: AtomicBool::new(true),
197170
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
171+
woken: AtomicBool::new(false),
198172
});
199173

200174
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -411,8 +385,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
411385
type Item = Fut::Output;
412386

413387
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
414-
// See YIELD_EVERY docs for more.
415-
let yield_every = cmp::min(self.len(), YIELD_EVERY);
388+
let len = self.len();
416389

417390
// Keep track of how many child futures we have polled,
418391
// in case we want to forcibly yield.
@@ -527,7 +500,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
527500
// the internal allocation, appropriately accessing fields and
528501
// deallocating the task if need be.
529502
let res = {
530-
let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
503+
let task = bomb.task.as_ref().unwrap();
504+
// We are only interested in whether the future waken before it
505+
// finishes polling, so reset the flag here.
506+
task.woken.store(false, Relaxed);
507+
let waker = Task::waker_ref(task);
531508
let mut cx = Context::from_waker(&waker);
532509

533510
// Safety: We won't move the future ever again
@@ -540,12 +517,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
540517
match res {
541518
Poll::Pending => {
542519
let task = bomb.task.take().unwrap();
520+
// If the future waken before it finishes polling, we assume
521+
// the future yields.
522+
let yielded = task.woken.load(Relaxed);
543523
bomb.queue.link(task);
544524

545-
if polled == yield_every {
546-
// We have polled a large number of futures in a row without yielding.
547-
// To ensure we do not starve other tasks waiting on the executor,
548-
// we yield here, but immediately wake ourselves up to continue.
525+
// If a future yields, we respect it and yield here.
526+
// If all futures have been polled, we also yield here to
527+
// avoid starving other tasks waiting on the executor.
528+
// (polling the same future twice per iteration may cause
529+
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
530+
if yielded || polled == len {
549531
cx.waker().wake_by_ref();
550532
return Poll::Pending;
551533
}

futures-util/src/stream/futures_unordered/task.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use alloc::sync::{Arc, Weak};
22
use core::cell::UnsafeCell;
3-
use core::sync::atomic::Ordering::{self, SeqCst};
3+
use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
44
use core::sync::atomic::{AtomicBool, AtomicPtr};
55

66
use super::abort::abort;
@@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {
3131

3232
// Whether or not this task is currently in the ready to run queue
3333
pub(super) queued: AtomicBool,
34+
35+
// Whether the future waken before it finishes polling
36+
// It is possible for this flag to be set to true after the polling,
37+
// but it will be ignored.
38+
pub(super) woken: AtomicBool,
3439
}
3540

3641
// `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
4853
None => return,
4954
};
5055

56+
arc_self.woken.store(true, Relaxed);
57+
5158
// It's our job to enqueue this task it into the ready to run queue. To
5259
// do this we set the `queued` flag, and if successful we then do the
5360
// actual queueing operation, ensuring that we're only queued once.

futures/tests/stream_futures_unordered.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ fn futures_not_moved_after_poll() {
268268
let fut = future::ready(()).pending_once().assert_unmoved();
269269
let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
270270
assert_stream_pending!(stream);
271+
assert_stream_pending!(stream);
272+
assert_stream_pending!(stream);
271273
assert_stream_next!(stream, ());
272274
assert_stream_next!(stream, ());
273275
assert_stream_next!(stream, ());
@@ -342,7 +344,7 @@ fn polled_only_once_at_most_per_iteration() {
342344

343345
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
344346
assert!(tasks.poll_next_unpin(cx).is_pending());
345-
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
347+
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
346348

347349
let mut tasks = FuturesUnordered::<F>::new();
348350
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));

0 commit comments

Comments
 (0)