diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad22..7220c39a6 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -19,9 +19,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, - Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, - SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, - TryFold, TryForEach, Unzip, Zip, + Fold, ForEach, Fuse, Inspect, Interleave, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, + Scan, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, + Then, TryFold, TryForEach, Unzip, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/interleave.rs b/futures-util/src/stream/stream/interleave.rs new file mode 100644 index 000000000..2a1a642de --- /dev/null +++ b/futures-util/src/stream/stream/interleave.rs @@ -0,0 +1,90 @@ +use core::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use futures_core::{FusedStream, Stream}; +use pin_project_lite::pin_project; + +use crate::stream::Fuse; + +pin_project! { + /// Stream for the [`interleave`](super::StreamExt::interleave) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Interleave { + #[pin] i: Fuse, + #[pin] j: Fuse, + next_coming_from_j: bool, + } +} + +impl Interleave { + pub(super) fn new(i: I, j: J) -> Self { + Self { i: Fuse::new(i), j: Fuse::new(j), next_coming_from_j: false } + } + /// Acquires a reference to the underlying streams that this combinator is + /// pulling from. + pub fn get_ref(&self) -> (&I, &J) { + (self.i.get_ref(), self.j.get_ref()) + } + + /// Acquires a mutable reference to the underlying streams that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> (&mut I, &mut J) { + (self.i.get_mut(), self.j.get_mut()) + } + + /// Acquires a pinned mutable reference to the underlying streams that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut I>, Pin<&mut J>) { + let this = self.project(); + (this.i.get_pin_mut(), this.j.get_pin_mut()) + } + + /// Consumes this combinator, returning the underlying streams. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> (I, J) { + (self.i.into_inner(), self.j.into_inner()) + } +} + +impl> Stream for Interleave { + type Item = I::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + *this.next_coming_from_j = !*this.next_coming_from_j; + match this.next_coming_from_j { + true => match ready!(this.i.poll_next(cx)) { + Some(it) => Poll::Ready(Some(it)), + None => this.j.poll_next(cx), + }, + false => match ready!(this.j.poll_next(cx)) { + Some(it) => Poll::Ready(Some(it)), + None => this.i.poll_next(cx), + }, + } + } + + fn size_hint(&self) -> (usize, Option) { + let (ilo, ihi) = self.i.size_hint(); + let (jlo, jhi) = self.j.size_hint(); + let lo = ilo.saturating_add(jlo); + let hi = ihi.and_then(|it| it.checked_add(jhi?)); + (lo, hi) + } +} + +impl> FusedStream for Interleave { + fn is_terminated(&self) -> bool { + self.i.is_terminated() && self.j.is_terminated() + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6..3f4b1d409 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -87,6 +87,9 @@ pub use self::for_each::ForEach; mod fuse; pub use self::fuse::Fuse; +mod interleave; +pub use self::interleave::Interleave; + mod into_future; pub use self::into_future::StreamFuture; @@ -1814,4 +1817,31 @@ pub trait StreamExt: Stream { { assert_future::(SelectNextSome::new(self)) } + /// Creates a stream which interleaves items between `self` and `other`, + /// in turn. + /// + /// When one stream is exhausted, only items from the other are yielded. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::{stream, StreamExt as _}; + /// + /// let stream = stream::iter(['a', 'b']) + /// .interleave(stream::iter(['A', 'B', 'C', 'D'])); + /// + /// assert_eq!( + /// "aAbBCD", + /// stream.collect::().await + /// ); + /// # }) + /// ``` + fn interleave(self, other: St) -> Interleave + where + Self: Sized, + St: Stream, + { + assert_stream(Interleave::new(self, other)) + } }