From 1f6f8d07747213347ad1966122d43244376ba6d2 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 14 Feb 2022 01:03:27 +0900 Subject: [PATCH 01/33] Fix Sync impl of BiLockGuard --- futures-util/src/lock/bilock.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 2f51ae7c98..2174079c83 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -224,6 +224,9 @@ pub struct BiLockGuard<'a, T> { bilock: &'a BiLock, } +// We allow parallel access to T via Deref, so Sync bound is also needed here. +unsafe impl Sync for BiLockGuard<'_, T> {} + impl Deref for BiLockGuard<'_, T> { type Target = T; fn deref(&self) -> &T { From 9e8a7847a41429456536f5482ca2c89fad9a6260 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 14 Feb 2022 01:07:34 +0900 Subject: [PATCH 02/33] Fix stable_features warning ``` error: the feature `cfg_target_has_atomic` has been stable since 1.60.0 and no longer requires an attribute to enable --> futures/tests/no-std/src/lib.rs:3:12 | 3 | #![feature(cfg_target_has_atomic)] | ^^^^^^^^^^^^^^^^^^^^^ ``` --- futures/tests/no-std/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/futures/tests/no-std/src/lib.rs b/futures/tests/no-std/src/lib.rs index 308218d6b7..89a8fa1ff1 100644 --- a/futures/tests/no-std/src/lib.rs +++ b/futures/tests/no-std/src/lib.rs @@ -1,6 +1,5 @@ #![cfg(nightly)] #![no_std] -#![feature(cfg_target_has_atomic)] #[cfg(feature = "futures-core-alloc")] #[cfg(target_has_atomic = "ptr")] From a619ea77290d93f7d83c925b5656a3196fdd875d Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 14 Feb 2022 01:20:59 +0900 Subject: [PATCH 03/33] Fix clippy::single_match warning ``` error: you seem to be trying to use `match` for destructuring a single pattern. Consider using `if let` --> futures-executor/src/thread_pool.rs:349:9 | 349 | / match arc_self.mutex.notify() { 350 | | Ok(task) => arc_self.exec.state.send(Message::Run(task)), 351 | | Err(()) => {} 352 | | } | |_________^ help: try this: `if let Ok(task) = arc_self.mutex.notify() { arc_self.exec.state.send(Message::Run(task)) }` | = note: `-D clippy::single-match` implied by `-D warnings` = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#single_match ``` --- futures-executor/src/thread_pool.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 5e1f586eb8..3eca9d1440 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -346,9 +346,8 @@ impl fmt::Debug for Task { impl ArcWake for WakeHandle { fn wake_by_ref(arc_self: &Arc) { - match arc_self.mutex.notify() { - Ok(task) => arc_self.exec.state.send(Message::Run(task)), - Err(()) => {} + if let Ok(task) = arc_self.mutex.notify() { + arc_self.exec.state.send(Message::Run(task)) } } } From cff6c4c9277673eca777a04e59e35f8fa00683eb Mon Sep 17 00:00:00 2001 From: Bill Fraser Date: Wed, 23 Feb 2022 04:22:06 -0800 Subject: [PATCH 04/33] FuturesUnordered: fix partial iteration (#2574) The IntoIter impl advances the head pointer every iteration, but this breaks the linked list invariant that the head's prev should be null. If the iteration is not done to completion, on subsequent drop, FuturesUnordered::unlink relies on this broken invariant and ends up panicking. The fix is to maintain the `head->prev == null` invariant while iterating. Also added a test for this bug. --- futures-util/src/stream/futures_unordered/iter.rs | 4 ++++ futures/tests/stream_futures_unordered.rs | 14 ++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 04db5ee753..20248c70fe 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -2,6 +2,7 @@ use super::task::Task; use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; +use core::ptr; use core::sync::atomic::Ordering::Relaxed; /// Mutable iterator over all futures in the unordered set. @@ -58,6 +59,9 @@ impl Iterator for IntoIter { // valid `next_all` checks can be skipped. let next = (**task).next_all.load(Relaxed); *task = next; + if !task.is_null() { + *(**task).prev_all.get() = ptr::null_mut(); + } self.len -= 1; Some(future) } diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index f62f733610..398170a7cf 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -261,6 +261,20 @@ fn into_iter_len() { assert!(into_iter.next().is_none()); } +#[test] +fn into_iter_partial() { + let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)] + .into_iter() + .collect::>(); + + let mut into_iter = stream.into_iter(); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + // don't panic when iterator is dropped before completing +} + #[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, From 589687b3f8534a55a4910415c0ea4ca1715e0ce7 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Mar 2022 17:32:18 +0000 Subject: [PATCH 05/33] Shared: fix false detection of inner panics (#2576) Fixes #2575. --- futures-util/src/future/future/shared.rs | 22 +++++++++------- futures/tests/future_shared.rs | 32 ++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index 9b31932fe3..985931564b 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -262,19 +262,20 @@ where let waker = waker_ref(&inner.notifier); let mut cx = Context::from_waker(&waker); - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, + } impl Drop for Reset<'_> { fn drop(&mut self) { - use std::thread; - - if thread::panicking() { - self.0.store(POISONED, SeqCst); + if !self.did_not_panic { + self.state.store(POISONED, SeqCst); } } } - let _reset = Reset(&inner.notifier.state); + let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; let output = { let future = unsafe { @@ -284,12 +285,15 @@ where } }; - match future.poll(&mut cx) { + let poll_result = future.poll(&mut cx); + reset.did_not_panic = true; + + match poll_result { Poll::Pending => { if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success - drop(_reset); + drop(reset); this.inner = Some(inner); return Poll::Pending; } else { @@ -313,7 +317,7 @@ where waker.wake(); } - drop(_reset); // Make borrow checker happy + drop(reset); // Make borrow checker happy drop(wakers_guard); // Safety: We're in the COMPLETE state diff --git a/futures/tests/future_shared.rs b/futures/tests/future_shared.rs index 3ceaebb5c8..20a021bc35 100644 --- a/futures/tests/future_shared.rs +++ b/futures/tests/future_shared.rs @@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool}; use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; use futures::task::LocalSpawn; use std::cell::{Cell, RefCell}; +use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::task::Poll; use std::thread; @@ -194,3 +195,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() { // has returned pending assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); } + +#[test] +#[should_panic(expected = "inner future panicked during poll")] +fn panic_while_poll() { + let fut = futures::future::poll_fn::(|_cx| panic!("test")).shared(); + + let fut_captured = fut.clone(); + std::panic::catch_unwind(AssertUnwindSafe(|| { + block_on(fut_captured); + })) + .unwrap_err(); + + block_on(fut); +} + +#[test] +#[should_panic(expected = "test_marker")] +fn poll_while_panic() { + struct S; + + impl Drop for S { + fn drop(&mut self) { + let fut = futures::future::ready(1).shared(); + assert_eq!(block_on(fut.clone()), 1); + assert_eq!(block_on(fut), 1); + } + } + + let _s = S {}; + panic!("test_marker"); +} From 3cefb033cd4d5f17957eb26b3e4a2565ea1d4241 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Tue, 8 Mar 2022 21:01:21 +0800 Subject: [PATCH 06/33] Feat: add Mutex::lock_owned and Mutex::try_lock_owned (#2571) --- futures-util/src/lock/mod.rs | 22 ++--- futures-util/src/lock/mutex.rs | 155 +++++++++++++++++++++++++++++++-- 2 files changed, 162 insertions(+), 15 deletions(-) diff --git a/futures-util/src/lock/mod.rs b/futures-util/src/lock/mod.rs index cf374c016f..0be72717c8 100644 --- a/futures-util/src/lock/mod.rs +++ b/futures-util/src/lock/mod.rs @@ -4,11 +4,18 @@ //! library is activated, and it is activated by default. #[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "std")] -mod mutex; +#[cfg(any(feature = "sink", feature = "io"))] +#[cfg(not(feature = "bilock"))] +pub(crate) use self::bilock::BiLock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] +pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "std")] -pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; +pub use self::mutex::{ + MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] @@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; #[cfg(not(futures_no_atomic_cas))] -#[cfg(any(feature = "sink", feature = "io"))] -#[cfg(not(feature = "bilock"))] -pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "bilock")] -#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] -pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; +#[cfg(feature = "std")] +mod mutex; diff --git a/futures-util/src/lock/mutex.rs b/futures-util/src/lock/mutex.rs index 85dcb1537b..335ad14273 100644 --- a/futures-util/src/lock/mutex.rs +++ b/futures-util/src/lock/mutex.rs @@ -1,14 +1,16 @@ -use futures_core::future::{FusedFuture, Future}; -use futures_core::task::{Context, Poll, Waker}; -use slab::Slab; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex as StdMutex; +use std::sync::{Arc, Mutex as StdMutex}; use std::{fmt, mem}; +use slab::Slab; + +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; + /// A futures-aware mutex. /// /// # Fairness @@ -107,6 +109,18 @@ impl Mutex { } } + /// Attempt to acquire the lock immediately. + /// + /// If the lock is currently held, this will return `None`. + pub fn try_lock_owned(self: &Arc) -> Option> { + let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire); + if (old_state & IS_LOCKED) == 0 { + Some(OwnedMutexGuard { mutex: self.clone() }) + } else { + None + } + } + /// Acquire the lock asynchronously. /// /// This method returns a future that will resolve once the lock has been @@ -115,6 +129,14 @@ impl Mutex { MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } + /// Acquire the lock asynchronously. + /// + /// This method returns a future that will resolve once the lock has been + /// successfully acquired. + pub fn lock_owned(self: Arc) -> OwnedMutexLockFuture { + OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } + } + /// Returns a mutable reference to the underlying data. /// /// Since this call borrows the `Mutex` mutably, no actual locking needs to @@ -173,7 +195,118 @@ impl Mutex { } // Sentinel for when no slot in the `Slab` has been dedicated to this object. -const WAIT_KEY_NONE: usize = usize::max_value(); +const WAIT_KEY_NONE: usize = usize::MAX; + +/// A future which resolves when the target mutex has been successfully acquired, owned version. +pub struct OwnedMutexLockFuture { + // `None` indicates that the mutex was successfully acquired. + mutex: Option>>, + wait_key: usize, +} + +impl fmt::Debug for OwnedMutexLockFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexLockFuture") + .field("was_acquired", &self.mutex.is_none()) + .field("mutex", &self.mutex) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) + .finish() + } +} + +impl FusedFuture for OwnedMutexLockFuture { + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl Future for OwnedMutexLockFuture { + type Output = OwnedMutexGuard; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion"); + + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + { + let mut waiters = mutex.waiters.lock().unwrap(); + if this.wait_key == WAIT_KEY_NONE { + this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone())); + if waiters.len() == 1 { + mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock + } + } else { + waiters[this.wait_key].register(cx.waker()); + } + } + + // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by + // attempting to acquire the lock again. + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + Poll::Pending + } +} + +impl Drop for OwnedMutexLockFuture { + fn drop(&mut self) { + if let Some(mutex) = self.mutex.as_ref() { + // This future was dropped before it acquired the mutex. + // + // Remove ourselves from the map, waking up another waiter if we + // had been awoken to acquire the lock. + mutex.remove_waker(self.wait_key, true); + } + } +} + +/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods. +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +pub struct OwnedMutexGuard { + mutex: Arc>, +} + +impl fmt::Debug for OwnedMutexGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexGuard") + .field("value", &&**self) + .field("mutex", &self.mutex) + .finish() + } +} + +impl Drop for OwnedMutexGuard { + fn drop(&mut self) { + self.mutex.unlock() + } +} + +impl Deref for OwnedMutexGuard { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +impl DerefMut for OwnedMutexGuard { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.mutex.value.get() } + } +} /// A future which resolves when the target mutex has been successfully acquired. pub struct MutexLockFuture<'a, T: ?Sized> { @@ -386,13 +519,25 @@ unsafe impl Sync for Mutex {} // It's safe to switch which thread the acquire is being attempted on so long as // `T` can be accessed on that thread. unsafe impl Send for MutexLockFuture<'_, T> {} + // doesn't have any interesting `&self` methods (only Debug) unsafe impl Sync for MutexLockFuture<'_, T> {} +// It's safe to switch which thread the acquire is being attempted on so long as +// `T` can be accessed on that thread. +unsafe impl Send for OwnedMutexLockFuture {} + +// doesn't have any interesting `&self` methods (only Debug) +unsafe impl Sync for OwnedMutexLockFuture {} + // Safe to send since we don't track any thread-specific details-- the inner // lock is essentially spinlock-equivalent (attempt to flip an atomic bool) unsafe impl Send for MutexGuard<'_, T> {} unsafe impl Sync for MutexGuard<'_, T> {} + +unsafe impl Send for OwnedMutexGuard {} +unsafe impl Sync for OwnedMutexGuard {} + unsafe impl Send for MappedMutexGuard<'_, T, U> {} unsafe impl Sync for MappedMutexGuard<'_, T, U> {} From a8b98aa80c9b29a5efaff8c3ebc9fff87a61dc25 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 15 Mar 2022 01:28:29 +0000 Subject: [PATCH 07/33] Update no_atomic_cas.rs --- no_atomic_cas.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs index 9b05d4b9f4..341e6ac7d2 100644 --- a/no_atomic_cas.rs +++ b/no_atomic_cas.rs @@ -7,6 +7,7 @@ const NO_ATOMIC_CAS: &[&str] = &[ "bpfel-unknown-none", "msp430-none-elf", "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", "riscv32imc-unknown-none-elf", "thumbv4t-none-eabi", "thumbv6m-none-eabi", From 03e618bc9368173fd23b9565eda90b40ab95c6f6 Mon Sep 17 00:00:00 2001 From: Tim Leslie Date: Tue, 15 Mar 2022 15:30:35 +1100 Subject: [PATCH 08/33] Fix sentence in try_buffered docs. (#2579) Updates the docs to use the same phrasing as `.buffered()` https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffered --- futures-util/src/stream/try_stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 6bf2cb74a5..72a74f2166 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -918,7 +918,7 @@ pub trait TryStreamExt: TryStream { /// that matches the stream's `Error` type. /// /// This adaptor will buffer up to `n` futures and then return their - /// outputs in the order. If the underlying stream returns an error, it will + /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will /// be immediately propagated. /// /// The returned stream will be a stream of results, each containing either From d00d9355448f4f131bb1cd37e04b18de52812660 Mon Sep 17 00:00:00 2001 From: Owen Shepherd <414owen@gmail.com> Date: Mon, 21 Mar 2022 10:03:33 +0000 Subject: [PATCH 09/33] Add `select` benchmark (#2582) --- futures-util/benches/select.rs | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 futures-util/benches/select.rs diff --git a/futures-util/benches/select.rs b/futures-util/benches/select.rs new file mode 100644 index 0000000000..5410a95299 --- /dev/null +++ b/futures-util/benches/select.rs @@ -0,0 +1,35 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::stream::{repeat, select, StreamExt}; + +#[bench] +fn select_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + + b.iter(|| { + let stream1 = repeat(1).take(STREAM_COUNT); + let stream2 = repeat(2).take(STREAM_COUNT); + let stream3 = repeat(3).take(STREAM_COUNT); + let stream4 = repeat(4).take(STREAM_COUNT); + let stream5 = repeat(5).take(STREAM_COUNT); + let stream6 = repeat(6).take(STREAM_COUNT); + let stream7 = repeat(7).take(STREAM_COUNT); + let count = block_on(async { + let count = select( + stream1, + select( + stream2, + select(stream3, select(stream4, select(stream5, select(stream6, stream7)))), + ), + ) + .count() + .await; + count + }); + assert_eq!(count, STREAM_COUNT * 7); + }); +} From bd8ddd24841297632f669e3368a23f929f729200 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 21 Mar 2022 20:53:57 +0900 Subject: [PATCH 10/33] Run more tests with Miri (#2584) --- .github/workflows/ci.yml | 4 +- futures-executor/Cargo.toml | 2 +- futures-executor/src/thread_pool.rs | 28 ++++++++----- futures-executor/tests/local_pool.rs | 3 ++ futures-util/src/task/spawn.rs | 9 ++-- futures/src/lib.rs | 4 +- futures/tests/eventual.rs | 9 ++-- futures/tests/future_shared.rs | 1 - futures/tests/lock_mutex.rs | 50 ++++++++++++----------- futures/tests/macro_comma_support.rs | 1 - futures/tests/recurse.rs | 1 - futures/tests/sink.rs | 1 - futures/tests/stream_futures_ordered.rs | 2 - futures/tests/stream_futures_unordered.rs | 2 - futures/tests/stream_try_stream.rs | 2 - 15 files changed, 64 insertions(+), 55 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dfb7eea4ff..110b1b7c0a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -273,10 +273,10 @@ jobs: - uses: actions/checkout@v2 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - # futures-executor uses boxed futures so many tests trigger https://github.com/rust-lang/miri/issues/1038 - - run: cargo miri test --workspace --exclude futures-executor --all-features + - run: cargo miri test --workspace --all-features env: MIRIFLAGS: -Zmiri-check-number-validity -Zmiri-symbolic-alignment-check -Zmiri-tag-raw-pointers -Zmiri-disable-isolation + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout san: name: cargo test -Z sanitizer=${{ matrix.sanitizer }} diff --git a/futures-executor/Cargo.toml b/futures-executor/Cargo.toml index dae5f22e77..e1ce688581 100644 --- a/futures-executor/Cargo.toml +++ b/futures-executor/Cargo.toml @@ -22,7 +22,7 @@ futures-util = { path = "../futures-util", version = "0.3.21", default-features num_cpus = { version = "1.8.0", optional = true } [dev-dependencies] -futures = { path = "../futures" } +futures = { path = "../futures", features = ["thread-pool"] } [package.metadata.docs.rs] all-features = true diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 3eca9d1440..8c93b476bc 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -108,12 +108,15 @@ impl ThreadPool { /// completion. /// /// ``` + /// # { /// use futures::executor::ThreadPool; /// /// let pool = ThreadPool::new().unwrap(); /// /// let future = async { /* ... */ }; /// pool.spawn_ok(future); + /// # } + /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed /// ``` /// /// > **Note**: This method is similar to `SpawnExt::spawn`, except that @@ -359,16 +362,19 @@ mod tests { #[test] fn test_drop_after_start() { - let (tx, rx) = mpsc::sync_channel(2); - let _cpu_pool = ThreadPoolBuilder::new() - .pool_size(2) - .after_start(move |_| tx.send(1).unwrap()) - .create() - .unwrap(); - - // After ThreadPoolBuilder is deconstructed, the tx should be dropped - // so that we can use rx as an iterator. - let count = rx.into_iter().count(); - assert_eq!(count, 2); + { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = ThreadPoolBuilder::new() + .pool_size(2) + .after_start(move |_| tx.send(1).unwrap()) + .create() + .unwrap(); + + // After ThreadPoolBuilder is deconstructed, the tx should be dropped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } + std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed } } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 9b1316b998..8e5e27981d 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -288,6 +288,9 @@ fn run_until_stalled_runs_spawned_sub_futures() { #[test] fn run_until_stalled_executes_all_ready() { + #[cfg(miri)] + const ITER: usize = 50; + #[cfg(not(miri))] const ITER: usize = 200; const PER_ITER: usize = 3; diff --git a/futures-util/src/task/spawn.rs b/futures-util/src/task/spawn.rs index 87ca360516..8e78717c27 100644 --- a/futures-util/src/task/spawn.rs +++ b/futures-util/src/task/spawn.rs @@ -34,7 +34,7 @@ pub trait SpawnExt: Spawn { /// today. Feel free to use this method in the meantime. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 + /// # { /// use futures::executor::ThreadPool; /// use futures::task::SpawnExt; /// @@ -42,6 +42,8 @@ pub trait SpawnExt: Spawn { /// /// let future = async { /* ... */ }; /// executor.spawn(future).unwrap(); + /// # } + /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed /// ``` #[cfg(feature = "alloc")] fn spawn(&self, future: Fut) -> Result<(), SpawnError> @@ -59,7 +61,7 @@ pub trait SpawnExt: Spawn { /// resolves to the output of the spawned future. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 + /// # { /// use futures::executor::{block_on, ThreadPool}; /// use futures::future; /// use futures::task::SpawnExt; @@ -69,6 +71,8 @@ pub trait SpawnExt: Spawn { /// let future = future::ready(1); /// let join_handle_fut = executor.spawn_with_handle(future).unwrap(); /// assert_eq!(block_on(join_handle_fut), 1); + /// # } + /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed /// ``` #[cfg(feature = "channel")] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] @@ -138,7 +142,6 @@ pub trait LocalSpawnExt: LocalSpawn { /// resolves to the output of the spawned future. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 /// use futures::executor::LocalPool; /// use futures::task::LocalSpawnExt; /// diff --git a/futures/src/lib.rs b/futures/src/lib.rs index b8ebc614e2..3ae9091dca 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -25,13 +25,13 @@ //! within macros and keywords such as async and await!. //! //! ```rust -//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 //! # use futures::channel::mpsc; //! # use futures::executor; ///standard executors to provide a context for futures and streams //! # use futures::executor::ThreadPool; //! # use futures::StreamExt; //! # //! fn main() { +//! # { //! let pool = ThreadPool::new().expect("Failed to build pool"); //! let (tx, rx) = mpsc::unbounded::(); //! @@ -73,6 +73,8 @@ //! let values: Vec = executor::block_on(fut_values); //! //! println!("Values={:?}", values); +//! # } +//! # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed //! } //! ``` //! diff --git a/futures/tests/eventual.rs b/futures/tests/eventual.rs index 34613806c4..96e21a12a4 100644 --- a/futures/tests/eventual.rs +++ b/futures/tests/eventual.rs @@ -1,5 +1,3 @@ -#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 - use futures::channel::oneshot; use futures::executor::ThreadPool; use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; @@ -136,6 +134,11 @@ fn select3() { #[test] fn select4() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + let (tx, rx) = mpsc::channel::>(); let t = thread::spawn(move || { @@ -145,7 +148,7 @@ fn select4() { }); let (tx2, rx2) = mpsc::channel(); - for _ in 0..10000 { + for _ in 0..N { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); diff --git a/futures/tests/future_shared.rs b/futures/tests/future_shared.rs index 20a021bc35..6bf43d23cf 100644 --- a/futures/tests/future_shared.rs +++ b/futures/tests/future_shared.rs @@ -97,7 +97,6 @@ fn drop_in_poll() { assert_eq!(block_on(future1), 1); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn peek() { let mut local_pool = LocalPool::new(); diff --git a/futures/tests/lock_mutex.rs b/futures/tests/lock_mutex.rs index c92ef50ad8..0bd2607565 100644 --- a/futures/tests/lock_mutex.rs +++ b/futures/tests/lock_mutex.rs @@ -34,34 +34,36 @@ fn mutex_wakes_waiters() { assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn mutex_contested() { - let (tx, mut rx) = mpsc::unbounded(); - let pool = ThreadPool::builder().pool_size(16).create().unwrap(); + { + let (tx, mut rx) = mpsc::unbounded(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); - let tx = Arc::new(tx); - let mutex = Arc::new(Mutex::new(0)); + let tx = Arc::new(tx); + let mutex = Arc::new(Mutex::new(0)); - let num_tasks = 1000; - for _ in 0..num_tasks { - let tx = tx.clone(); - let mutex = mutex.clone(); - pool.spawn(async move { - let mut lock = mutex.lock().await; - ready(()).pending_once().await; - *lock += 1; - tx.unbounded_send(()).unwrap(); - drop(lock); - }) - .unwrap(); - } - - block_on(async { + let num_tasks = 1000; for _ in 0..num_tasks { - rx.next().await.unwrap(); + let tx = tx.clone(); + let mutex = mutex.clone(); + pool.spawn(async move { + let mut lock = mutex.lock().await; + ready(()).pending_once().await; + *lock += 1; + tx.unbounded_send(()).unwrap(); + drop(lock); + }) + .unwrap(); } - let lock = mutex.lock().await; - assert_eq!(num_tasks, *lock); - }) + + block_on(async { + for _ in 0..num_tasks { + rx.next().await.unwrap(); + } + let lock = mutex.lock().await; + assert_eq!(num_tasks, *lock); + }); + } + std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed } diff --git a/futures/tests/macro_comma_support.rs b/futures/tests/macro_comma_support.rs index 3b082d211f..85871e98be 100644 --- a/futures/tests/macro_comma_support.rs +++ b/futures/tests/macro_comma_support.rs @@ -14,7 +14,6 @@ fn ready() { })) } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn poll() { use futures::poll; diff --git a/futures/tests/recurse.rs b/futures/tests/recurse.rs index f06524f85a..d81753c9d7 100644 --- a/futures/tests/recurse.rs +++ b/futures/tests/recurse.rs @@ -3,7 +3,6 @@ use futures::future::{self, BoxFuture, FutureExt}; use std::sync::mpsc; use std::thread; -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn lots() { #[cfg(not(futures_sanitizer))] diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index dc826bda98..f3cf11b931 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -288,7 +288,6 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn with_flush() { let (tx, rx) = oneshot::channel(); diff --git a/futures/tests/stream_futures_ordered.rs b/futures/tests/stream_futures_ordered.rs index 84e0bcc1df..7506c65a63 100644 --- a/futures/tests/stream_futures_ordered.rs +++ b/futures/tests/stream_futures_ordered.rs @@ -26,7 +26,6 @@ fn works_1() { assert_eq!(None, iter.next()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::(); @@ -55,7 +54,6 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn queue_never_unblocked() { let (_a_tx, a_rx) = oneshot::channel::>(); diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 398170a7cf..b568280479 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -56,7 +56,6 @@ fn works_1() { assert_eq!(None, iter.next()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::(); @@ -86,7 +85,6 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn finished_future() { let (_a_tx, a_rx) = oneshot::channel::(); diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index d83fc54b1c..194e74db74 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -1,5 +1,3 @@ -#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 - use futures::{ stream::{self, StreamExt, TryStreamExt}, task::Poll, From b3a334276af04478b5c15b17a180056c5d7d090f Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 16 Apr 2022 23:59:20 +0900 Subject: [PATCH 11/33] Update MIRIFLAGS (#2589) --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 110b1b7c0a..8c83181a6f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -275,7 +275,7 @@ jobs: run: rustup toolchain install nightly --component miri && rustup default nightly - run: cargo miri test --workspace --all-features env: - MIRIFLAGS: -Zmiri-check-number-validity -Zmiri-symbolic-alignment-check -Zmiri-tag-raw-pointers -Zmiri-disable-isolation + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout san: From f54c3667fd40e7e32f107cf88ce5de3ae20db815 Mon Sep 17 00:00:00 2001 From: maleicacid <4982384+kazuki0824@users.noreply.github.com> Date: Tue, 19 Apr 2022 10:53:15 +0900 Subject: [PATCH 12/33] Create copy_buf_abortable, which enables to stop copying in the middle (#2507) --- futures-util/src/abortable.rs | 8 +- futures-util/src/io/copy_buf_abortable.rs | 124 ++++++++++++++++++++++ futures-util/src/io/mod.rs | 3 + 3 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 futures-util/src/io/copy_buf_abortable.rs diff --git a/futures-util/src/abortable.rs b/futures-util/src/abortable.rs index bb82dd0db8..e0afd47218 100644 --- a/futures-util/src/abortable.rs +++ b/futures-util/src/abortable.rs @@ -75,7 +75,7 @@ impl Abortable { /// in calls to `Abortable::new`. #[derive(Debug)] pub struct AbortRegistration { - inner: Arc, + pub(crate) inner: Arc, } /// A handle to an `Abortable` task. @@ -100,9 +100,9 @@ impl AbortHandle { // Inner type storing the waker to awaken and a bool indicating that it // should be aborted. #[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - aborted: AtomicBool, +pub(crate) struct AbortInner { + pub(crate) waker: AtomicWaker, + pub(crate) aborted: AtomicBool, } /// Indicator that the `Abortable` task was aborted. diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs new file mode 100644 index 0000000000..fdbc4a5f00 --- /dev/null +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -0,0 +1,124 @@ +use crate::abortable::{AbortHandle, AbortInner, Aborted}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`. +/// +/// The returned future will copy all the bytes read from this `AsyncBufRead` into the +/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// use futures::future::Aborted; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer); +/// let bytes = fut.await; +/// abort_handle.abort(); +/// writer.close().await.unwrap(); +/// match bytes { +/// Ok(Ok(n)) => { +/// assert_eq!(n, 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// Ok(n) +/// }, +/// Ok(Err(a)) => { +/// Err::(a) +/// } +/// Err(e) => panic!("{}", e) +/// } +/// # }).unwrap(); +/// ``` +pub fn copy_buf_abortable( + reader: R, + writer: &mut W, +) -> (CopyBufAbortable<'_, R, W>, AbortHandle) +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + let (handle, reg) = AbortHandle::new_pair(); + (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle) +} + +pin_project! { + /// Future for the [`copy_buf()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBufAbortable<'a, R, W: ?Sized> { + #[pin] + reader: R, + writer: &'a mut W, + amt: u64, + inner: Arc + } +} + +macro_rules! ready_or_break { + ($e:expr $(,)?) => { + match $e { + $crate::task::Poll::Ready(t) => t, + $crate::task::Poll::Pending => break, + } + }; +} + +impl Future for CopyBufAbortable<'_, R, W> +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + Sized, +{ + type Output = Result, io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + // Check if the task has been aborted + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + + // Read some bytes from the reader, and if we have reached EOF, return total bytes read + let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?; + return Poll::Ready(Ok(Ok(*this.amt))); + } + + // Pass the buffer to the writer, and update the amount written + let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *this.amt += i as u64; + this.reader.as_mut().consume(i); + } + // Schedule the task to be woken up again. + // Never called unless Poll::Pending is returned from io objects. + this.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + Poll::Pending + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 4dd2e029bf..8ce3ad644b 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -66,6 +66,9 @@ pub use self::copy::{copy, Copy}; mod copy_buf; pub use self::copy_buf::{copy_buf, CopyBuf}; +mod copy_buf_abortable; +pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable}; + mod cursor; pub use self::cursor::Cursor; From 628881fccc276894daed07d3c3879824e785ec48 Mon Sep 17 00:00:00 2001 From: Owen Shepherd Date: Fri, 6 May 2022 00:06:18 +0100 Subject: [PATCH 13/33] Remove `Fuse`s from `select`, and only poll non-terminated streams (#2583) --- futures-util/Cargo.toml | 2 +- .../src/stream/select_with_strategy.rs | 140 +++++++++++++----- 2 files changed, 106 insertions(+), 36 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 46ec854b04..e32b642aa2 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -45,7 +45,7 @@ memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" -pin-project-lite = "0.2.4" +pin-project-lite = "0.2.6" [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index bd86990cdb..7423519df1 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -1,5 +1,4 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -18,13 +17,15 @@ impl PollNext { /// Toggle the value and return the old one. pub fn toggle(&mut self) -> Self { let old = *self; + *self = self.other(); + old + } + fn other(&self) -> PollNext { match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, } - - old } } @@ -34,14 +35,41 @@ impl Default for PollNext { } } +enum InternalState { + Start, + LeftFinished, + RightFinished, + BothFinished, +} + +impl InternalState { + fn finish(&mut self, ps: PollNext) { + match (&self, ps) { + (InternalState::Start, PollNext::Left) => { + *self = InternalState::LeftFinished; + } + (InternalState::Start, PollNext::Right) => { + *self = InternalState::RightFinished; + } + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { + *self = InternalState::BothFinished; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy { #[pin] - stream1: Fuse, + stream1: St1, #[pin] - stream2: Fuse, + stream2: St2, + internal_state: InternalState, state: State, clos: Clos, } @@ -120,9 +148,10 @@ where State: Default, { assert_stream::(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), + internal_state: InternalState::Start, clos: which, }) } @@ -131,7 +160,7 @@ impl SelectWithStrategy { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -140,7 +169,7 @@ impl SelectWithStrategy { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -150,7 +179,7 @@ impl SelectWithStrategy { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -158,7 +187,7 @@ impl SelectWithStrategy { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } @@ -169,47 +198,88 @@ where Clos: FnMut(&mut State) -> PollNext, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + match self.internal_state { + InternalState::BothFinished => true, + _ => false, + } } } -impl Stream for SelectWithStrategy +#[inline] +fn poll_side( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, + cx: &mut Context<'_>, +) -> Poll> where St1: Stream, St2: Stream, - Clos: FnMut(&mut State) -> PollNext, { - type Item = St1::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), - } + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), } } -fn poll_inner( - a: Pin<&mut St1>, - b: Pin<&mut St2>, +#[inline] +fn poll_inner( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, cx: &mut Context<'_>, ) -> Poll> where St1: Stream, St2: Stream, { - let a_done = match a.poll_next(cx) { + match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, - Poll::Pending => false, + Poll::Ready(None) => { + select.internal_state.finish(side); + } + Poll::Pending => (), }; + let other = side.other(); + match poll_side(select, other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(other); + Poll::Ready(None) + } + a => a, + } +} + +impl Stream for SelectWithStrategy +where + St1: Stream, + St2: Stream, + Clos: FnMut(&mut State) -> PollNext, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + match this.internal_state { + InternalState::Start => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } + InternalState::LeftFinished => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::RightFinished => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::BothFinished => Poll::Ready(None), + } } } From d11fd0074c90d91a687ff8662860d3d361a26b37 Mon Sep 17 00:00:00 2001 From: khollbach Date: Wed, 11 May 2022 15:56:40 -0400 Subject: [PATCH 14/33] Remove TryStreamExt::into_async_read Unpin bound (#2599) --- .../src/stream/try_stream/into_async_read.rs | 101 +++++++++--------- futures-util/src/stream/try_stream/mod.rs | 17 ++- 2 files changed, 57 insertions(+), 61 deletions(-) diff --git a/futures-util/src/stream/try_stream/into_async_read.rs b/futures-util/src/stream/try_stream/into_async_read.rs index 914b277a02..ffbfc7eae9 100644 --- a/futures-util/src/stream/try_stream/into_async_read.rs +++ b/futures-util/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState, -} - -impl Unpin for IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead + where + St: TryStream, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState> { impl IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl AsyncRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl AsyncWrite for IntoAsyncRead where - St: TryStream + AsyncWrite + Unpin, + St: TryStream + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_close(cx) } } impl AsyncBufRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 72a74f2166..bc4c6e4f6a 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -1031,12 +1031,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1048,12 +1043,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1061,7 +1056,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead where - Self: Sized + TryStreamExt + Unpin, + Self: Sized + TryStreamExt, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self)) From b7a4e59984a679d75e2b987c1aea78ac7e365f45 Mon Sep 17 00:00:00 2001 From: khollbach Date: Wed, 11 May 2022 17:32:18 -0400 Subject: [PATCH 15/33] Make run_until_stalled handle self-waking futures (#2593) LocalPool::try_run_one and run_until_stalled now correctly re-try when a future "yields" by calling wake and returning Pending. --- futures-executor/src/local_pool.rs | 94 +++++++++++++++------------- futures-executor/tests/local_pool.rs | 64 ++++++++++++++++++- 2 files changed, 113 insertions(+), 45 deletions(-) diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index bee96d8db9..9691060725 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -106,17 +106,9 @@ fn run_executor) -> Poll>(mut f: F) -> T { }) } -fn poll_executor) -> T>(mut f: F) -> T { - let _enter = enter().expect( - "cannot execute `LocalPool` executor from within \ - another executor", - ); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - f(&mut cx) - }) +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst)) } impl LocalPool { @@ -212,20 +204,26 @@ impl LocalPool { /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { - poll_executor(|ctx| { + run_executor(|cx| { loop { - let ret = self.poll_pool_once(ctx); - - // return if we have executed a future - if let Poll::Ready(Some(_)) = ret { - return true; + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), } - // if there are no new incoming futures - // then there is no feature that can make progress - // and we can return without having completed a single future - if self.incoming.borrow().is_empty() { - return false; + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); } } }) @@ -257,44 +255,52 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { - let _ = self.poll_pool(ctx); + run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } }); } - // Make maximal progress on the entire pool of spawned task, returning `Ready` - // if the pool is empty and `Pending` if no further progress can be made. + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // state for the FuturesUnordered, which will never be used loop { - let ret = self.poll_pool_once(cx); + self.drain_incoming(); - // we queued up some new tasks; add them and poll again + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. if !self.incoming.borrow().is_empty() { continue; } - // no queued tasks; we may be done - match ret { - Poll::Pending => return Poll::Pending, + match pool_ret { + Poll::Ready(Some(())) => continue, Poll::Ready(None) => return Poll::Ready(()), - _ => {} + Poll::Pending => return Poll::Pending, } } } - // Try make minimal progress on the pool of spawned tasks - fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll> { - // empty the incoming queue of newly-spawned tasks - { - let mut incoming = self.incoming.borrow_mut(); - for task in incoming.drain(..) { - self.pool.push(task) - } + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) } - - // try to execute the next ready future - self.pool.poll_next_unpin(cx) } } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 8e5e27981d..6e908d2444 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,7 +1,7 @@ use futures::channel::oneshot; use futures::executor::LocalPool; use futures::future::{self, lazy, poll_fn, Future}; -use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker}; +use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; @@ -435,3 +435,65 @@ fn park_unpark_independence() { futures::executor::block_on(future) } + +struct SelfWaking { + wakeups_remaining: Rc>, +} + +impl Future for SelfWaking { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if *self.wakeups_remaining.borrow() != 0 { + *self.wakeups_remaining.borrow_mut() -= 1; + cx.waker().wake_by_ref(); + } + + Poll::Pending + } +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `run_until_stalled` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_run_until_stalled() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + // This should keep polling until there are no more wakeups. + pool.run_until_stalled(); + + assert_eq!(*wakeups_remaining.borrow(), 0); +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `try_run_one` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_try_run_one() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + spawner.spawn(future::ready(())).unwrap(); + + // The `ready` future should complete. + assert!(pool.try_run_one()); + + // The self-waking futures are each polled once. + assert_eq!(*wakeups_remaining.borrow(), 7); +} From dac38c0a8ad25e50870c9a62ccd1c5eb08e53a53 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 28 May 2022 18:54:24 +0900 Subject: [PATCH 16/33] Fix rustdoc::broken_intra_doc_links warning (#2602) ``` error: unresolved link to `select` --> futures-util/src/future/try_future/mod.rs:272:17 | 272 | /// using [`select!`] or [`join!`]. | ^^^^^^^ no item named `select` in scope | = note: `-D rustdoc::broken-intra-doc-links` implied by `-D warnings` = note: `macro_rules` named `select` exists in this crate, but it is not in scope at this link's location error: unresolved link to `join` --> futures-util/src/future/try_future/mod.rs:272:32 | 272 | /// using [`select!`] or [`join!`]. | ^^^^^ no item named `join` in scope | = note: `macro_rules` named `join` exists in this crate, but it is not in scope at this link's location error: unresolved link to `select` --> futures-util/src/future/try_future/mod.rs:320:27 | 320 | /// type when using [`select!`] or [`join!`]. | ^^^^^^^ no item named `select` in scope | = note: `macro_rules` named `select` exists in this crate, but it is not in scope at this link's location error: unresolved link to `join` --> futures-util/src/future/try_future/mod.rs:320:42 | 320 | /// type when using [`select!`] or [`join!`]. | ^^^^^ no item named `join` in scope | = note: `macro_rules` named `join` exists in this crate, but it is not in scope at this link's location error: unresolved link to `select` --> futures-util/src/stream/stream/mod.rs:1802:15 | 1802 | /// the [`select!`] macro. | ^^^^^^^ no item named `select` in scope | = note: `macro_rules` named `select` exists in this crate, but it is not in scope at this link's location ``` --- futures-util/src/future/try_future/mod.rs | 6 ++++++ futures-util/src/stream/stream/mod.rs | 2 ++ 2 files changed, 8 insertions(+) diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index fb3bdd8a02..e5bc700714 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture { /// assert_eq!(future.await, Ok(1)); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn map_err(self, f: F) -> MapErr where F: FnOnce(Self::Error) -> E, @@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture { /// let future_err_i32 = future_err_u8.err_into::(); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn err_into(self) -> ErrInto where Self: Sized, diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 642b91e823..a823fab123 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -1674,6 +1674,8 @@ pub trait StreamExt: Stream { /// assert_eq!(total, 6); /// # }); /// ``` + /// + /// [`select!`]: crate::select fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream, From fc55afb65e640897557d9df92d4d4729fbd68e52 Mon Sep 17 00:00:00 2001 From: Basti Ortiz <39114273+Some-Dood@users.noreply.github.com> Date: Sat, 28 May 2022 17:56:17 +0800 Subject: [PATCH 17/33] Refactor: prefer early return in `future::select` (#2587) Although a relatively small change, it makes the code a little bit more readable than the originally nested `match` expressions. --- futures-util/src/future/select.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index bd44f20f77..e693a30b00 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -100,16 +100,17 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), - Poll::Pending => match b.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending - } - }, + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, b))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, a))); } + + self.inner = Some((a, b)); + Poll::Pending } } From de5eee151b994ba68ef5055f606a69b4f0cd02dc Mon Sep 17 00:00:00 2001 From: Daniel Mangum <31777345+hasheddan@users.noreply.github.com> Date: Sun, 29 May 2022 10:39:26 -0500 Subject: [PATCH 18/33] Fix minor typo in enter doc comment (#2604) Fixes a minor typo by changing "a tasks" to "a task" in the documentation comment for enter. Signed-off-by: hasheddan --- futures-executor/src/enter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-executor/src/enter.rs b/futures-executor/src/enter.rs index 5895a9efb6..cb58c30bb7 100644 --- a/futures-executor/src/enter.rs +++ b/futures-executor/src/enter.rs @@ -34,7 +34,7 @@ impl std::error::Error for EnterError {} /// executor. /// /// Executor implementations should call this function before beginning to -/// execute a tasks, and drop the returned [`Enter`](Enter) value after +/// execute a task, and drop the returned [`Enter`](Enter) value after /// completing task execution: /// /// ``` From f81fdb451bf1b2ce9a5a0758f08a8e346e0daf44 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Sat, 4 Jun 2022 22:40:01 -0400 Subject: [PATCH 19/33] Switch to `FuturesOrdered` dynamically in `try_join_all` (#2556) --- futures-util/src/future/join_all.rs | 29 +++--- futures-util/src/future/try_join_all.rs | 123 +++++++++++++++++------- futures/tests/auto_traits.rs | 4 +- 3 files changed, 102 insertions(+), 54 deletions(-) diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 2e52ac17f4..7dc159ba07 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { +pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind +enum JoinAllKind where F: Future, { @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; + assert_future::::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::>().collect() }, }; + assert_future::::Output>, _>(JoinAll { kind }) } } diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 29244af837..25fcfcb6c2 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,14 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState { Pending, @@ -31,7 +28,20 @@ pub struct TryJoinAll where F: TryFuture, { - elems: Pin]>>, + kind: TryJoinAllKind, +} + +enum TryJoinAllKind +where + F: TryFuture, +{ + Small { + elems: Pin>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect>, Vec>, + }, } impl fmt::Debug for TryJoinAll @@ -39,9 +49,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -83,15 +100,37 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all(i: I) -> TryJoinAll +pub fn try_join_all(iter: I) -> TryJoinAll where I: IntoIterator, I::Item: TryFuture, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::::Ok>, ::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }; + + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, + }; + + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } } impl Future for TryJoinAll @@ -101,36 +140,46 @@ where type Output = Result, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut state = FinalState::AllDone; - - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl FromIterator for TryJoinAll { +impl FromIterator for TryJoinAll +where + F: TryFuture, +{ fn from_iter>(iter: T) -> Self { try_join_all(iter) } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index b3d8b00773..da00ccf40c 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -576,10 +576,10 @@ pub mod future { // TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin - assert_impl!(TryJoinAll>: Send); + assert_impl!(TryJoinAll>: Send); assert_not_impl!(TryJoinAll: Send); assert_not_impl!(TryJoinAll: Send); - assert_impl!(TryJoinAll>: Sync); + assert_impl!(TryJoinAll>: Sync); assert_not_impl!(TryJoinAll: Sync); assert_not_impl!(TryJoinAll: Sync); assert_impl!(TryJoinAll: Unpin); From 0db9749a40c4b09587ff19cd928c7e60e37a801c Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Sat, 4 Jun 2022 22:51:58 -0400 Subject: [PATCH 20/33] Fix orderings in `LocalPool` waker (#2608) --- futures-executor/src/local_pool.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 9691060725..ec1751c7c4 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -63,7 +63,7 @@ thread_local! { impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc) { // Make sure the wakeup is remembered until the next `park()`. - let unparked = arc_self.unparked.swap(true, Ordering::Relaxed); + let unparked = arc_self.unparked.swap(true, Ordering::Release); if !unparked { // If the thread has not been unparked yet, it must be done // now. If it was actually parked, it will run again, @@ -90,17 +90,13 @@ fn run_executor) -> Poll>(mut f: F) -> T { if let Poll::Ready(t) = f(&mut cx) { return t; } - // Consume the wakeup that occurred while executing `f`, if any. - let unparked = thread_notify.unparked.swap(false, Ordering::Acquire); - if !unparked { + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { // No wakeup occurred. It may occur now, right before parking, // but in that case the token made available by `unpark()` // is guaranteed to still be available and `park()` is a no-op. thread::park(); - // When the thread is unparked, `unparked` will have been set - // and needs to be unset before the next call to `f` to avoid - // a redundant loop iteration. - thread_notify.unparked.store(false, Ordering::Release); } } }) From 41b9aac3667bd7b39f0316eb7bab6aa336bef53a Mon Sep 17 00:00:00 2001 From: jefftt Date: Mon, 6 Jun 2022 10:26:07 -0400 Subject: [PATCH 21/33] stream: Fix Chunk adapters size hints (#2611) --- futures-util/src/stream/stream/chunks.rs | 2 +- futures-util/src/stream/stream/ready_chunks.rs | 2 +- futures-util/src/stream/try_stream/try_chunks.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/stream/chunks.rs b/futures-util/src/stream/stream/chunks.rs index 8457869999..e6d9118a0e 100644 --- a/futures-util/src/stream/stream/chunks.rs +++ b/futures-util/src/stream/stream/chunks.rs @@ -79,7 +79,7 @@ impl Stream for Chunks { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/futures-util/src/stream/stream/ready_chunks.rs b/futures-util/src/stream/stream/ready_chunks.rs index 5ebc9582db..44edc53521 100644 --- a/futures-util/src/stream/stream/ready_chunks.rs +++ b/futures-util/src/stream/stream/ready_chunks.rs @@ -87,7 +87,7 @@ impl Stream for ReadyChunks { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs index 07d4425a81..5b5ff1d6a6 100644 --- a/futures-util/src/stream/try_stream/try_chunks.rs +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -83,7 +83,7 @@ impl Stream for TryChunks { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, From 188ae1cdfdf5710d29a5e2efd5becf72bdeb8ef4 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Wed, 6 Jul 2022 03:28:07 +0900 Subject: [PATCH 22/33] Update minimal version of pin-project to 1.0.11 (#2619) --- futures-test/Cargo.toml | 2 +- futures/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index b5aa8a7dd1..90e50ff919 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -19,7 +19,7 @@ futures-executor = { version = "0.3.21", path = "../futures-executor", default-f futures-sink = { version = "0.3.21", path = "../futures-sink", default-features = false } futures-macro = { version = "=0.3.21", path = "../futures-macro", default-features = false } pin-utils = { version = "0.1.0", default-features = false } -pin-project = "1.0.1" +pin-project = "1.0.11" [dev-dependencies] futures = { path = "../futures", default-features = false, features = ["std", "executor"] } diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 6871f47eaa..b6ff22d387 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -27,7 +27,7 @@ futures-util = { path = "../futures-util", version = "0.3.21", default-features futures-executor = { path = "../futures-executor", features = ["thread-pool"] } futures-test = { path = "../futures-test" } assert_matches = "1.3.0" -pin-project = "1.0.1" +pin-project = "1.0.11" pin-utils = "0.1.0" static_assertions = "1" tokio = "0.1.11" From f338bd03f63f690a5cb02adfa13761a96ae24488 Mon Sep 17 00:00:00 2001 From: Conor Date: Thu, 7 Jul 2022 11:21:35 +1000 Subject: [PATCH 23/33] Add push_front and push_back to FuturesOrdered (#2591) --- futures-util/src/stream/futures_ordered.rs | 32 +++++++++- futures-util/src/stream/stream/buffered.rs | 2 +- .../src/stream/try_stream/try_buffered.rs | 2 +- futures/tests/stream_futures_ordered.rs | 64 +++++++++++++++++++ 4 files changed, 96 insertions(+), 4 deletions(-) diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index f596b3b0e3..f1c93fd683 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -135,11 +135,39 @@ impl FuturesOrdered { /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. + #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { + self.push_back(future); + } + + /// Pushes a future to the back of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } + + /// Pushes a future to the front of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. This future will be the next future to be returned + /// complete. + pub fn push_front(&mut self, future: Fut) { + if self.next_outgoing_index == 0 { + self.push_back(future) + } else { + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); + } + } } impl Default for FuturesOrdered { @@ -196,7 +224,7 @@ impl FromIterator for FuturesOrdered { { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { - acc.push(item); + acc.push_back(item); acc }) } @@ -214,7 +242,7 @@ impl Extend for FuturesOrdered { I: IntoIterator, { for item in iter { - self.push(item); + self.push_back(item); } } } diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 6052a737ba..8ca0391c55 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -64,7 +64,7 @@ where // our queue of futures. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 45bd3f8c7a..9f48e5c0a7 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -54,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures/tests/stream_futures_ordered.rs b/futures/tests/stream_futures_ordered.rs index 7506c65a63..8b85a3365a 100644 --- a/futures/tests/stream_futures_ordered.rs +++ b/futures/tests/stream_futures_ordered.rs @@ -2,6 +2,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, join, Future, FutureExt, TryFutureExt}; use futures::stream::{FuturesOrdered, StreamExt}; +use futures::task::Poll; use futures_test::task::noop_context; use std::any::Any; @@ -45,6 +46,69 @@ fn works_2() { assert!(stream.poll_next_unpin(&mut cx).is_ready()); } +#[test] +fn test_push_front() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // 1 and 2 should be received in order + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_front(d_rx); + d_tx.send(4).unwrap(); + + // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next + // and then 3 after it + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn test_push_back() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // All results should be received in order + + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_back(d_rx); + d_tx.send(4).unwrap(); + + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); +} + #[test] fn from_iterator() { let stream = vec![future::ready::(1), future::ready::(2), future::ready::(3)] From 76590e646774f69eaf5de06780ad3e4be2a9b2e5 Mon Sep 17 00:00:00 2001 From: Daniel Henry-Mantilla Date: Thu, 7 Jul 2022 03:52:12 +0200 Subject: [PATCH 24/33] `doc(alias)` `pending()` to `never` (#2613) It can be legitimately to look for the future that _never_ resolves using the name `never`. This PR ensures `pending()` shows up when making such queries. --- futures-util/src/future/pending.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/futures-util/src/future/pending.rs b/futures-util/src/future/pending.rs index 92c78d52b8..b8e28686e1 100644 --- a/futures-util/src/future/pending.rs +++ b/futures-util/src/future/pending.rs @@ -33,6 +33,7 @@ impl FusedFuture for Pending { /// unreachable!(); /// # }); /// ``` +#[cfg_attr(docsrs, doc(alias = "never"))] pub fn pending() -> Pending { assert_future::(Pending { _data: marker::PhantomData }) } From bedd2d8ca964d6607e39f72b8a952c993d12748a Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 9 Jul 2022 15:32:35 +0900 Subject: [PATCH 25/33] Fix miri test failure (#2621) --- futures-test/src/future/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs index ee5c6ddd5d..cf53e871de 100644 --- a/futures-test/src/future/mod.rs +++ b/futures-test/src/future/mod.rs @@ -68,6 +68,7 @@ pub trait FutureTestExt: Future { /// /// assert_eq!(rx.await, Ok(5)); /// # }); + /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed /// ``` fn run_in_background(self) where From ac207614e071b8b55a2787f864d84f72b1f028de Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 22 Jul 2022 22:10:05 +0900 Subject: [PATCH 26/33] Run more tests with Miri (#2624) --- futures-channel/tests/mpsc.rs | 31 +++++++--------------------- futures-channel/tests/oneshot.rs | 10 ++------- futures-executor/src/thread_pool.rs | 4 ++-- futures-executor/tests/local_pool.rs | 5 +---- futures-test/src/future/mod.rs | 2 +- futures-util/src/task/spawn.rs | 4 ++-- futures/src/lib.rs | 2 +- futures/tests/eventual.rs | 5 +---- futures/tests/lock_mutex.rs | 2 +- futures/tests/ready_queue.rs | 5 +---- futures/tests/task_atomic_waker.rs | 1 - 11 files changed, 19 insertions(+), 52 deletions(-) diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index da0899d491..444c8e10fd 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -200,10 +200,7 @@ fn tx_close_gets_none() { #[test] fn stress_shared_unbounded() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] - const AMT: u32 = 10000; + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::(); @@ -232,10 +229,7 @@ fn stress_shared_unbounded() { #[test] fn stress_shared_bounded_hard() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] - const AMT: u32 = 10000; + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::(0); @@ -265,10 +259,7 @@ fn stress_shared_bounded_hard() { #[allow(clippy::same_item_push)] #[test] fn stress_receiver_multi_task_bounded_hard() { - #[cfg(miri)] - const AMT: usize = 100; - #[cfg(not(miri))] - const AMT: usize = 10_000; + const AMT: usize = if cfg!(miri) { 100 } else { 10_000 }; const NTHREADS: u32 = 2; let (mut tx, rx) = mpsc::channel::(0); @@ -336,10 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() { /// after sender dropped. #[test] fn stress_drop_sender() { - #[cfg(miri)] - const ITER: usize = 100; - #[cfg(not(miri))] - const ITER: usize = 10000; + const ITER: usize = if cfg!(miri) { 100 } else { 10000 }; fn list() -> impl Stream { let (tx, rx) = mpsc::channel(1); @@ -394,10 +382,9 @@ fn stress_close_receiver_iter() { } } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_close_receiver() { - const ITER: usize = 10000; + const ITER: usize = if cfg!(miri) { 50 } else { 10000 }; for _ in 0..ITER { stress_close_receiver_iter(); @@ -414,10 +401,7 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender, count: u32) { #[allow(clippy::same_item_push)] #[test] fn stress_poll_ready() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] - const AMT: u32 = 1000; + const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; const NTHREADS: u32 = 8; /// Run a stress test using the specified channel capacity. @@ -444,10 +428,9 @@ fn stress_poll_ready() { stress(16); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn try_send_1() { - const N: usize = 3000; + const N: usize = if cfg!(miri) { 100 } else { 3000 }; let (mut tx, rx) = mpsc::channel(0); let t = thread::spawn(move || { diff --git a/futures-channel/tests/oneshot.rs b/futures-channel/tests/oneshot.rs index c9f5508973..6b48376dc0 100644 --- a/futures-channel/tests/oneshot.rs +++ b/futures-channel/tests/oneshot.rs @@ -35,10 +35,7 @@ fn cancel_notifies() { #[test] fn cancel_lots() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 20000; + const N: usize = if cfg!(miri) { 100 } else { 20000 }; let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); let t = thread::spawn(move || { @@ -106,10 +103,7 @@ fn is_canceled() { #[test] fn cancel_sends() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 20000; + const N: usize = if cfg!(miri) { 100 } else { 20000 }; let (tx, rx) = mpsc::channel::>(); let t = thread::spawn(move || { diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 8c93b476bc..5371008953 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -116,7 +116,7 @@ impl ThreadPool { /// let future = async { /* ... */ }; /// pool.spawn_ok(future); /// # } - /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` /// /// > **Note**: This method is similar to `SpawnExt::spawn`, except that @@ -375,6 +375,6 @@ mod tests { let count = rx.into_iter().count(); assert_eq!(count, 2); } - std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 6e908d2444..72ce74b744 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -288,10 +288,7 @@ fn run_until_stalled_runs_spawned_sub_futures() { #[test] fn run_until_stalled_executes_all_ready() { - #[cfg(miri)] - const ITER: usize = 50; - #[cfg(not(miri))] - const ITER: usize = 200; + const ITER: usize = if cfg!(miri) { 50 } else { 200 }; const PER_ITER: usize = 3; let cnt = Rc::new(Cell::new(0)); diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs index cf53e871de..0f52f62bb9 100644 --- a/futures-test/src/future/mod.rs +++ b/futures-test/src/future/mod.rs @@ -68,7 +68,7 @@ pub trait FutureTestExt: Future { /// /// assert_eq!(rx.await, Ok(5)); /// # }); - /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` fn run_in_background(self) where diff --git a/futures-util/src/task/spawn.rs b/futures-util/src/task/spawn.rs index 8e78717c27..d9e9985309 100644 --- a/futures-util/src/task/spawn.rs +++ b/futures-util/src/task/spawn.rs @@ -43,7 +43,7 @@ pub trait SpawnExt: Spawn { /// let future = async { /* ... */ }; /// executor.spawn(future).unwrap(); /// # } - /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "alloc")] fn spawn(&self, future: Fut) -> Result<(), SpawnError> @@ -72,7 +72,7 @@ pub trait SpawnExt: Spawn { /// let join_handle_fut = executor.spawn_with_handle(future).unwrap(); /// assert_eq!(block_on(join_handle_fut), 1); /// # } - /// # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "channel")] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 3ae9091dca..b972f51754 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -74,7 +74,7 @@ //! //! println!("Values={:?}", values); //! # } -//! # std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed +//! # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 //! } //! ``` //! diff --git a/futures/tests/eventual.rs b/futures/tests/eventual.rs index 96e21a12a4..951c55c214 100644 --- a/futures/tests/eventual.rs +++ b/futures/tests/eventual.rs @@ -134,10 +134,7 @@ fn select3() { #[test] fn select4() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 10000; + const N: usize = if cfg!(miri) { 100 } else { 10000 }; let (tx, rx) = mpsc::channel::>(); diff --git a/futures/tests/lock_mutex.rs b/futures/tests/lock_mutex.rs index 0bd2607565..c15e76bd84 100644 --- a/futures/tests/lock_mutex.rs +++ b/futures/tests/lock_mutex.rs @@ -65,5 +65,5 @@ fn mutex_contested() { assert_eq!(num_tasks, *lock); }); } - std::thread::sleep(std::time::Duration::from_secs(1)); // wait for background threads closed + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } diff --git a/futures/tests/ready_queue.rs b/futures/tests/ready_queue.rs index afba8f28b3..c19d62593c 100644 --- a/futures/tests/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -93,10 +93,7 @@ fn dropping_ready_queue() { #[test] fn stress() { - #[cfg(miri)] - const ITER: usize = 30; - #[cfg(not(miri))] - const ITER: usize = 300; + const ITER: usize = if cfg!(miri) { 30 } else { 300 }; for i in 0..ITER { let n = (i % 10) + 1; diff --git a/futures/tests/task_atomic_waker.rs b/futures/tests/task_atomic_waker.rs index 2d1612a45d..cec3db2876 100644 --- a/futures/tests/task_atomic_waker.rs +++ b/futures/tests/task_atomic_waker.rs @@ -6,7 +6,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread; -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn basic() { let atomic_waker = Arc::new(AtomicWaker::new()); From 7f4d0fb5859b51f4d3affbfc429dbda1fe44c1bf Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 22 Jul 2022 22:13:46 +0900 Subject: [PATCH 27/33] Remove .clippy.toml Clippy now respects `rust-version` field in Cargo.toml. https://github.com/rust-lang/rust-clippy/commit/b776fb82941cadfc752368901f210831d5184d95 --- .clippy.toml | 1 - .github/workflows/ci.yml | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) delete mode 100644 .clippy.toml diff --git a/.clippy.toml b/.clippy.toml deleted file mode 100644 index 992016c29a..0000000000 --- a/.clippy.toml +++ /dev/null @@ -1 +0,0 @@ -msrv = "1.36" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c83181a6f..4c909d887d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,8 +69,8 @@ jobs: matrix: rust: # This is the minimum Rust version supported by futures-core, futures-io, futures-sink. - # When updating this, the reminder to update the minimum required version in README.md, Cargo.toml, and .clippy.toml. - - 1.36 + # When updating this, the reminder to update the minimum required version in README.md and Cargo.toml. + - '1.36' runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -102,7 +102,7 @@ jobs: rust: # This is the minimum Rust version supported by futures, futures-util, futures-task, futures-macro, futures-executor, futures-channel, futures-test. # When updating this, the reminder to update the minimum required version in README.md and Cargo.toml. - - 1.45 + - '1.45' runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 From 9250a345c41dcff582025050f28619d93d2cd635 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 22 Jul 2022 22:14:05 +0900 Subject: [PATCH 28/33] Remove unnecessarily allowed lints --- futures-util/src/stream/stream/split.rs | 2 +- futures/tests_disabled/stream.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 3a72fee30b..e2034e0c27 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -35,7 +35,7 @@ impl Stream for SplitStream { } } -#[allow(bad_style)] +#[allow(non_snake_case)] fn SplitSink, Item>(lock: BiLock) -> SplitSink { SplitSink { lock, slot: None } } diff --git a/futures/tests_disabled/stream.rs b/futures/tests_disabled/stream.rs index 854dbad829..a4eec2c7aa 100644 --- a/futures/tests_disabled/stream.rs +++ b/futures/tests_disabled/stream.rs @@ -318,7 +318,6 @@ fn forward() { } #[test] -#[allow(deprecated)] fn concat() { let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); From f46acef03062f06711b5418bcd5bc26112a6cc9d Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 22 Jul 2022 22:14:59 +0900 Subject: [PATCH 29/33] Fix clippy::mem_replace_with_default warning ``` warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/future/select_all.rs:62:28 | 62 | let rest = mem::replace(&mut self.inner, Vec::new()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(&mut self.inner)` | = note: `#[warn(clippy::mem_replace_with_default)]` on by default = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/future/select_ok.rs:62:40 | 62 | ... let rest = mem::replace(&mut self.inner, Vec::new()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(&mut self.inner)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/stream/collect.rs:22:9 | 22 | mem::replace(self.project().collection, Default::default()) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(self.project().collection)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/stream/unzip.rs:24:10 | 24 | (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.left)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/stream/unzip.rs:24:55 | 24 | (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.right)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/stream/chunks.rs:69:40 | 69 | let full_buf = mem::replace(this.items, Vec::new()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.items)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/stream/ready_chunks.rs:77:40 | 77 | let full_buf = mem::replace(this.items, Vec::new()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.items)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/try_stream/try_collect.rs:48:31 | 48 | None => break mem::replace(this.items, Default::default()), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.items)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/stream/try_stream/try_chunks.rs:74:40 | 74 | let full_buf = mem::replace(this.items, Vec::new()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.items)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/io/lines.rs:45:29 | 45 | Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(this.buf)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/io/read_exact.rs:33:33 | 33 | let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(&mut this.buf)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/io/read_line.rs:25:31 | 25 | Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 } | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(buf)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/io/read_to_string.rs:25:31 | 25 | Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len } | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(buf)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures-util/src/io/write_all.rs:33:33 | 33 | let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(&mut this.buf)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default warning: replacing a value of type `T` with `T::default()` is better expressed using `std::mem::take` --> futures/tests/sink.rs:141:9 | 141 | mem::replace(&mut self.data, Vec::new()) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `std::mem::take(&mut self.data)` | = note: `#[warn(clippy::mem_replace_with_default)]` on by default = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#mem_replace_with_default ``` --- futures-util/src/future/select_all.rs | 2 +- futures-util/src/future/select_ok.rs | 2 +- futures-util/src/io/lines.rs | 2 +- futures-util/src/io/read_exact.rs | 2 +- futures-util/src/io/read_line.rs | 2 +- futures-util/src/io/read_to_string.rs | 2 +- futures-util/src/io/write_all.rs | 2 +- futures-util/src/stream/stream/chunks.rs | 2 +- futures-util/src/stream/stream/collect.rs | 2 +- futures-util/src/stream/stream/ready_chunks.rs | 2 +- futures-util/src/stream/stream/unzip.rs | 2 +- futures-util/src/stream/try_stream/try_chunks.rs | 2 +- futures-util/src/stream/try_stream/try_collect.rs | 2 +- futures/tests/sink.rs | 2 +- 14 files changed, 14 insertions(+), 14 deletions(-) diff --git a/futures-util/src/future/select_all.rs b/futures-util/src/future/select_all.rs index 106e50844c..07d65cae79 100644 --- a/futures-util/src/future/select_all.rs +++ b/futures-util/src/future/select_all.rs @@ -59,7 +59,7 @@ impl Future for SelectAll { match item { Some((idx, res)) => { let _ = self.inner.swap_remove(idx); - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); Poll::Ready((res, idx, rest)) } None => Poll::Pending, diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 0ad83c6db6..5d5579930b 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -59,7 +59,7 @@ impl Future for SelectOk { drop(self.inner.remove(idx)); match res { Ok(e) => { - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs index 13e70df238..b5561bfa7d 100644 --- a/futures-util/src/io/lines.rs +++ b/futures-util/src/io/lines.rs @@ -42,6 +42,6 @@ impl Stream for Lines { this.buf.pop(); } } - Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) + Poll::Ready(Some(Ok(mem::take(this.buf)))) } } diff --git a/futures-util/src/io/read_exact.rs b/futures-util/src/io/read_exact.rs index 02e38c35be..cd0b20e597 100644 --- a/futures-util/src/io/read_exact.rs +++ b/futures-util/src/io/read_exact.rs @@ -30,7 +30,7 @@ impl Future for ReadExact<'_, R> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n); + let (_, rest) = mem::take(&mut this.buf).split_at_mut(n); this.buf = rest; } if n == 0 { diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs index c75af9471f..e1b8fc9455 100644 --- a/futures-util/src/io/read_line.rs +++ b/futures-util/src/io/read_line.rs @@ -22,7 +22,7 @@ impl Unpin for ReadLine<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 } } } diff --git a/futures-util/src/io/read_to_string.rs b/futures-util/src/io/read_to_string.rs index 457af59e4f..c175396d81 100644 --- a/futures-util/src/io/read_to_string.rs +++ b/futures-util/src/io/read_to_string.rs @@ -22,7 +22,7 @@ impl Unpin for ReadToString<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { let start_len = buf.len(); - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len } } } diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs index b134bf1b22..08c025f94d 100644 --- a/futures-util/src/io/write_all.rs +++ b/futures-util/src/io/write_all.rs @@ -30,7 +30,7 @@ impl Future for WriteAll<'_, W> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n); + let (_, rest) = mem::take(&mut this.buf).split_at(n); this.buf = rest; } if n == 0 { diff --git a/futures-util/src/stream/stream/chunks.rs b/futures-util/src/stream/stream/chunks.rs index e6d9118a0e..8e8ed3bd86 100644 --- a/futures-util/src/stream/stream/chunks.rs +++ b/futures-util/src/stream/stream/chunks.rs @@ -66,7 +66,7 @@ impl Stream for Chunks { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; diff --git a/futures-util/src/stream/stream/collect.rs b/futures-util/src/stream/stream/collect.rs index b0e81b9ce0..970ac26dbf 100644 --- a/futures-util/src/stream/stream/collect.rs +++ b/futures-util/src/stream/stream/collect.rs @@ -19,7 +19,7 @@ pin_project! { impl Collect { fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.project().collection, Default::default()) + mem::take(self.project().collection) } pub(super) fn new(stream: St) -> Self { diff --git a/futures-util/src/stream/stream/ready_chunks.rs b/futures-util/src/stream/stream/ready_chunks.rs index 44edc53521..d3618d81e8 100644 --- a/futures-util/src/stream/stream/ready_chunks.rs +++ b/futures-util/src/stream/stream/ready_chunks.rs @@ -74,7 +74,7 @@ impl Stream for ReadyChunks { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; diff --git a/futures-util/src/stream/stream/unzip.rs b/futures-util/src/stream/stream/unzip.rs index 15f22e80b0..a88cf03266 100644 --- a/futures-util/src/stream/stream/unzip.rs +++ b/futures-util/src/stream/stream/unzip.rs @@ -21,7 +21,7 @@ pin_project! { impl Unzip { fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { let this = self.project(); - (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) + (mem::take(this.left), mem::take(this.right)) } pub(super) fn new(stream: St) -> Self { diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs index 5b5ff1d6a6..7626e7124d 100644 --- a/futures-util/src/stream/try_stream/try_chunks.rs +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -70,7 +70,7 @@ impl Stream for TryChunks { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; diff --git a/futures-util/src/stream/try_stream/try_collect.rs b/futures-util/src/stream/try_stream/try_collect.rs index 5d3b3d7668..3e5963f033 100644 --- a/futures-util/src/stream/try_stream/try_collect.rs +++ b/futures-util/src/stream/try_stream/try_collect.rs @@ -45,7 +45,7 @@ where Poll::Ready(Ok(loop { match ready!(this.stream.as_mut().try_poll_next(cx)?) { Some(x) => this.items.extend(Some(x)), - None => break mem::replace(this.items, Default::default()), + None => break mem::take(this.items), } })) } diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index f3cf11b931..5b691e74c6 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -138,7 +138,7 @@ impl ManualFlush { for task in self.waiting_tasks.drain(..) { task.wake() } - mem::replace(&mut self.data, Vec::new()) + mem::take(&mut self.data) } } From 0ba0f2a34f3ff7584b6836f86fb04275c7bca9bf Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 22 Jul 2022 22:43:49 +0900 Subject: [PATCH 30/33] Fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f76048681..e2c410917b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -192,7 +192,7 @@ NOTE: This release has been yanked. See #2310 for details. NOTE: This release has been yanked. See #2310 for details. -* Fix signature of `LocalSpawn` trait (breaking change -- see #1959) +* Fix signature of `SpawnExt` and `LocalSpawnExt` trait (breaking change -- see #1959) # 0.3.0 - 2019-11-05 From 5dbb2eba5298e49470689dd4cc9d9dd5ab383b82 Mon Sep 17 00:00:00 2001 From: Konrad Borowski Date: Sat, 23 Jul 2022 07:18:26 +0200 Subject: [PATCH 31/33] Inline WakerRef functions (#2626) Those functions are trivial and non-generic. It's necessary to use `#[inline]` to enable cross-crate inlining for non-generic functions when LTO is disabled. --- futures-task/src/waker_ref.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/futures-task/src/waker_ref.rs b/futures-task/src/waker_ref.rs index 7fb552fcfd..aac4109577 100644 --- a/futures-task/src/waker_ref.rs +++ b/futures-task/src/waker_ref.rs @@ -18,6 +18,7 @@ pub struct WakerRef<'a> { impl<'a> WakerRef<'a> { /// Create a new [`WakerRef`] from a [`Waker`] reference. + #[inline] pub fn new(waker: &'a Waker) -> Self { // copy the underlying (raw) waker without calling a clone, // as we won't call Waker::drop either. @@ -31,6 +32,7 @@ impl<'a> WakerRef<'a> { /// an unsafe way (that will be valid only for a lifetime to be determined /// by the caller), and the [`Waker`] doesn't need to or must not be /// destroyed. + #[inline] pub fn new_unowned(waker: ManuallyDrop) -> Self { Self { waker, _marker: PhantomData } } @@ -39,6 +41,7 @@ impl<'a> WakerRef<'a> { impl Deref for WakerRef<'_> { type Target = Waker; + #[inline] fn deref(&self) -> &Waker { &self.waker } From 72eeae7142fa6d7555194ec15f07a534ffbb401a Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 30 Jul 2022 13:36:20 +0900 Subject: [PATCH 32/33] Fix CI failure on 1.45 ``` error: failed to get `assert_matches` as a dependency of package `futures v0.4.0-alpha.0 (/home/runner/work/futures-rs/futures-rs/futures)` Caused by: failed to load source for dependency `assert_matches` Caused by: Unable to update registry `https://github.com/rust-lang/crates.io-index` Caused by: failed to fetch `https://github.com/rust-lang/crates.io-index` Caused by: error reading from the zlib stream; class=Zlib (5) ``` --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c909d887d..86cac19362 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,6 +97,8 @@ jobs: util-msrv: name: cargo +${{ matrix.rust }} build + env: + CARGO_NET_GIT_FETCH_WITH_CLI: true strategy: matrix: rust: From 80e63b9c27dee7da42f7ea7a896d3ad80316314e Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 14 Aug 2022 20:09:52 +0900 Subject: [PATCH 33/33] Update actions/checkout action to v3 --- .github/workflows/ci.yml | 30 +++++++++++++++--------------- .github/workflows/release.yml | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86cac19362..691bf560b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - windows-latest runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. run: rustup update nightly --no-self-update && rustup default nightly @@ -53,7 +53,7 @@ jobs: - aarch64-unknown-linux-gnu runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cross @@ -73,7 +73,7 @@ jobs: - '1.36' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} # cargo does not support for --features/--no-default-features with workspace, so use cargo-hack instead. @@ -107,7 +107,7 @@ jobs: - '1.45' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -138,7 +138,7 @@ jobs: - nightly runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -150,7 +150,7 @@ jobs: name: cargo build -Z minimal-versions runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -172,7 +172,7 @@ jobs: - thumbv6m-none-eabi runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup target add ${{ matrix.target }} @@ -204,7 +204,7 @@ jobs: name: cargo bench runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: cargo bench --workspace @@ -214,7 +214,7 @@ jobs: name: cargo hack check --feature-powerset runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -239,7 +239,7 @@ jobs: contents: write pull-requests: write steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: ci/no_atomic_cas.sh @@ -272,7 +272,7 @@ jobs: name: cargo miri test runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - run: cargo miri test --workspace --all-features @@ -291,7 +291,7 @@ jobs: - thread runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup component add rust-src @@ -308,7 +308,7 @@ jobs: # name: cargo clippy # runs-on: ubuntu-latest # steps: - # - uses: actions/checkout@v2 + # - uses: actions/checkout@v3 # - name: Install Rust # run: rustup toolchain install nightly --component clippy && rustup default nightly # - run: cargo clippy --workspace --all-features --all-targets @@ -317,7 +317,7 @@ jobs: name: cargo fmt runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - run: cargo fmt --all -- --check @@ -326,7 +326,7 @@ jobs: name: cargo doc runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: RUSTDOCFLAGS="-D warnings --cfg docsrs" cargo doc --workspace --no-deps --all-features diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index dc9b65bf6e..f10a03263a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: if: github.repository_owner == 'rust-lang' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - run: cargo build --all