Skip to content

Commit afff1ce

Browse files
committed
FuturesUnordered: Limit max value of yield_every
1 parent e148244 commit afff1ce

File tree

2 files changed

+31
-16
lines changed

2 files changed

+31
-16
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use crate::task::AtomicWaker;
77
use alloc::sync::{Arc, Weak};
88
use core::cell::UnsafeCell;
9+
use core::cmp;
910
use core::fmt::{self, Debug};
1011
use core::iter::FromIterator;
1112
use core::marker::PhantomData;
@@ -30,6 +31,33 @@ use self::task::Task;
3031
mod ready_to_run_queue;
3132
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
3233

34+
/// Constant used for a `FuturesUnordered` to determine how many times it is
35+
/// allowed to poll underlying futures without yielding.
36+
///
37+
/// A single call to `poll_next` may potentially do a lot of work before
38+
/// yielding. This happens in particular if the underlying futures are awoken
39+
/// frequently but continue to return `Pending`. This is problematic if other
40+
/// tasks are waiting on the executor, since they do not get to run. This value
41+
/// caps the number of calls to `poll` on underlying futures a single call to
42+
/// `poll_next` is allowed to make.
43+
///
44+
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
45+
/// that amortize wakeup and scheduling costs, but low enough that we do not
46+
/// starve other tasks for long.
47+
///
48+
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
49+
///
50+
/// Note that using the length of the `FuturesUnordered` instead of this value
51+
/// may cause problems if the number of futures is large.
52+
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
53+
///
54+
/// Additionally, polling the same future twice per iteration may cause another
55+
/// problem. So, when using this value, it is necessary to limit the max value
56+
/// based on the length of the `FuturesUnordered`.
57+
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
58+
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
59+
const YIELD_EVERY: usize = 32;
60+
3361
/// A set of futures which may complete in any order.
3462
///
3563
/// This structure is optimized to manage a large number of futures.
@@ -383,21 +411,8 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
383411
type Item = Fut::Output;
384412

385413
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
386-
// Variable to determine how many times it is allowed to poll underlying
387-
// futures without yielding.
388-
//
389-
// A single call to `poll_next` may potentially do a lot of work before
390-
// yielding. This happens in particular if the underlying futures are awoken
391-
// frequently but continue to return `Pending`. This is problematic if other
392-
// tasks are waiting on the executor, since they do not get to run. This value
393-
// caps the number of calls to `poll` on underlying futures a single call to
394-
// `poll_next` is allowed to make.
395-
//
396-
// The value is the length of FuturesUnordered. This ensures that each
397-
// future is polled only once at most per iteration.
398-
//
399-
// See also https://github.com/rust-lang/futures-rs/issues/2047.
400-
let yield_every = self.len();
414+
// See YIELD_EVERY docs for more.
415+
let yield_every = cmp::min(self.len(), YIELD_EVERY);
401416

402417
// Keep track of how many child futures we have polled,
403418
// in case we want to forcibly yield.

futures/tests/stream_futures_unordered.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ fn polled_only_once_at_most_per_iteration() {
340340

341341
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
342342
assert!(tasks.poll_next_unpin(cx).is_pending());
343-
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
343+
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
344344

345345
let mut tasks = FuturesUnordered::<F>::new();
346346
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));

0 commit comments

Comments
 (0)