From a7b5b64e3269a0fc976b8c8aa4e2f61db6c210fc Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 18 Dec 2021 22:54:32 +0900 Subject: [PATCH 1/4] Remove read-initializer feature --- futures-io/Cargo.toml | 1 - futures-io/src/lib.rs | 45 ------------------------- futures-util/Cargo.toml | 1 - futures-util/src/compat/compat01as03.rs | 12 ------- futures-util/src/compat/compat03as01.rs | 12 +------ futures-util/src/future/either.rs | 10 ------ futures-util/src/io/allow_std.rs | 11 ------ futures-util/src/io/buf_reader.rs | 8 ----- futures-util/src/io/chain.rs | 12 ------- futures-util/src/io/empty.rs | 8 ----- futures-util/src/io/mod.rs | 12 +------ futures-util/src/io/repeat.rs | 8 ----- futures-util/src/io/take.rs | 7 ---- futures-util/src/lib.rs | 9 ----- futures/Cargo.toml | 1 - futures/src/lib.rs | 4 --- 16 files changed, 2 insertions(+), 159 deletions(-) diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml index a3895b8057..f87c34a9c1 100644 --- a/futures-io/Cargo.toml +++ b/futures-io/Cargo.toml @@ -18,7 +18,6 @@ std = [] # These features are outside of the normal semver guarantees and require the # `unstable` feature as an explicit opt-in to unstable API. unstable = [] -read-initializer = [] [dependencies] diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs index 7e03b8e5a0..e91eb78492 100644 --- a/futures-io/src/lib.rs +++ b/futures-io/src/lib.rs @@ -8,7 +8,6 @@ //! All items of this library are only available when the `std` feature of this //! library is activated, and it is activated by default. -#![cfg_attr(all(feature = "read-initializer", feature = "std"), feature(read_initializer))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. @@ -22,9 +21,6 @@ ))] #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] -compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(feature = "std")] mod if_std { use std::io; @@ -34,11 +30,6 @@ mod if_std { // Re-export some types from `std::io` so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. - #[cfg(feature = "read-initializer")] - #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] - #[doc(no_inline)] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use io::Initializer; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 #[doc(no_inline)] pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; @@ -51,27 +42,6 @@ mod if_std { /// for wakeup and return if data is not yet available, rather than blocking /// the calling thread. pub trait AsyncRead { - /// Determines if this `AsyncRead`er can work with buffers of - /// uninitialized memory. - /// - /// The default implementation returns an initializer which will zero - /// buffers. - /// - /// This method is only available when the `read-initializer` feature of this - /// library is activated. - /// - /// # Safety - /// - /// This method is `unsafe` because an `AsyncRead`er could otherwise - /// return a non-zeroing `Initializer` from another `AsyncRead` type - /// without an `unsafe` block. - #[cfg(feature = "read-initializer")] - #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::zeroing() - } - /// Attempt to read from the `AsyncRead` into `buf`. /// /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. @@ -329,11 +299,6 @@ mod if_std { macro_rules! deref_async_read { () => { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - (**self).initializer() - } - fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -365,11 +330,6 @@ mod if_std { P: DerefMut + Unpin, P::Target: AsyncRead, { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - (**self).initializer() - } - fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -389,11 +349,6 @@ mod if_std { macro_rules! delegate_async_read_to_stdio { () => { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - io::Read::initializer(self) - } - fn poll_read( mut self: Pin<&mut Self>, _: &mut Context<'_>, diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 276c392504..a8f29d4cb4 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -27,7 +27,6 @@ channel = ["std", "futures-channel"] # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable"] bilock = [] -read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] write-all-vectored = ["io"] # These features are no longer used. diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index 17239a4e57..754e3d82a1 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -351,8 +351,6 @@ unsafe impl UnsafeNotify01 for NotifyWaker { #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] mod io { use super::*; - #[cfg(feature = "read-initializer")] - use futures_io::Initializer; use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; use std::io::Error; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; @@ -416,16 +414,6 @@ mod io { impl AsyncWrite01CompatExt for W {} impl AsyncRead03 for Compat01As03 { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - // check if `prepare_uninitialized_buffer` needs zeroing - if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) { - Initializer::zeroing() - } else { - Initializer::nop() - } - } - fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/futures-util/src/compat/compat03as01.rs b/futures-util/src/compat/compat03as01.rs index 2573fe7a74..5d3a6e920b 100644 --- a/futures-util/src/compat/compat03as01.rs +++ b/futures-util/src/compat/compat03as01.rs @@ -236,17 +236,7 @@ mod io { } } - impl AsyncRead01 for Compat { - #[cfg(feature = "read-initializer")] - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - let initializer = self.inner.initializer(); - let does_init = initializer.should_initialize(); - if does_init { - initializer.initialize(buf); - } - does_init - } - } + impl AsyncRead01 for Compat {} impl std::io::Write for Compat { fn write(&mut self, buf: &[u8]) -> std::io::Result { diff --git a/futures-util/src/future/either.rs b/futures-util/src/future/either.rs index 35650daa99..9602de7a42 100644 --- a/futures-util/src/future/either.rs +++ b/futures-util/src/future/either.rs @@ -184,8 +184,6 @@ mod if_std { use core::pin::Pin; use core::task::{Context, Poll}; - #[cfg(feature = "read-initializer")] - use futures_io::Initializer; use futures_io::{ AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom, }; @@ -195,14 +193,6 @@ mod if_std { A: AsyncRead, B: AsyncRead, { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - match self { - Either::Left(x) => x.initializer(), - Either::Right(x) => x.initializer(), - } - } - fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/futures-util/src/io/allow_std.rs b/futures-util/src/io/allow_std.rs index 1d13e0c177..ec30ee31e5 100644 --- a/futures-util/src/io/allow_std.rs +++ b/futures-util/src/io/allow_std.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use std::pin::Pin; use std::{fmt, io}; @@ -121,10 +119,6 @@ where fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { self.0.read_vectored(bufs) } - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.0.initializer() - } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { self.0.read_to_end(buf) } @@ -155,11 +149,6 @@ where ) -> Poll> { Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs)))) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.0.initializer() - } } impl io::Seek for AllowStdIo diff --git a/futures-util/src/io/buf_reader.rs b/futures-util/src/io/buf_reader.rs index 2d585a9eb6..0334a9f081 100644 --- a/futures-util/src/io/buf_reader.rs +++ b/futures-util/src/io/buf_reader.rs @@ -2,8 +2,6 @@ use super::DEFAULT_BUF_SIZE; use futures_core::future::Future; use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; use pin_project_lite::pin_project; use std::io::{self, Read}; @@ -144,12 +142,6 @@ impl AsyncRead for BufReader { self.consume(nread); Poll::Ready(Ok(nread)) } - - // we can't skip unconditionally because of the large buffer case in read. - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } impl AsyncBufRead for BufReader { diff --git a/futures-util/src/io/chain.rs b/futures-util/src/io/chain.rs index a35c50de35..728a3d2dc0 100644 --- a/futures-util/src/io/chain.rs +++ b/futures-util/src/io/chain.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; use pin_project_lite::pin_project; use std::fmt; @@ -111,16 +109,6 @@ where } this.second.poll_read_vectored(cx, bufs) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - let initializer = self.first.initializer(); - if initializer.should_initialize() { - initializer - } else { - self.second.initializer() - } - } } impl AsyncBufRead for Chain diff --git a/futures-util/src/io/empty.rs b/futures-util/src/io/empty.rs index ab2395a8af..02f6103f54 100644 --- a/futures-util/src/io/empty.rs +++ b/futures-util/src/io/empty.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead}; use std::fmt; use std::io; @@ -43,12 +41,6 @@ impl AsyncRead for Empty { ) -> Poll> { Poll::Ready(Ok(0)) } - - #[cfg(feature = "read-initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } impl AsyncBufRead for Empty { diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 4a51804802..4dd2e029bf 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -26,10 +26,6 @@ use std::{pin::Pin, ptr}; // Re-export some types from `std::io` so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. #[doc(no_inline)] -#[cfg(feature = "read-initializer")] -#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] -pub use std::io::Initializer; -#[doc(no_inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; @@ -40,15 +36,9 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024; /// Initializes a buffer if necessary. /// -/// A buffer is always initialized if `read-initializer` feature is disabled. +/// A buffer is currently always initialized. #[inline] unsafe fn initialize(_reader: &R, buf: &mut [u8]) { - #[cfg(feature = "read-initializer")] - { - if !_reader.initializer().should_initialize() { - return; - } - } ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } diff --git a/futures-util/src/io/repeat.rs b/futures-util/src/io/repeat.rs index 4cefcb2a28..2828bf0114 100644 --- a/futures-util/src/io/repeat.rs +++ b/futures-util/src/io/repeat.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncRead, IoSliceMut}; use std::fmt; use std::io; @@ -59,12 +57,6 @@ impl AsyncRead for Repeat { } Poll::Ready(Ok(nwritten)) } - - #[cfg(feature = "read-initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } impl fmt::Debug for Repeat { diff --git a/futures-util/src/io/take.rs b/futures-util/src/io/take.rs index 05830203d0..2c494804d9 100644 --- a/futures-util/src/io/take.rs +++ b/futures-util/src/io/take.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead}; use pin_project_lite::pin_project; use std::pin::Pin; @@ -100,11 +98,6 @@ impl AsyncRead for Take { *this.limit -= n as u64; Poll::Ready(Ok(n)) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } impl AsyncBufRead for Take { diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 76d37994ff..9a10c93c9a 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -1,7 +1,6 @@ //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, //! and the `AsyncRead` and `AsyncWrite` traits. -#![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] #![cfg_attr(not(feature = "std"), no_std)] #![warn( @@ -23,9 +22,6 @@ #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] -compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(feature = "alloc")] extern crate alloc; @@ -148,11 +144,6 @@ macro_rules! delegate_async_write { #[cfg(feature = "std")] macro_rules! delegate_async_read { ($field:ident) => { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> $crate::io::Initializer { - self.$field.initializer() - } - fn poll_read( self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 730c89d697..43f0b10ffb 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -47,7 +47,6 @@ thread-pool = ["executor", "futures-executor/thread-pool"] # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] bilock = ["futures-util/bilock"] -read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] write-all-vectored = ["futures-util/write-all-vectored"] # These features are no longer used. diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 362aa3cd99..8e21c8ebe8 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -78,7 +78,6 @@ //! The majority of examples and code snippets in this crate assume that they are //! inside an async block as written above. -#![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(not(feature = "std"), no_std)] #![warn( missing_debug_implementations, @@ -99,9 +98,6 @@ #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] -compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[doc(no_inline)] pub use futures_core::future::{Future, TryFuture}; #[doc(no_inline)] From f742154a3136de6a736680166c6a0e3ed52fe2cb Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 18 Dec 2021 23:15:25 +0900 Subject: [PATCH 2/4] Revert "Remove implicit clear in `ReadyToRunQueue::drop` (#2493)" This reverts commit 37dfb05b6c9414ede608d61e4c4507cd7c148038. --- .../src/stream/futures_unordered/mod.rs | 21 ++++++++++++++++--- .../futures_unordered/ready_to_run_queue.rs | 15 ++++++++++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 6918a26b91..4a05d8823e 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -558,7 +558,7 @@ impl FuturesUnordered { pub fn clear(&mut self) { self.clear_head_all(); - // SAFETY: we just cleared all the tasks and we have &mut self + // we just cleared all the tasks, and we have &mut self, so this is safe. unsafe { self.ready_to_run_queue.clear() }; self.is_terminated.store(false, Relaxed); @@ -575,9 +575,24 @@ impl FuturesUnordered { impl Drop for FuturesUnordered { fn drop(&mut self) { + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // wakers flying around which contain `Task` references + // inside them. We'll let those naturally get deallocated. self.clear_head_all(); - // SAFETY: we just cleared all the tasks and we have &mut self - unsafe { self.ready_to_run_queue.clear() }; + + // Note that at this point we could still have a bunch of tasks in the + // ready to run queue. None of those tasks, however, have futures + // associated with them so they're safe to destroy on any thread. At + // this point the `FuturesUnordered` struct, the owner of the one strong + // reference to the ready to run queue will drop the strong reference. + // At that point whichever thread releases the strong refcount last (be + // it this thread or some other thread as part of an `upgrade`) will + // clear out the ready to run queue and free all remaining tasks. + // + // While that freeing operation isn't guaranteed to happen here, it's + // guaranteed to happen "promptly" as no more "blocking work" will + // happen while there's a strong refcount held. } } diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 2bc208682a..5ef6cde83d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -94,7 +94,7 @@ impl ReadyToRunQueue { // // # Safety // - // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear_head_all) + // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) // - The caller **must** guarantee unique access to `self` pub(crate) unsafe fn clear(&self) { loop { @@ -107,3 +107,16 @@ impl ReadyToRunQueue { } } } + +impl Drop for ReadyToRunQueue { + fn drop(&mut self) { + // Once we're in the destructor for `Inner` we need to clear out + // the ready to run queue of tasks if there's anything left in there. + + // All tasks have had their futures dropped already by the `FuturesUnordered` + // destructor above, and we have &mut self, so this is safe. + unsafe { + self.clear(); + } + } +} From 6865a62b4932f2ce287460cd20a63d52381d301b Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 18 Dec 2021 23:56:35 +0900 Subject: [PATCH 3/4] FuturesUnordered: Limit max value of yield_every --- .../src/stream/futures_unordered/mod.rs | 45 ++++++++++++------- futures/tests/stream_futures_unordered.rs | 2 +- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 4a05d8823e..aab2bb4467 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -6,6 +6,7 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; +use core::cmp; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -30,6 +31,33 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; +/// Constant used for a `FuturesUnordered` to determine how many times it is +/// allowed to poll underlying futures without yielding. +/// +/// A single call to `poll_next` may potentially do a lot of work before +/// yielding. This happens in particular if the underlying futures are awoken +/// frequently but continue to return `Pending`. This is problematic if other +/// tasks are waiting on the executor, since they do not get to run. This value +/// caps the number of calls to `poll` on underlying futures a single call to +/// `poll_next` is allowed to make. +/// +/// The value itself is chosen somewhat arbitrarily. It needs to be high enough +/// that amortize wakeup and scheduling costs, but low enough that we do not +/// starve other tasks for long. +/// +/// See also https://github.com/rust-lang/futures-rs/issues/2047. +/// +/// Note that using the length of the `FuturesUnordered` instead of this value +/// may cause problems if the number of futures is large. +/// See also https://github.com/rust-lang/futures-rs/pull/2527. +/// +/// Additionally, polling the same future twice per iteration may cause another +/// problem. So, when using this value, it is necessary to limit the max value +/// based on the length of the `FuturesUnordered`. +/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`) +/// See also https://github.com/rust-lang/futures-rs/pull/2333. +const YIELD_EVERY: usize = 32; + /// A set of futures which may complete in any order. /// /// This structure is optimized to manage a large number of futures. @@ -383,21 +411,8 @@ impl Stream for FuturesUnordered { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Variable to determine how many times it is allowed to poll underlying - // futures without yielding. - // - // A single call to `poll_next` may potentially do a lot of work before - // yielding. This happens in particular if the underlying futures are awoken - // frequently but continue to return `Pending`. This is problematic if other - // tasks are waiting on the executor, since they do not get to run. This value - // caps the number of calls to `poll` on underlying futures a single call to - // `poll_next` is allowed to make. - // - // The value is the length of FuturesUnordered. This ensures that each - // future is polled only once at most per iteration. - // - // See also https://github.com/rust-lang/futures-rs/issues/2047. - let yield_every = self.len(); + // See YIELD_EVERY docs for more. + let yield_every = cmp::min(self.len(), YIELD_EVERY); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 4b9afccaf9..439c809bef 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -340,7 +340,7 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); assert!(tasks.poll_next_unpin(cx).is_pending()); - assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); + assert_eq!(32, tasks.iter().filter(|f| f.polled).count()); let mut tasks = FuturesUnordered::::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); From c7efe9dd47472d51b375279566599be3aeac18b7 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 8 Sep 2021 16:34:23 +0200 Subject: [PATCH 4/4] futures-util: add StreamExt::count method Signed-off-by: Petros Angelatos --- futures-util/src/stream/stream/count.rs | 53 +++++++++++++++++++++++++ futures-util/src/stream/stream/mod.rs | 36 +++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 futures-util/src/stream/stream/count.rs diff --git a/futures-util/src/stream/stream/count.rs b/futures-util/src/stream/stream/count.rs new file mode 100644 index 0000000000..513cab7b6a --- /dev/null +++ b/futures-util/src/stream/stream/count.rs @@ -0,0 +1,53 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`count`](super::StreamExt::count) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Count { + #[pin] + stream: St, + count: usize + } +} + +impl fmt::Debug for Count +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish() + } +} + +impl Count { + pub(super) fn new(stream: St) -> Self { + Self { stream, count: 0 } + } +} + +impl FusedFuture for Count { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl Future for Count { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + Poll::Ready(loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(_) => *this.count += 1, + None => break *this.count, + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 86997f45c9..9cfcc09bac 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -40,6 +40,10 @@ mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; +mod count; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::count::Count; + mod cycle; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::cycle::Cycle; @@ -576,6 +580,38 @@ pub trait StreamExt: Stream { assert_future::(Concat::new(self)) } + /// Drives the stream to completion, counting the number of items. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of a + /// stream with more than [`usize::MAX`] elements either produces the wrong + /// result or panics. If debug assertions are enabled, a panic is guaranteed. + /// + /// # Panics + /// + /// This function might panic if the iterator has more than [`usize::MAX`] + /// elements. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let count = stream.count().await; + /// + /// assert_eq!(count, 10); + /// # }); + /// ``` + fn count(self) -> Count + where + Self: Sized, + { + assert_future::(Count::new(self)) + } + /// Repeats a stream endlessly. /// /// The stream never terminates. Note that you likely want to avoid