From 9bb7938239c8f9061a9c9f3c9bd35592e8493ac1 Mon Sep 17 00:00:00 2001 From: Adriano Mourao Date: Sat, 19 Apr 2025 00:20:09 -0400 Subject: [PATCH 1/3] feat: Implement find combinator for StreamExt --- futures-util/src/stream/stream/find.rs | 84 ++++++++++++++++++++++++++ futures-util/src/stream/stream/mod.rs | 26 ++++++++ 2 files changed, 110 insertions(+) create mode 100644 futures-util/src/stream/stream/find.rs diff --git a/futures-util/src/stream/stream/find.rs b/futures-util/src/stream/stream/find.rs new file mode 100644 index 000000000..abde4ed20 --- /dev/null +++ b/futures-util/src/stream/stream/find.rs @@ -0,0 +1,84 @@ +use crate::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{ready, Context, Poll}; +use futures_core::FusedFuture; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`find`](super::StreamExt::find) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Find + where St: Stream, + { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + pending_fut: Option, + pending_item: Option, + } +} + +impl Find +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, pending_fut: None, pending_item: None } + } +} + +impl FusedFuture for Find +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done && self.pending_fut.is_none() + } +} + +impl Future for Find +where + St: futures_core::Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.pending_fut.set(None); + if res { + *this.done = true; + break this.pending_item.take(); + } + } else if !*this.done { + match ready!(this.stream.as_mut().poll_next(cx)) { + // we're waiting on a new item from the stream + Some(item) => { + this.pending_fut.set(Some((this.f)(&item))); + *this.pending_item = Some(item); + } + None => { + *this.done = true; + break None; + } + } + } else { + break None; + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6..ba08c8431 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -49,6 +49,9 @@ pub use self::filter::Filter; mod filter_map; pub use self::filter_map::FilterMap; +mod find; +use self::find::Find; + mod flatten; delegate_all!( @@ -672,6 +675,29 @@ pub trait StreamExt: Stream { assert_future::(Fold::new(self, f, init)) } + /// Execute asynchronous predicate over asynchronous stream, and return `Option` + /// if any element in stream satisfied the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter([1, 3, 6, 5, 9]); + /// let first_even = number_stream.find(|&i| async move { i % 2 == 0 }); + /// assert_eq!(first_even.await, Some(6)); + /// # }); + /// ``` + fn find(self, f: F) -> Find + where + Self: Sized, + F: FnMut(&Self::Item) -> Fut, + Fut: Future, + { + assert_future::, _>(Find::new(self, f)) + } + /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. /// /// # Examples From e053475330380b832b2208a15caa88d8965fa9d7 Mon Sep 17 00:00:00 2001 From: Adriano Mourao Date: Sun, 20 Apr 2025 19:31:41 -0400 Subject: [PATCH 2/3] Change FnMut to FnMut1 and improve test and its explanation --- futures-util/src/stream/stream/find.rs | 9 +++++---- futures-util/src/stream/stream/mod.rs | 11 +++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/futures-util/src/stream/stream/find.rs b/futures-util/src/stream/stream/find.rs index abde4ed20..ed5b44b5a 100644 --- a/futures-util/src/stream/stream/find.rs +++ b/futures-util/src/stream/stream/find.rs @@ -1,3 +1,4 @@ +use crate::fns::FnMut1; use crate::Stream; use core::future::Future; @@ -26,7 +27,7 @@ pin_project! { impl Find where St: Stream, - F: FnMut(&St::Item) -> Fut, + F: for<'a> FnMut1<&'a St::Item, Output = Fut>, Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { @@ -48,7 +49,7 @@ where impl Future for Find where St: futures_core::Stream, - F: FnMut(&St::Item) -> Fut, + F: for<'a> FnMut1<&'a St::Item, Output = Fut>, Fut: Future, { type Output = Option; @@ -65,10 +66,10 @@ where break this.pending_item.take(); } } else if !*this.done { + // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { - // we're waiting on a new item from the stream Some(item) => { - this.pending_fut.set(Some((this.f)(&item))); + this.pending_fut.set(Some(this.f.call_mut(&item))); *this.pending_item = Some(item); } None => { diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ba08c8431..2f19de62f 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -675,17 +675,20 @@ pub trait StreamExt: Stream { assert_future::(Fold::new(self, f, init)) } - /// Execute asynchronous predicate over asynchronous stream, and return `Option` - /// if any element in stream satisfied the predicate. + /// Returns the first element in the stream that matches the given asynchronous predicate. + /// + /// Searches through the stream and applies the provided asynchronous predicate to each element + /// until a match is found. Returns `Some(item)` with the matching element, or `None` if no + /// element matches the predicate. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { - /// use futures::stream::{self, StreamExt}; + /// use futures::{future, stream::{self, StreamExt}}; /// /// let number_stream = stream::iter([1, 3, 6, 5, 9]); - /// let first_even = number_stream.find(|&i| async move { i % 2 == 0 }); + /// let first_even = number_stream.find(|x| future::ready(x % 2 == 0)); /// assert_eq!(first_even.await, Some(6)); /// # }); /// ``` From 69d02971efbd1ea2a1a240f5b28ba861ba51035e Mon Sep 17 00:00:00 2001 From: Adriano Mourao Date: Sun, 20 Apr 2025 19:51:29 -0400 Subject: [PATCH 3/3] Implement Display for Find --- futures-util/src/stream/stream/find.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/find.rs b/futures-util/src/stream/stream/find.rs index ed5b44b5a..055847e94 100644 --- a/futures-util/src/stream/stream/find.rs +++ b/futures-util/src/stream/stream/find.rs @@ -1,6 +1,7 @@ use crate::fns::FnMut1; use crate::Stream; +use core::fmt; use core::future::Future; use core::pin::Pin; use core::task::{ready, Context, Poll}; @@ -9,7 +10,6 @@ use pin_project_lite::pin_project; pin_project! { /// Future for the [`find`](super::StreamExt::find) method. - #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Find where St: Stream, @@ -24,6 +24,22 @@ pin_project! { } } +impl fmt::Debug for Find +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Find") + .field("stream", &self.stream) + .field("done", &self.done) + .field("pending_fut", &self.pending_fut) + .field("pending_item", &self.pending_item) + .finish() + } +} + impl Find where St: Stream,