diff --git a/.clippy.toml b/.clippy.toml deleted file mode 100644 index 992016c29a..0000000000 --- a/.clippy.toml +++ /dev/null @@ -1 +0,0 @@ -msrv = "1.36" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dfb7eea4ff..691bf560b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - windows-latest runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. run: rustup update nightly --no-self-update && rustup default nightly @@ -53,7 +53,7 @@ jobs: - aarch64-unknown-linux-gnu runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cross @@ -69,11 +69,11 @@ jobs: matrix: rust: # This is the minimum Rust version supported by futures-core, futures-io, futures-sink. - # When updating this, the reminder to update the minimum required version in README.md, Cargo.toml, and .clippy.toml. - - 1.36 + # When updating this, the reminder to update the minimum required version in README.md and Cargo.toml. + - '1.36' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} # cargo does not support for --features/--no-default-features with workspace, so use cargo-hack instead. @@ -97,15 +97,17 @@ jobs: util-msrv: name: cargo +${{ matrix.rust }} build + env: + CARGO_NET_GIT_FETCH_WITH_CLI: true strategy: matrix: rust: # This is the minimum Rust version supported by futures, futures-util, futures-task, futures-macro, futures-executor, futures-channel, futures-test. # When updating this, the reminder to update the minimum required version in README.md and Cargo.toml. - - 1.45 + - '1.45' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -136,7 +138,7 @@ jobs: - nightly runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -148,7 +150,7 @@ jobs: name: cargo build -Z minimal-versions runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -170,7 +172,7 @@ jobs: - thumbv6m-none-eabi runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup target add ${{ matrix.target }} @@ -202,7 +204,7 @@ jobs: name: cargo bench runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: cargo bench --workspace @@ -212,7 +214,7 @@ jobs: name: cargo hack check --feature-powerset runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -237,7 +239,7 @@ jobs: contents: write pull-requests: write steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: ci/no_atomic_cas.sh @@ -270,13 +272,13 @@ jobs: name: cargo miri test runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - # futures-executor uses boxed futures so many tests trigger https://github.com/rust-lang/miri/issues/1038 - - run: cargo miri test --workspace --exclude futures-executor --all-features + - run: cargo miri test --workspace --all-features env: - MIRIFLAGS: -Zmiri-check-number-validity -Zmiri-symbolic-alignment-check -Zmiri-tag-raw-pointers -Zmiri-disable-isolation + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout san: name: cargo test -Z sanitizer=${{ matrix.sanitizer }} @@ -289,7 +291,7 @@ jobs: - thread runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup component add rust-src @@ -306,7 +308,7 @@ jobs: # name: cargo clippy # runs-on: ubuntu-latest # steps: - # - uses: actions/checkout@v2 + # - uses: actions/checkout@v3 # - name: Install Rust # run: rustup toolchain install nightly --component clippy && rustup default nightly # - run: cargo clippy --workspace --all-features --all-targets @@ -315,7 +317,7 @@ jobs: name: cargo fmt runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - run: cargo fmt --all -- --check @@ -324,7 +326,7 @@ jobs: name: cargo doc runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: RUSTDOCFLAGS="-D warnings --cfg docsrs" cargo doc --workspace --no-deps --all-features diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index dc9b65bf6e..f10a03263a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: if: github.repository_owner == 'rust-lang' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - run: cargo build --all diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f76048681..e2c410917b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -192,7 +192,7 @@ NOTE: This release has been yanked. See #2310 for details. NOTE: This release has been yanked. See #2310 for details. -* Fix signature of `LocalSpawn` trait (breaking change -- see #1959) +* Fix signature of `SpawnExt` and `LocalSpawnExt` trait (breaking change -- see #1959) # 0.3.0 - 2019-11-05 diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index da0899d491..444c8e10fd 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -200,10 +200,7 @@ fn tx_close_gets_none() { #[test] fn stress_shared_unbounded() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] - const AMT: u32 = 10000; + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::(); @@ -232,10 +229,7 @@ fn stress_shared_unbounded() { #[test] fn stress_shared_bounded_hard() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] - const AMT: u32 = 10000; + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::(0); @@ -265,10 +259,7 @@ fn stress_shared_bounded_hard() { #[allow(clippy::same_item_push)] #[test] fn stress_receiver_multi_task_bounded_hard() { - #[cfg(miri)] - const AMT: usize = 100; - #[cfg(not(miri))] - const AMT: usize = 10_000; + const AMT: usize = if cfg!(miri) { 100 } else { 10_000 }; const NTHREADS: u32 = 2; let (mut tx, rx) = mpsc::channel::(0); @@ -336,10 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() { /// after sender dropped. #[test] fn stress_drop_sender() { - #[cfg(miri)] - const ITER: usize = 100; - #[cfg(not(miri))] - const ITER: usize = 10000; + const ITER: usize = if cfg!(miri) { 100 } else { 10000 }; fn list() -> impl Stream { let (tx, rx) = mpsc::channel(1); @@ -394,10 +382,9 @@ fn stress_close_receiver_iter() { } } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_close_receiver() { - const ITER: usize = 10000; + const ITER: usize = if cfg!(miri) { 50 } else { 10000 }; for _ in 0..ITER { stress_close_receiver_iter(); @@ -414,10 +401,7 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender, count: u32) { #[allow(clippy::same_item_push)] #[test] fn stress_poll_ready() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] - const AMT: u32 = 1000; + const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; const NTHREADS: u32 = 8; /// Run a stress test using the specified channel capacity. @@ -444,10 +428,9 @@ fn stress_poll_ready() { stress(16); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn try_send_1() { - const N: usize = 3000; + const N: usize = if cfg!(miri) { 100 } else { 3000 }; let (mut tx, rx) = mpsc::channel(0); let t = thread::spawn(move || { diff --git a/futures-channel/tests/oneshot.rs b/futures-channel/tests/oneshot.rs index c9f5508973..6b48376dc0 100644 --- a/futures-channel/tests/oneshot.rs +++ b/futures-channel/tests/oneshot.rs @@ -35,10 +35,7 @@ fn cancel_notifies() { #[test] fn cancel_lots() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 20000; + const N: usize = if cfg!(miri) { 100 } else { 20000 }; let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); let t = thread::spawn(move || { @@ -106,10 +103,7 @@ fn is_canceled() { #[test] fn cancel_sends() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 20000; + const N: usize = if cfg!(miri) { 100 } else { 20000 }; let (tx, rx) = mpsc::channel::>(); let t = thread::spawn(move || { diff --git a/futures-executor/Cargo.toml b/futures-executor/Cargo.toml index dae5f22e77..e1ce688581 100644 --- a/futures-executor/Cargo.toml +++ b/futures-executor/Cargo.toml @@ -22,7 +22,7 @@ futures-util = { path = "../futures-util", version = "0.3.21", default-features num_cpus = { version = "1.8.0", optional = true } [dev-dependencies] -futures = { path = "../futures" } +futures = { path = "../futures", features = ["thread-pool"] } [package.metadata.docs.rs] all-features = true diff --git a/futures-executor/src/enter.rs b/futures-executor/src/enter.rs index 5895a9efb6..cb58c30bb7 100644 --- a/futures-executor/src/enter.rs +++ b/futures-executor/src/enter.rs @@ -34,7 +34,7 @@ impl std::error::Error for EnterError {} /// executor. /// /// Executor implementations should call this function before beginning to -/// execute a tasks, and drop the returned [`Enter`](Enter) value after +/// execute a task, and drop the returned [`Enter`](Enter) value after /// completing task execution: /// /// ``` diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index bee96d8db9..ec1751c7c4 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -63,7 +63,7 @@ thread_local! { impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc) { // Make sure the wakeup is remembered until the next `park()`. - let unparked = arc_self.unparked.swap(true, Ordering::Relaxed); + let unparked = arc_self.unparked.swap(true, Ordering::Release); if !unparked { // If the thread has not been unparked yet, it must be done // now. If it was actually parked, it will run again, @@ -90,33 +90,21 @@ fn run_executor) -> Poll>(mut f: F) -> T { if let Poll::Ready(t) = f(&mut cx) { return t; } - // Consume the wakeup that occurred while executing `f`, if any. - let unparked = thread_notify.unparked.swap(false, Ordering::Acquire); - if !unparked { + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { // No wakeup occurred. It may occur now, right before parking, // but in that case the token made available by `unpark()` // is guaranteed to still be available and `park()` is a no-op. thread::park(); - // When the thread is unparked, `unparked` will have been set - // and needs to be unset before the next call to `f` to avoid - // a redundant loop iteration. - thread_notify.unparked.store(false, Ordering::Release); } } }) } -fn poll_executor) -> T>(mut f: F) -> T { - let _enter = enter().expect( - "cannot execute `LocalPool` executor from within \ - another executor", - ); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - f(&mut cx) - }) +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst)) } impl LocalPool { @@ -212,20 +200,26 @@ impl LocalPool { /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { - poll_executor(|ctx| { + run_executor(|cx| { loop { - let ret = self.poll_pool_once(ctx); - - // return if we have executed a future - if let Poll::Ready(Some(_)) = ret { - return true; + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), } - // if there are no new incoming futures - // then there is no feature that can make progress - // and we can return without having completed a single future - if self.incoming.borrow().is_empty() { - return false; + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); } } }) @@ -257,44 +251,52 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { - let _ = self.poll_pool(ctx); + run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } }); } - // Make maximal progress on the entire pool of spawned task, returning `Ready` - // if the pool is empty and `Pending` if no further progress can be made. + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // state for the FuturesUnordered, which will never be used loop { - let ret = self.poll_pool_once(cx); + self.drain_incoming(); - // we queued up some new tasks; add them and poll again + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. if !self.incoming.borrow().is_empty() { continue; } - // no queued tasks; we may be done - match ret { - Poll::Pending => return Poll::Pending, + match pool_ret { + Poll::Ready(Some(())) => continue, Poll::Ready(None) => return Poll::Ready(()), - _ => {} + Poll::Pending => return Poll::Pending, } } } - // Try make minimal progress on the pool of spawned tasks - fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll> { - // empty the incoming queue of newly-spawned tasks - { - let mut incoming = self.incoming.borrow_mut(); - for task in incoming.drain(..) { - self.pool.push(task) - } + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) } - - // try to execute the next ready future - self.pool.poll_next_unpin(cx) } } diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 5e1f586eb8..5371008953 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -108,12 +108,15 @@ impl ThreadPool { /// completion. /// /// ``` + /// # { /// use futures::executor::ThreadPool; /// /// let pool = ThreadPool::new().unwrap(); /// /// let future = async { /* ... */ }; /// pool.spawn_ok(future); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` /// /// > **Note**: This method is similar to `SpawnExt::spawn`, except that @@ -346,9 +349,8 @@ impl fmt::Debug for Task { impl ArcWake for WakeHandle { fn wake_by_ref(arc_self: &Arc) { - match arc_self.mutex.notify() { - Ok(task) => arc_self.exec.state.send(Message::Run(task)), - Err(()) => {} + if let Ok(task) = arc_self.mutex.notify() { + arc_self.exec.state.send(Message::Run(task)) } } } @@ -360,16 +362,19 @@ mod tests { #[test] fn test_drop_after_start() { - let (tx, rx) = mpsc::sync_channel(2); - let _cpu_pool = ThreadPoolBuilder::new() - .pool_size(2) - .after_start(move |_| tx.send(1).unwrap()) - .create() - .unwrap(); - - // After ThreadPoolBuilder is deconstructed, the tx should be dropped - // so that we can use rx as an iterator. - let count = rx.into_iter().count(); - assert_eq!(count, 2); + { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = ThreadPoolBuilder::new() + .pool_size(2) + .after_start(move |_| tx.send(1).unwrap()) + .create() + .unwrap(); + + // After ThreadPoolBuilder is deconstructed, the tx should be dropped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 9b1316b998..72ce74b744 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,7 +1,7 @@ use futures::channel::oneshot; use futures::executor::LocalPool; use futures::future::{self, lazy, poll_fn, Future}; -use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker}; +use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; @@ -288,7 +288,7 @@ fn run_until_stalled_runs_spawned_sub_futures() { #[test] fn run_until_stalled_executes_all_ready() { - const ITER: usize = 200; + const ITER: usize = if cfg!(miri) { 50 } else { 200 }; const PER_ITER: usize = 3; let cnt = Rc::new(Cell::new(0)); @@ -432,3 +432,65 @@ fn park_unpark_independence() { futures::executor::block_on(future) } + +struct SelfWaking { + wakeups_remaining: Rc>, +} + +impl Future for SelfWaking { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if *self.wakeups_remaining.borrow() != 0 { + *self.wakeups_remaining.borrow_mut() -= 1; + cx.waker().wake_by_ref(); + } + + Poll::Pending + } +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `run_until_stalled` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_run_until_stalled() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + // This should keep polling until there are no more wakeups. + pool.run_until_stalled(); + + assert_eq!(*wakeups_remaining.borrow(), 0); +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `try_run_one` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_try_run_one() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + spawner.spawn(future::ready(())).unwrap(); + + // The `ready` future should complete. + assert!(pool.try_run_one()); + + // The self-waking futures are each polled once. + assert_eq!(*wakeups_remaining.borrow(), 7); +} diff --git a/futures-task/src/waker_ref.rs b/futures-task/src/waker_ref.rs index 7fb552fcfd..aac4109577 100644 --- a/futures-task/src/waker_ref.rs +++ b/futures-task/src/waker_ref.rs @@ -18,6 +18,7 @@ pub struct WakerRef<'a> { impl<'a> WakerRef<'a> { /// Create a new [`WakerRef`] from a [`Waker`] reference. + #[inline] pub fn new(waker: &'a Waker) -> Self { // copy the underlying (raw) waker without calling a clone, // as we won't call Waker::drop either. @@ -31,6 +32,7 @@ impl<'a> WakerRef<'a> { /// an unsafe way (that will be valid only for a lifetime to be determined /// by the caller), and the [`Waker`] doesn't need to or must not be /// destroyed. + #[inline] pub fn new_unowned(waker: ManuallyDrop) -> Self { Self { waker, _marker: PhantomData } } @@ -39,6 +41,7 @@ impl<'a> WakerRef<'a> { impl Deref for WakerRef<'_> { type Target = Waker; + #[inline] fn deref(&self) -> &Waker { &self.waker } diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index b5aa8a7dd1..90e50ff919 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -19,7 +19,7 @@ futures-executor = { version = "0.3.21", path = "../futures-executor", default-f futures-sink = { version = "0.3.21", path = "../futures-sink", default-features = false } futures-macro = { version = "=0.3.21", path = "../futures-macro", default-features = false } pin-utils = { version = "0.1.0", default-features = false } -pin-project = "1.0.1" +pin-project = "1.0.11" [dev-dependencies] futures = { path = "../futures", default-features = false, features = ["std", "executor"] } diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs index ee5c6ddd5d..0f52f62bb9 100644 --- a/futures-test/src/future/mod.rs +++ b/futures-test/src/future/mod.rs @@ -68,6 +68,7 @@ pub trait FutureTestExt: Future { /// /// assert_eq!(rx.await, Ok(5)); /// # }); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` fn run_in_background(self) where diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 46ec854b04..e32b642aa2 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -45,7 +45,7 @@ memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" -pin-project-lite = "0.2.4" +pin-project-lite = "0.2.6" [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/benches/select.rs b/futures-util/benches/select.rs new file mode 100644 index 0000000000..5410a95299 --- /dev/null +++ b/futures-util/benches/select.rs @@ -0,0 +1,35 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::stream::{repeat, select, StreamExt}; + +#[bench] +fn select_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + + b.iter(|| { + let stream1 = repeat(1).take(STREAM_COUNT); + let stream2 = repeat(2).take(STREAM_COUNT); + let stream3 = repeat(3).take(STREAM_COUNT); + let stream4 = repeat(4).take(STREAM_COUNT); + let stream5 = repeat(5).take(STREAM_COUNT); + let stream6 = repeat(6).take(STREAM_COUNT); + let stream7 = repeat(7).take(STREAM_COUNT); + let count = block_on(async { + let count = select( + stream1, + select( + stream2, + select(stream3, select(stream4, select(stream5, select(stream6, stream7)))), + ), + ) + .count() + .await; + count + }); + assert_eq!(count, STREAM_COUNT * 7); + }); +} diff --git a/futures-util/src/abortable.rs b/futures-util/src/abortable.rs index bb82dd0db8..e0afd47218 100644 --- a/futures-util/src/abortable.rs +++ b/futures-util/src/abortable.rs @@ -75,7 +75,7 @@ impl Abortable { /// in calls to `Abortable::new`. #[derive(Debug)] pub struct AbortRegistration { - inner: Arc, + pub(crate) inner: Arc, } /// A handle to an `Abortable` task. @@ -100,9 +100,9 @@ impl AbortHandle { // Inner type storing the waker to awaken and a bool indicating that it // should be aborted. #[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - aborted: AtomicBool, +pub(crate) struct AbortInner { + pub(crate) waker: AtomicWaker, + pub(crate) aborted: AtomicBool, } /// Indicator that the `Abortable` task was aborted. diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index 9b31932fe3..985931564b 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -262,19 +262,20 @@ where let waker = waker_ref(&inner.notifier); let mut cx = Context::from_waker(&waker); - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, + } impl Drop for Reset<'_> { fn drop(&mut self) { - use std::thread; - - if thread::panicking() { - self.0.store(POISONED, SeqCst); + if !self.did_not_panic { + self.state.store(POISONED, SeqCst); } } } - let _reset = Reset(&inner.notifier.state); + let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; let output = { let future = unsafe { @@ -284,12 +285,15 @@ where } }; - match future.poll(&mut cx) { + let poll_result = future.poll(&mut cx); + reset.did_not_panic = true; + + match poll_result { Poll::Pending => { if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success - drop(_reset); + drop(reset); this.inner = Some(inner); return Poll::Pending; } else { @@ -313,7 +317,7 @@ where waker.wake(); } - drop(_reset); // Make borrow checker happy + drop(reset); // Make borrow checker happy drop(wakers_guard); // Safety: We're in the COMPLETE state diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 2e52ac17f4..7dc159ba07 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { +pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind +enum JoinAllKind where F: Future, { @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; + assert_future::::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::>().collect() }, }; + assert_future::::Output>, _>(JoinAll { kind }) } } diff --git a/futures-util/src/future/pending.rs b/futures-util/src/future/pending.rs index 92c78d52b8..b8e28686e1 100644 --- a/futures-util/src/future/pending.rs +++ b/futures-util/src/future/pending.rs @@ -33,6 +33,7 @@ impl FusedFuture for Pending { /// unreachable!(); /// # }); /// ``` +#[cfg_attr(docsrs, doc(alias = "never"))] pub fn pending() -> Pending { assert_future::(Pending { _data: marker::PhantomData }) } diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index bd44f20f77..e693a30b00 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -100,16 +100,17 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), - Poll::Pending => match b.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending - } - }, + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, b))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, a))); } + + self.inner = Some((a, b)); + Poll::Pending } } diff --git a/futures-util/src/future/select_all.rs b/futures-util/src/future/select_all.rs index 106e50844c..07d65cae79 100644 --- a/futures-util/src/future/select_all.rs +++ b/futures-util/src/future/select_all.rs @@ -59,7 +59,7 @@ impl Future for SelectAll { match item { Some((idx, res)) => { let _ = self.inner.swap_remove(idx); - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); Poll::Ready((res, idx, rest)) } None => Poll::Pending, diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 0ad83c6db6..5d5579930b 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -59,7 +59,7 @@ impl Future for SelectOk { drop(self.inner.remove(idx)); match res { Ok(e) => { - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index fb3bdd8a02..e5bc700714 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture { /// assert_eq!(future.await, Ok(1)); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn map_err(self, f: F) -> MapErr where F: FnOnce(Self::Error) -> E, @@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture { /// let future_err_i32 = future_err_u8.err_into::(); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn err_into(self) -> ErrInto where Self: Sized, diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 29244af837..25fcfcb6c2 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,14 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState { Pending, @@ -31,7 +28,20 @@ pub struct TryJoinAll where F: TryFuture, { - elems: Pin]>>, + kind: TryJoinAllKind, +} + +enum TryJoinAllKind +where + F: TryFuture, +{ + Small { + elems: Pin>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect>, Vec>, + }, } impl fmt::Debug for TryJoinAll @@ -39,9 +49,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -83,15 +100,37 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all(i: I) -> TryJoinAll +pub fn try_join_all(iter: I) -> TryJoinAll where I: IntoIterator, I::Item: TryFuture, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::::Ok>, ::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }; + + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, + }; + + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } } impl Future for TryJoinAll @@ -101,36 +140,46 @@ where type Output = Result, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut state = FinalState::AllDone; - - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl FromIterator for TryJoinAll { +impl FromIterator for TryJoinAll +where + F: TryFuture, +{ fn from_iter>(iter: T) -> Self { try_join_all(iter) } diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs new file mode 100644 index 0000000000..fdbc4a5f00 --- /dev/null +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -0,0 +1,124 @@ +use crate::abortable::{AbortHandle, AbortInner, Aborted}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`. +/// +/// The returned future will copy all the bytes read from this `AsyncBufRead` into the +/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// use futures::future::Aborted; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer); +/// let bytes = fut.await; +/// abort_handle.abort(); +/// writer.close().await.unwrap(); +/// match bytes { +/// Ok(Ok(n)) => { +/// assert_eq!(n, 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// Ok(n) +/// }, +/// Ok(Err(a)) => { +/// Err::(a) +/// } +/// Err(e) => panic!("{}", e) +/// } +/// # }).unwrap(); +/// ``` +pub fn copy_buf_abortable( + reader: R, + writer: &mut W, +) -> (CopyBufAbortable<'_, R, W>, AbortHandle) +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + let (handle, reg) = AbortHandle::new_pair(); + (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle) +} + +pin_project! { + /// Future for the [`copy_buf()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBufAbortable<'a, R, W: ?Sized> { + #[pin] + reader: R, + writer: &'a mut W, + amt: u64, + inner: Arc + } +} + +macro_rules! ready_or_break { + ($e:expr $(,)?) => { + match $e { + $crate::task::Poll::Ready(t) => t, + $crate::task::Poll::Pending => break, + } + }; +} + +impl Future for CopyBufAbortable<'_, R, W> +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + Sized, +{ + type Output = Result, io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + // Check if the task has been aborted + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + + // Read some bytes from the reader, and if we have reached EOF, return total bytes read + let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?; + return Poll::Ready(Ok(Ok(*this.amt))); + } + + // Pass the buffer to the writer, and update the amount written + let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *this.amt += i as u64; + this.reader.as_mut().consume(i); + } + // Schedule the task to be woken up again. + // Never called unless Poll::Pending is returned from io objects. + this.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + Poll::Pending + } +} diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs index 13e70df238..b5561bfa7d 100644 --- a/futures-util/src/io/lines.rs +++ b/futures-util/src/io/lines.rs @@ -42,6 +42,6 @@ impl Stream for Lines { this.buf.pop(); } } - Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) + Poll::Ready(Some(Ok(mem::take(this.buf)))) } } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 4dd2e029bf..8ce3ad644b 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -66,6 +66,9 @@ pub use self::copy::{copy, Copy}; mod copy_buf; pub use self::copy_buf::{copy_buf, CopyBuf}; +mod copy_buf_abortable; +pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable}; + mod cursor; pub use self::cursor::Cursor; diff --git a/futures-util/src/io/read_exact.rs b/futures-util/src/io/read_exact.rs index 02e38c35be..cd0b20e597 100644 --- a/futures-util/src/io/read_exact.rs +++ b/futures-util/src/io/read_exact.rs @@ -30,7 +30,7 @@ impl Future for ReadExact<'_, R> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n); + let (_, rest) = mem::take(&mut this.buf).split_at_mut(n); this.buf = rest; } if n == 0 { diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs index c75af9471f..e1b8fc9455 100644 --- a/futures-util/src/io/read_line.rs +++ b/futures-util/src/io/read_line.rs @@ -22,7 +22,7 @@ impl Unpin for ReadLine<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 } } } diff --git a/futures-util/src/io/read_to_string.rs b/futures-util/src/io/read_to_string.rs index 457af59e4f..c175396d81 100644 --- a/futures-util/src/io/read_to_string.rs +++ b/futures-util/src/io/read_to_string.rs @@ -22,7 +22,7 @@ impl Unpin for ReadToString<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { let start_len = buf.len(); - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len } } } diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs index b134bf1b22..08c025f94d 100644 --- a/futures-util/src/io/write_all.rs +++ b/futures-util/src/io/write_all.rs @@ -30,7 +30,7 @@ impl Future for WriteAll<'_, W> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n); + let (_, rest) = mem::take(&mut this.buf).split_at(n); this.buf = rest; } if n == 0 { diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 2f51ae7c98..2174079c83 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -224,6 +224,9 @@ pub struct BiLockGuard<'a, T> { bilock: &'a BiLock, } +// We allow parallel access to T via Deref, so Sync bound is also needed here. +unsafe impl Sync for BiLockGuard<'_, T> {} + impl Deref for BiLockGuard<'_, T> { type Target = T; fn deref(&self) -> &T { diff --git a/futures-util/src/lock/mod.rs b/futures-util/src/lock/mod.rs index cf374c016f..0be72717c8 100644 --- a/futures-util/src/lock/mod.rs +++ b/futures-util/src/lock/mod.rs @@ -4,11 +4,18 @@ //! library is activated, and it is activated by default. #[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "std")] -mod mutex; +#[cfg(any(feature = "sink", feature = "io"))] +#[cfg(not(feature = "bilock"))] +pub(crate) use self::bilock::BiLock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] +pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "std")] -pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; +pub use self::mutex::{ + MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] @@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; #[cfg(not(futures_no_atomic_cas))] -#[cfg(any(feature = "sink", feature = "io"))] -#[cfg(not(feature = "bilock"))] -pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "bilock")] -#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] -pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; +#[cfg(feature = "std")] +mod mutex; diff --git a/futures-util/src/lock/mutex.rs b/futures-util/src/lock/mutex.rs index 85dcb1537b..335ad14273 100644 --- a/futures-util/src/lock/mutex.rs +++ b/futures-util/src/lock/mutex.rs @@ -1,14 +1,16 @@ -use futures_core::future::{FusedFuture, Future}; -use futures_core::task::{Context, Poll, Waker}; -use slab::Slab; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex as StdMutex; +use std::sync::{Arc, Mutex as StdMutex}; use std::{fmt, mem}; +use slab::Slab; + +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; + /// A futures-aware mutex. /// /// # Fairness @@ -107,6 +109,18 @@ impl Mutex { } } + /// Attempt to acquire the lock immediately. + /// + /// If the lock is currently held, this will return `None`. + pub fn try_lock_owned(self: &Arc) -> Option> { + let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire); + if (old_state & IS_LOCKED) == 0 { + Some(OwnedMutexGuard { mutex: self.clone() }) + } else { + None + } + } + /// Acquire the lock asynchronously. /// /// This method returns a future that will resolve once the lock has been @@ -115,6 +129,14 @@ impl Mutex { MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } + /// Acquire the lock asynchronously. + /// + /// This method returns a future that will resolve once the lock has been + /// successfully acquired. + pub fn lock_owned(self: Arc) -> OwnedMutexLockFuture { + OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } + } + /// Returns a mutable reference to the underlying data. /// /// Since this call borrows the `Mutex` mutably, no actual locking needs to @@ -173,7 +195,118 @@ impl Mutex { } // Sentinel for when no slot in the `Slab` has been dedicated to this object. -const WAIT_KEY_NONE: usize = usize::max_value(); +const WAIT_KEY_NONE: usize = usize::MAX; + +/// A future which resolves when the target mutex has been successfully acquired, owned version. +pub struct OwnedMutexLockFuture { + // `None` indicates that the mutex was successfully acquired. + mutex: Option>>, + wait_key: usize, +} + +impl fmt::Debug for OwnedMutexLockFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexLockFuture") + .field("was_acquired", &self.mutex.is_none()) + .field("mutex", &self.mutex) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) + .finish() + } +} + +impl FusedFuture for OwnedMutexLockFuture { + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl Future for OwnedMutexLockFuture { + type Output = OwnedMutexGuard; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion"); + + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + { + let mut waiters = mutex.waiters.lock().unwrap(); + if this.wait_key == WAIT_KEY_NONE { + this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone())); + if waiters.len() == 1 { + mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock + } + } else { + waiters[this.wait_key].register(cx.waker()); + } + } + + // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by + // attempting to acquire the lock again. + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + Poll::Pending + } +} + +impl Drop for OwnedMutexLockFuture { + fn drop(&mut self) { + if let Some(mutex) = self.mutex.as_ref() { + // This future was dropped before it acquired the mutex. + // + // Remove ourselves from the map, waking up another waiter if we + // had been awoken to acquire the lock. + mutex.remove_waker(self.wait_key, true); + } + } +} + +/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods. +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +pub struct OwnedMutexGuard { + mutex: Arc>, +} + +impl fmt::Debug for OwnedMutexGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexGuard") + .field("value", &&**self) + .field("mutex", &self.mutex) + .finish() + } +} + +impl Drop for OwnedMutexGuard { + fn drop(&mut self) { + self.mutex.unlock() + } +} + +impl Deref for OwnedMutexGuard { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +impl DerefMut for OwnedMutexGuard { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.mutex.value.get() } + } +} /// A future which resolves when the target mutex has been successfully acquired. pub struct MutexLockFuture<'a, T: ?Sized> { @@ -386,13 +519,25 @@ unsafe impl Sync for Mutex {} // It's safe to switch which thread the acquire is being attempted on so long as // `T` can be accessed on that thread. unsafe impl Send for MutexLockFuture<'_, T> {} + // doesn't have any interesting `&self` methods (only Debug) unsafe impl Sync for MutexLockFuture<'_, T> {} +// It's safe to switch which thread the acquire is being attempted on so long as +// `T` can be accessed on that thread. +unsafe impl Send for OwnedMutexLockFuture {} + +// doesn't have any interesting `&self` methods (only Debug) +unsafe impl Sync for OwnedMutexLockFuture {} + // Safe to send since we don't track any thread-specific details-- the inner // lock is essentially spinlock-equivalent (attempt to flip an atomic bool) unsafe impl Send for MutexGuard<'_, T> {} unsafe impl Sync for MutexGuard<'_, T> {} + +unsafe impl Send for OwnedMutexGuard {} +unsafe impl Sync for OwnedMutexGuard {} + unsafe impl Send for MappedMutexGuard<'_, T, U> {} unsafe impl Sync for MappedMutexGuard<'_, T, U> {} diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index f596b3b0e3..f1c93fd683 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -135,11 +135,39 @@ impl FuturesOrdered { /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. + #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { + self.push_back(future); + } + + /// Pushes a future to the back of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } + + /// Pushes a future to the front of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. This future will be the next future to be returned + /// complete. + pub fn push_front(&mut self, future: Fut) { + if self.next_outgoing_index == 0 { + self.push_back(future) + } else { + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); + } + } } impl Default for FuturesOrdered { @@ -196,7 +224,7 @@ impl FromIterator for FuturesOrdered { { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { - acc.push(item); + acc.push_back(item); acc }) } @@ -214,7 +242,7 @@ impl Extend for FuturesOrdered { I: IntoIterator, { for item in iter { - self.push(item); + self.push_back(item); } } } diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 04db5ee753..20248c70fe 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -2,6 +2,7 @@ use super::task::Task; use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; +use core::ptr; use core::sync::atomic::Ordering::Relaxed; /// Mutable iterator over all futures in the unordered set. @@ -58,6 +59,9 @@ impl Iterator for IntoIter { // valid `next_all` checks can be skipped. let next = (**task).next_all.load(Relaxed); *task = next; + if !task.is_null() { + *(**task).prev_all.get() = ptr::null_mut(); + } self.len -= 1; Some(future) } diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index bd86990cdb..7423519df1 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -1,5 +1,4 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -18,13 +17,15 @@ impl PollNext { /// Toggle the value and return the old one. pub fn toggle(&mut self) -> Self { let old = *self; + *self = self.other(); + old + } + fn other(&self) -> PollNext { match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, } - - old } } @@ -34,14 +35,41 @@ impl Default for PollNext { } } +enum InternalState { + Start, + LeftFinished, + RightFinished, + BothFinished, +} + +impl InternalState { + fn finish(&mut self, ps: PollNext) { + match (&self, ps) { + (InternalState::Start, PollNext::Left) => { + *self = InternalState::LeftFinished; + } + (InternalState::Start, PollNext::Right) => { + *self = InternalState::RightFinished; + } + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { + *self = InternalState::BothFinished; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy { #[pin] - stream1: Fuse, + stream1: St1, #[pin] - stream2: Fuse, + stream2: St2, + internal_state: InternalState, state: State, clos: Clos, } @@ -120,9 +148,10 @@ where State: Default, { assert_stream::(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), + internal_state: InternalState::Start, clos: which, }) } @@ -131,7 +160,7 @@ impl SelectWithStrategy { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -140,7 +169,7 @@ impl SelectWithStrategy { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -150,7 +179,7 @@ impl SelectWithStrategy { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -158,7 +187,7 @@ impl SelectWithStrategy { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } @@ -169,47 +198,88 @@ where Clos: FnMut(&mut State) -> PollNext, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + match self.internal_state { + InternalState::BothFinished => true, + _ => false, + } } } -impl Stream for SelectWithStrategy +#[inline] +fn poll_side( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, + cx: &mut Context<'_>, +) -> Poll> where St1: Stream, St2: Stream, - Clos: FnMut(&mut State) -> PollNext, { - type Item = St1::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), - } + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), } } -fn poll_inner( - a: Pin<&mut St1>, - b: Pin<&mut St2>, +#[inline] +fn poll_inner( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, cx: &mut Context<'_>, ) -> Poll> where St1: Stream, St2: Stream, { - let a_done = match a.poll_next(cx) { + match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, - Poll::Pending => false, + Poll::Ready(None) => { + select.internal_state.finish(side); + } + Poll::Pending => (), }; + let other = side.other(); + match poll_side(select, other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(other); + Poll::Ready(None) + } + a => a, + } +} + +impl Stream for SelectWithStrategy +where + St1: Stream, + St2: Stream, + Clos: FnMut(&mut State) -> PollNext, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + match this.internal_state { + InternalState::Start => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } + InternalState::LeftFinished => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::RightFinished => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::BothFinished => Poll::Ready(None), + } } } diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 6052a737ba..8ca0391c55 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -64,7 +64,7 @@ where // our queue of futures. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/stream/chunks.rs b/futures-util/src/stream/stream/chunks.rs index 8457869999..8e8ed3bd86 100644 --- a/futures-util/src/stream/stream/chunks.rs +++ b/futures-util/src/stream/stream/chunks.rs @@ -66,7 +66,7 @@ impl Stream for Chunks { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -79,7 +79,7 @@ impl Stream for Chunks { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/futures-util/src/stream/stream/collect.rs b/futures-util/src/stream/stream/collect.rs index b0e81b9ce0..970ac26dbf 100644 --- a/futures-util/src/stream/stream/collect.rs +++ b/futures-util/src/stream/stream/collect.rs @@ -19,7 +19,7 @@ pin_project! { impl Collect { fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.project().collection, Default::default()) + mem::take(self.project().collection) } pub(super) fn new(stream: St) -> Self { diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 642b91e823..a823fab123 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -1674,6 +1674,8 @@ pub trait StreamExt: Stream { /// assert_eq!(total, 6); /// # }); /// ``` + /// + /// [`select!`]: crate::select fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream, diff --git a/futures-util/src/stream/stream/ready_chunks.rs b/futures-util/src/stream/stream/ready_chunks.rs index 5ebc9582db..d3618d81e8 100644 --- a/futures-util/src/stream/stream/ready_chunks.rs +++ b/futures-util/src/stream/stream/ready_chunks.rs @@ -74,7 +74,7 @@ impl Stream for ReadyChunks { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -87,7 +87,7 @@ impl Stream for ReadyChunks { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 3a72fee30b..e2034e0c27 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -35,7 +35,7 @@ impl Stream for SplitStream { } } -#[allow(bad_style)] +#[allow(non_snake_case)] fn SplitSink, Item>(lock: BiLock) -> SplitSink { SplitSink { lock, slot: None } } diff --git a/futures-util/src/stream/stream/unzip.rs b/futures-util/src/stream/stream/unzip.rs index 15f22e80b0..a88cf03266 100644 --- a/futures-util/src/stream/stream/unzip.rs +++ b/futures-util/src/stream/stream/unzip.rs @@ -21,7 +21,7 @@ pin_project! { impl Unzip { fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { let this = self.project(); - (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) + (mem::take(this.left), mem::take(this.right)) } pub(super) fn new(stream: St) -> Self { diff --git a/futures-util/src/stream/try_stream/into_async_read.rs b/futures-util/src/stream/try_stream/into_async_read.rs index 914b277a02..ffbfc7eae9 100644 --- a/futures-util/src/stream/try_stream/into_async_read.rs +++ b/futures-util/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState, -} - -impl Unpin for IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead + where + St: TryStream, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState> { impl IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl AsyncRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl AsyncWrite for IntoAsyncRead where - St: TryStream + AsyncWrite + Unpin, + St: TryStream + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_close(cx) } } impl AsyncBufRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 6bf2cb74a5..bc4c6e4f6a 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -918,7 +918,7 @@ pub trait TryStreamExt: TryStream { /// that matches the stream's `Error` type. /// /// This adaptor will buffer up to `n` futures and then return their - /// outputs in the order. If the underlying stream returns an error, it will + /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will /// be immediately propagated. /// /// The returned stream will be a stream of results, each containing either @@ -1031,12 +1031,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1048,12 +1043,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1061,7 +1056,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead where - Self: Sized + TryStreamExt + Unpin, + Self: Sized + TryStreamExt, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self)) diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 45bd3f8c7a..9f48e5c0a7 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -54,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs index 07d4425a81..7626e7124d 100644 --- a/futures-util/src/stream/try_stream/try_chunks.rs +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -70,7 +70,7 @@ impl Stream for TryChunks { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -83,7 +83,7 @@ impl Stream for TryChunks { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/futures-util/src/stream/try_stream/try_collect.rs b/futures-util/src/stream/try_stream/try_collect.rs index 5d3b3d7668..3e5963f033 100644 --- a/futures-util/src/stream/try_stream/try_collect.rs +++ b/futures-util/src/stream/try_stream/try_collect.rs @@ -45,7 +45,7 @@ where Poll::Ready(Ok(loop { match ready!(this.stream.as_mut().try_poll_next(cx)?) { Some(x) => this.items.extend(Some(x)), - None => break mem::replace(this.items, Default::default()), + None => break mem::take(this.items), } })) } diff --git a/futures-util/src/task/spawn.rs b/futures-util/src/task/spawn.rs index 87ca360516..d9e9985309 100644 --- a/futures-util/src/task/spawn.rs +++ b/futures-util/src/task/spawn.rs @@ -34,7 +34,7 @@ pub trait SpawnExt: Spawn { /// today. Feel free to use this method in the meantime. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 + /// # { /// use futures::executor::ThreadPool; /// use futures::task::SpawnExt; /// @@ -42,6 +42,8 @@ pub trait SpawnExt: Spawn { /// /// let future = async { /* ... */ }; /// executor.spawn(future).unwrap(); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "alloc")] fn spawn(&self, future: Fut) -> Result<(), SpawnError> @@ -59,7 +61,7 @@ pub trait SpawnExt: Spawn { /// resolves to the output of the spawned future. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 + /// # { /// use futures::executor::{block_on, ThreadPool}; /// use futures::future; /// use futures::task::SpawnExt; @@ -69,6 +71,8 @@ pub trait SpawnExt: Spawn { /// let future = future::ready(1); /// let join_handle_fut = executor.spawn_with_handle(future).unwrap(); /// assert_eq!(block_on(join_handle_fut), 1); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "channel")] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] @@ -138,7 +142,6 @@ pub trait LocalSpawnExt: LocalSpawn { /// resolves to the output of the spawned future. /// /// ``` - /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 /// use futures::executor::LocalPool; /// use futures::task::LocalSpawnExt; /// diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 6871f47eaa..b6ff22d387 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -27,7 +27,7 @@ futures-util = { path = "../futures-util", version = "0.3.21", default-features futures-executor = { path = "../futures-executor", features = ["thread-pool"] } futures-test = { path = "../futures-test" } assert_matches = "1.3.0" -pin-project = "1.0.1" +pin-project = "1.0.11" pin-utils = "0.1.0" static_assertions = "1" tokio = "0.1.11" diff --git a/futures/src/lib.rs b/futures/src/lib.rs index b8ebc614e2..b972f51754 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -25,13 +25,13 @@ //! within macros and keywords such as async and await!. //! //! ```rust -//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 //! # use futures::channel::mpsc; //! # use futures::executor; ///standard executors to provide a context for futures and streams //! # use futures::executor::ThreadPool; //! # use futures::StreamExt; //! # //! fn main() { +//! # { //! let pool = ThreadPool::new().expect("Failed to build pool"); //! let (tx, rx) = mpsc::unbounded::(); //! @@ -73,6 +73,8 @@ //! let values: Vec = executor::block_on(fut_values); //! //! println!("Values={:?}", values); +//! # } +//! # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 //! } //! ``` //! diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index b3d8b00773..da00ccf40c 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -576,10 +576,10 @@ pub mod future { // TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin - assert_impl!(TryJoinAll>: Send); + assert_impl!(TryJoinAll>: Send); assert_not_impl!(TryJoinAll: Send); assert_not_impl!(TryJoinAll: Send); - assert_impl!(TryJoinAll>: Sync); + assert_impl!(TryJoinAll>: Sync); assert_not_impl!(TryJoinAll: Sync); assert_not_impl!(TryJoinAll: Sync); assert_impl!(TryJoinAll: Unpin); diff --git a/futures/tests/eventual.rs b/futures/tests/eventual.rs index 34613806c4..951c55c214 100644 --- a/futures/tests/eventual.rs +++ b/futures/tests/eventual.rs @@ -1,5 +1,3 @@ -#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 - use futures::channel::oneshot; use futures::executor::ThreadPool; use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; @@ -136,6 +134,8 @@ fn select3() { #[test] fn select4() { + const N: usize = if cfg!(miri) { 100 } else { 10000 }; + let (tx, rx) = mpsc::channel::>(); let t = thread::spawn(move || { @@ -145,7 +145,7 @@ fn select4() { }); let (tx2, rx2) = mpsc::channel(); - for _ in 0..10000 { + for _ in 0..N { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); diff --git a/futures/tests/future_shared.rs b/futures/tests/future_shared.rs index 3ceaebb5c8..6bf43d23cf 100644 --- a/futures/tests/future_shared.rs +++ b/futures/tests/future_shared.rs @@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool}; use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; use futures::task::LocalSpawn; use std::cell::{Cell, RefCell}; +use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::task::Poll; use std::thread; @@ -96,7 +97,6 @@ fn drop_in_poll() { assert_eq!(block_on(future1), 1); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn peek() { let mut local_pool = LocalPool::new(); @@ -194,3 +194,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() { // has returned pending assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); } + +#[test] +#[should_panic(expected = "inner future panicked during poll")] +fn panic_while_poll() { + let fut = futures::future::poll_fn::(|_cx| panic!("test")).shared(); + + let fut_captured = fut.clone(); + std::panic::catch_unwind(AssertUnwindSafe(|| { + block_on(fut_captured); + })) + .unwrap_err(); + + block_on(fut); +} + +#[test] +#[should_panic(expected = "test_marker")] +fn poll_while_panic() { + struct S; + + impl Drop for S { + fn drop(&mut self) { + let fut = futures::future::ready(1).shared(); + assert_eq!(block_on(fut.clone()), 1); + assert_eq!(block_on(fut), 1); + } + } + + let _s = S {}; + panic!("test_marker"); +} diff --git a/futures/tests/lock_mutex.rs b/futures/tests/lock_mutex.rs index c92ef50ad8..c15e76bd84 100644 --- a/futures/tests/lock_mutex.rs +++ b/futures/tests/lock_mutex.rs @@ -34,34 +34,36 @@ fn mutex_wakes_waiters() { assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn mutex_contested() { - let (tx, mut rx) = mpsc::unbounded(); - let pool = ThreadPool::builder().pool_size(16).create().unwrap(); + { + let (tx, mut rx) = mpsc::unbounded(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); - let tx = Arc::new(tx); - let mutex = Arc::new(Mutex::new(0)); + let tx = Arc::new(tx); + let mutex = Arc::new(Mutex::new(0)); - let num_tasks = 1000; - for _ in 0..num_tasks { - let tx = tx.clone(); - let mutex = mutex.clone(); - pool.spawn(async move { - let mut lock = mutex.lock().await; - ready(()).pending_once().await; - *lock += 1; - tx.unbounded_send(()).unwrap(); - drop(lock); - }) - .unwrap(); - } - - block_on(async { + let num_tasks = 1000; for _ in 0..num_tasks { - rx.next().await.unwrap(); + let tx = tx.clone(); + let mutex = mutex.clone(); + pool.spawn(async move { + let mut lock = mutex.lock().await; + ready(()).pending_once().await; + *lock += 1; + tx.unbounded_send(()).unwrap(); + drop(lock); + }) + .unwrap(); } - let lock = mutex.lock().await; - assert_eq!(num_tasks, *lock); - }) + + block_on(async { + for _ in 0..num_tasks { + rx.next().await.unwrap(); + } + let lock = mutex.lock().await; + assert_eq!(num_tasks, *lock); + }); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } diff --git a/futures/tests/macro_comma_support.rs b/futures/tests/macro_comma_support.rs index 3b082d211f..85871e98be 100644 --- a/futures/tests/macro_comma_support.rs +++ b/futures/tests/macro_comma_support.rs @@ -14,7 +14,6 @@ fn ready() { })) } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn poll() { use futures::poll; diff --git a/futures/tests/no-std/src/lib.rs b/futures/tests/no-std/src/lib.rs index 308218d6b7..89a8fa1ff1 100644 --- a/futures/tests/no-std/src/lib.rs +++ b/futures/tests/no-std/src/lib.rs @@ -1,6 +1,5 @@ #![cfg(nightly)] #![no_std] -#![feature(cfg_target_has_atomic)] #[cfg(feature = "futures-core-alloc")] #[cfg(target_has_atomic = "ptr")] diff --git a/futures/tests/ready_queue.rs b/futures/tests/ready_queue.rs index afba8f28b3..c19d62593c 100644 --- a/futures/tests/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -93,10 +93,7 @@ fn dropping_ready_queue() { #[test] fn stress() { - #[cfg(miri)] - const ITER: usize = 30; - #[cfg(not(miri))] - const ITER: usize = 300; + const ITER: usize = if cfg!(miri) { 30 } else { 300 }; for i in 0..ITER { let n = (i % 10) + 1; diff --git a/futures/tests/recurse.rs b/futures/tests/recurse.rs index f06524f85a..d81753c9d7 100644 --- a/futures/tests/recurse.rs +++ b/futures/tests/recurse.rs @@ -3,7 +3,6 @@ use futures::future::{self, BoxFuture, FutureExt}; use std::sync::mpsc; use std::thread; -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn lots() { #[cfg(not(futures_sanitizer))] diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index dc826bda98..5b691e74c6 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -138,7 +138,7 @@ impl ManualFlush { for task in self.waiting_tasks.drain(..) { task.wake() } - mem::replace(&mut self.data, Vec::new()) + mem::take(&mut self.data) } } @@ -288,7 +288,6 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn with_flush() { let (tx, rx) = oneshot::channel(); diff --git a/futures/tests/stream_futures_ordered.rs b/futures/tests/stream_futures_ordered.rs index 84e0bcc1df..8b85a3365a 100644 --- a/futures/tests/stream_futures_ordered.rs +++ b/futures/tests/stream_futures_ordered.rs @@ -2,6 +2,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, join, Future, FutureExt, TryFutureExt}; use futures::stream::{FuturesOrdered, StreamExt}; +use futures::task::Poll; use futures_test::task::noop_context; use std::any::Any; @@ -26,7 +27,6 @@ fn works_1() { assert_eq!(None, iter.next()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::(); @@ -46,6 +46,69 @@ fn works_2() { assert!(stream.poll_next_unpin(&mut cx).is_ready()); } +#[test] +fn test_push_front() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // 1 and 2 should be received in order + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_front(d_rx); + d_tx.send(4).unwrap(); + + // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next + // and then 3 after it + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn test_push_back() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // All results should be received in order + + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_back(d_rx); + d_tx.send(4).unwrap(); + + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); +} + #[test] fn from_iterator() { let stream = vec![future::ready::(1), future::ready::(2), future::ready::(3)] @@ -55,7 +118,6 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn queue_never_unblocked() { let (_a_tx, a_rx) = oneshot::channel::>(); diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index f62f733610..b568280479 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -56,7 +56,6 @@ fn works_1() { assert_eq!(None, iter.next()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::(); @@ -86,7 +85,6 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn finished_future() { let (_a_tx, a_rx) = oneshot::channel::(); @@ -261,6 +259,20 @@ fn into_iter_len() { assert!(into_iter.next().is_none()); } +#[test] +fn into_iter_partial() { + let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)] + .into_iter() + .collect::>(); + + let mut into_iter = stream.into_iter(); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + // don't panic when iterator is dropped before completing +} + #[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index d83fc54b1c..194e74db74 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -1,5 +1,3 @@ -#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 - use futures::{ stream::{self, StreamExt, TryStreamExt}, task::Poll, diff --git a/futures/tests/task_atomic_waker.rs b/futures/tests/task_atomic_waker.rs index 2d1612a45d..cec3db2876 100644 --- a/futures/tests/task_atomic_waker.rs +++ b/futures/tests/task_atomic_waker.rs @@ -6,7 +6,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread; -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn basic() { let atomic_waker = Arc::new(AtomicWaker::new()); diff --git a/futures/tests_disabled/stream.rs b/futures/tests_disabled/stream.rs index 854dbad829..a4eec2c7aa 100644 --- a/futures/tests_disabled/stream.rs +++ b/futures/tests_disabled/stream.rs @@ -318,7 +318,6 @@ fn forward() { } #[test] -#[allow(deprecated)] fn concat() { let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs index 9b05d4b9f4..341e6ac7d2 100644 --- a/no_atomic_cas.rs +++ b/no_atomic_cas.rs @@ -7,6 +7,7 @@ const NO_ATOMIC_CAS: &[&str] = &[ "bpfel-unknown-none", "msp430-none-elf", "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", "riscv32imc-unknown-none-elf", "thumbv4t-none-eabi", "thumbv6m-none-eabi",