Skip to content

Commit 1a1e83c

Browse files
barvirmMarek Barvir
authored andcommitted
StreamExt::All / StreamExt::Any (#2460)
Closes #2458 Co-authored-by: Marek Barvir <barvir@cadwork.cz>
1 parent 0afa002 commit 1a1e83c

File tree

3 files changed

+236
-0
lines changed

3 files changed

+236
-0
lines changed

futures-util/src/stream/stream/all.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::{FusedFuture, Future};
4+
use futures_core::ready;
5+
use futures_core::stream::Stream;
6+
use futures_core::task::{Context, Poll};
7+
use pin_project_lite::pin_project;
8+
9+
pin_project! {
10+
/// Future for the [`all`](super::StreamExt::all) method.
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct All<St, Fut, F> {
13+
#[pin]
14+
stream: St,
15+
f: F,
16+
accum: Option<bool>,
17+
#[pin]
18+
future: Option<Fut>,
19+
}
20+
}
21+
22+
impl<St, Fut, F> fmt::Debug for All<St, Fut, F>
23+
where
24+
St: fmt::Debug,
25+
Fut: fmt::Debug,
26+
{
27+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28+
f.debug_struct("All")
29+
.field("stream", &self.stream)
30+
.field("accum", &self.accum)
31+
.field("future", &self.future)
32+
.finish()
33+
}
34+
}
35+
36+
impl<St, Fut, F> All<St, Fut, F>
37+
where
38+
St: Stream,
39+
F: FnMut(St::Item) -> Fut,
40+
Fut: Future<Output = bool>,
41+
{
42+
pub(super) fn new(stream: St, f: F) -> Self {
43+
Self { stream, f, accum: Some(true), future: None }
44+
}
45+
}
46+
47+
impl<St, Fut, F> FusedFuture for All<St, Fut, F>
48+
where
49+
St: Stream,
50+
F: FnMut(St::Item) -> Fut,
51+
Fut: Future<Output = bool>,
52+
{
53+
fn is_terminated(&self) -> bool {
54+
self.accum.is_none() && self.future.is_none()
55+
}
56+
}
57+
58+
impl<St, Fut, F> Future for All<St, Fut, F>
59+
where
60+
St: Stream,
61+
F: FnMut(St::Item) -> Fut,
62+
Fut: Future<Output = bool>,
63+
{
64+
type Output = bool;
65+
66+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
67+
let mut this = self.project();
68+
Poll::Ready(loop {
69+
if let Some(fut) = this.future.as_mut().as_pin_mut() {
70+
// we're currently processing a future to produce a new accum value
71+
let acc = this.accum.unwrap() && ready!(fut.poll(cx));
72+
if !acc {
73+
break false;
74+
} // early exit
75+
*this.accum = Some(acc);
76+
this.future.set(None);
77+
} else if this.accum.is_some() {
78+
// we're waiting on a new item from the stream
79+
match ready!(this.stream.as_mut().poll_next(cx)) {
80+
Some(item) => {
81+
this.future.set(Some((this.f)(item)));
82+
}
83+
None => {
84+
break this.accum.take().unwrap();
85+
}
86+
}
87+
} else {
88+
panic!("All polled after completion")
89+
}
90+
})
91+
}
92+
}

futures-util/src/stream/stream/any.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::{FusedFuture, Future};
4+
use futures_core::ready;
5+
use futures_core::stream::Stream;
6+
use futures_core::task::{Context, Poll};
7+
use pin_project_lite::pin_project;
8+
9+
pin_project! {
10+
/// Future for the [`any`](super::StreamExt::any) method.
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct Any<St, Fut, F> {
13+
#[pin]
14+
stream: St,
15+
f: F,
16+
accum: Option<bool>,
17+
#[pin]
18+
future: Option<Fut>,
19+
}
20+
}
21+
22+
impl<St, Fut, F> fmt::Debug for Any<St, Fut, F>
23+
where
24+
St: fmt::Debug,
25+
Fut: fmt::Debug,
26+
{
27+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28+
f.debug_struct("Any")
29+
.field("stream", &self.stream)
30+
.field("accum", &self.accum)
31+
.field("future", &self.future)
32+
.finish()
33+
}
34+
}
35+
36+
impl<St, Fut, F> Any<St, Fut, F>
37+
where
38+
St: Stream,
39+
F: FnMut(St::Item) -> Fut,
40+
Fut: Future<Output = bool>,
41+
{
42+
pub(super) fn new(stream: St, f: F) -> Self {
43+
Self { stream, f, accum: Some(false), future: None }
44+
}
45+
}
46+
47+
impl<St, Fut, F> FusedFuture for Any<St, Fut, F>
48+
where
49+
St: Stream,
50+
F: FnMut(St::Item) -> Fut,
51+
Fut: Future<Output = bool>,
52+
{
53+
fn is_terminated(&self) -> bool {
54+
self.accum.is_none() && self.future.is_none()
55+
}
56+
}
57+
58+
impl<St, Fut, F> Future for Any<St, Fut, F>
59+
where
60+
St: Stream,
61+
F: FnMut(St::Item) -> Fut,
62+
Fut: Future<Output = bool>,
63+
{
64+
type Output = bool;
65+
66+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
67+
let mut this = self.project();
68+
Poll::Ready(loop {
69+
if let Some(fut) = this.future.as_mut().as_pin_mut() {
70+
// we're currently processing a future to produce a new accum value
71+
let acc = this.accum.unwrap() || ready!(fut.poll(cx));
72+
if acc {
73+
break true;
74+
} // early exit
75+
*this.accum = Some(acc);
76+
this.future.set(None);
77+
} else if this.accum.is_some() {
78+
// we're waiting on a new item from the stream
79+
match ready!(this.stream.as_mut().poll_next(cx)) {
80+
Some(item) => {
81+
this.future.set(Some((this.f)(item)));
82+
}
83+
None => {
84+
break this.accum.take().unwrap();
85+
}
86+
}
87+
} else {
88+
panic!("Any polled after completion")
89+
}
90+
})
91+
}
92+
}

futures-util/src/stream/stream/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ mod fold;
7070
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
7171
pub use self::fold::Fold;
7272

73+
mod any;
74+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
75+
pub use self::any::Any;
76+
77+
mod all;
78+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
79+
pub use self::all::All;
80+
7381
#[cfg(feature = "sink")]
7482
mod forward;
7583

@@ -627,6 +635,50 @@ pub trait StreamExt: Stream {
627635
assert_future::<T, _>(Fold::new(self, f, init))
628636
}
629637

638+
/// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
639+
///
640+
/// # Examples
641+
///
642+
/// ```
643+
/// # futures::executor::block_on(async {
644+
/// use futures::stream::{self, StreamExt};
645+
///
646+
/// let number_stream = stream::iter(0..10);
647+
/// let contain_three = number_stream.any(|i| async move { i == 3 });
648+
/// assert_eq!(contain_three.await, true);
649+
/// # });
650+
/// ```
651+
fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
652+
where
653+
F: FnMut(Self::Item) -> Fut,
654+
Fut: Future<Output = bool>,
655+
Self: Sized,
656+
{
657+
assert_future::<bool, _>(Any::new(self, f))
658+
}
659+
660+
/// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate.
661+
///
662+
/// # Examples
663+
///
664+
/// ```
665+
/// # futures::executor::block_on(async {
666+
/// use futures::stream::{self, StreamExt};
667+
///
668+
/// let number_stream = stream::iter(0..10);
669+
/// let less_then_twenty = number_stream.all(|i| async move { i < 20 });
670+
/// assert_eq!(less_then_twenty.await, true);
671+
/// # });
672+
/// ```
673+
fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
674+
where
675+
F: FnMut(Self::Item) -> Fut,
676+
Fut: Future<Output = bool>,
677+
Self: Sized,
678+
{
679+
assert_future::<bool, _>(All::new(self, f))
680+
}
681+
630682
/// Flattens a stream of streams into just one continuous stream.
631683
///
632684
/// # Examples

0 commit comments

Comments
 (0)