Skip to content

Commit 2e2b013

Browse files
svartalfNemo157
authored andcommitted
Add TryStreamExt::try_filter
1 parent f68d824 commit 2e2b013

File tree

2 files changed

+172
-3
lines changed

2 files changed

+172
-3
lines changed

futures-util/src/try_stream/mod.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! that return `Result`s, allowing for short-circuiting computations.
55
66
use core::pin::Pin;
7-
use futures_core::future::TryFuture;
7+
use futures_core::future::{Future, TryFuture};
88
use futures_core::stream::TryStream;
99
use futures_core::task::{Context, Poll};
1010

@@ -35,6 +35,9 @@ pub use self::try_next::TryNext;
3535
mod try_for_each;
3636
pub use self::try_for_each::TryForEach;
3737

38+
mod try_filter;
39+
pub use self::try_filter::TryFilter;
40+
3841
mod try_filter_map;
3942
pub use self::try_filter_map::TryFilterMap;
4043

@@ -60,8 +63,6 @@ cfg_target_has_atomic! {
6063
mod try_for_each_concurrent;
6164
#[cfg(feature = "alloc")]
6265
pub use self::try_for_each_concurrent::TryForEachConcurrent;
63-
#[cfg(feature = "alloc")]
64-
use futures_core::future::Future;
6566
}
6667

6768
#[cfg(feature = "std")]
@@ -444,6 +445,46 @@ pub trait TryStreamExt: TryStream {
444445
TryCollect::new(self)
445446
}
446447

448+
/// Attempt to filter the values produced by this stream according to the
449+
/// provided asynchronous closure.
450+
///
451+
/// As values of this stream are made available, the provided predicate `f`
452+
/// will be run on them. If the predicate returns a `Future` which resolves
453+
/// to `true`, then the stream will yield the value, but if the predicate
454+
/// return a `Future` which resolves to `false`, then the value will be
455+
/// discarded and the next value will be produced.
456+
///
457+
/// All errors are passed through without filtering in this combinator.
458+
///
459+
/// Note that this function consumes the stream passed into it and returns a
460+
/// wrapped version of it, similar to the existing `filter` methods in
461+
/// the standard library.
462+
///
463+
/// # Examples
464+
/// ```
465+
/// #![feature(async_await, await_macro)]
466+
/// # futures::executor::block_on(async {
467+
/// use futures::executor::block_on;
468+
/// use futures::future;
469+
/// use futures::stream::{self, StreamExt, TryStreamExt};
470+
///
471+
/// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
472+
/// let mut evens = stream.try_filter(|x| {
473+
/// future::ready(x % 2 == 0)
474+
/// });
475+
///
476+
/// assert_eq!(await!(evens.next()), Some(Ok(2)));
477+
/// assert_eq!(await!(evens.next()), Some(Err("error")));
478+
/// # })
479+
/// ```
480+
fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
481+
where Fut: Future<Output = bool>,
482+
F: FnMut(&Self::Ok) -> Fut,
483+
Self: Sized
484+
{
485+
TryFilter::new(self, f)
486+
}
487+
447488
/// Attempt to filter the values produced by this stream while
448489
/// simultaneously mapping them to a different type according to the
449490
/// provided asynchronous closure.
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use core::pin::Pin;
2+
use futures_core::future::Future;
3+
use futures_core::stream::{Stream, TryStream, FusedStream};
4+
use futures_core::task::{Context, Poll};
5+
use futures_sink::Sink;
6+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
7+
8+
/// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
9+
/// method.
10+
#[derive(Debug)]
11+
#[must_use = "streams do nothing unless polled"]
12+
pub struct TryFilter<St, Fut, F>
13+
where St: TryStream
14+
{
15+
stream: St,
16+
f: F,
17+
pending_fut: Option<Fut>,
18+
pending_item: Option<St::Ok>,
19+
}
20+
21+
impl<St, Fut, F> Unpin for TryFilter<St, Fut, F>
22+
where St: TryStream + Unpin, Fut: Unpin,
23+
{}
24+
25+
impl<St, Fut, F> TryFilter<St, Fut, F>
26+
where St: TryStream
27+
{
28+
unsafe_pinned!(stream: St);
29+
unsafe_unpinned!(f: F);
30+
unsafe_pinned!(pending_fut: Option<Fut>);
31+
unsafe_unpinned!(pending_item: Option<St::Ok>);
32+
33+
pub(super) fn new(stream: St, f: F) -> Self {
34+
TryFilter {
35+
stream,
36+
f,
37+
pending_fut: None,
38+
pending_item: None,
39+
}
40+
}
41+
42+
/// Acquires a reference to the underlying stream that this combinator is
43+
/// pulling from.
44+
pub fn get_ref(&self) -> &St {
45+
&self.stream
46+
}
47+
48+
/// Acquires a mutable reference to the underlying stream that this
49+
/// combinator is pulling from.
50+
///
51+
/// Note that care must be taken to avoid tampering with the state of the
52+
/// stream which may otherwise confuse this combinator.
53+
pub fn get_mut(&mut self) -> &mut St {
54+
&mut self.stream
55+
}
56+
57+
/// Acquires a pinned mutable reference to the underlying stream that this
58+
/// combinator is pulling from.
59+
///
60+
/// Note that care must be taken to avoid tampering with the state of the
61+
/// stream which may otherwise confuse this combinator.
62+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> {
63+
self.stream()
64+
}
65+
66+
/// Consumes this combinator, returning the underlying stream.
67+
///
68+
/// Note that this may discard intermediate state of this combinator, so
69+
/// care should be taken to avoid losing resources when this is called.
70+
pub fn into_inner(self) -> St {
71+
self.stream
72+
}
73+
}
74+
75+
impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F>
76+
where St: TryStream + FusedStream,
77+
F: FnMut(&St::Ok) -> Fut,
78+
Fut: Future<Output = bool>,
79+
{
80+
fn is_terminated(&self) -> bool {
81+
self.pending_fut.is_none() && self.stream.is_terminated()
82+
}
83+
}
84+
85+
impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
86+
where St: TryStream,
87+
Fut: Future<Output = bool>,
88+
F: FnMut(&St::Ok) -> Fut,
89+
{
90+
type Item = Result<St::Ok, St::Error>;
91+
92+
fn poll_next(
93+
mut self: Pin<&mut Self>,
94+
cx: &mut Context<'_>,
95+
) -> Poll<Option<Result<St::Ok, St::Error>>> {
96+
loop {
97+
if self.pending_fut.is_none() {
98+
let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
99+
Some(Ok(x)) => x,
100+
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
101+
None => return Poll::Ready(None),
102+
};
103+
let fut = (self.as_mut().f())(&item);
104+
self.as_mut().pending_fut().set(Some(fut));
105+
*self.as_mut().pending_item() = Some(item);
106+
}
107+
108+
let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx));
109+
self.as_mut().pending_fut().set(None);
110+
let item = self.as_mut().pending_item().take().unwrap();
111+
112+
if yield_item {
113+
return Poll::Ready(Some(Ok(item)));
114+
}
115+
}
116+
}
117+
}
118+
119+
// Forwarding impl of Sink from the underlying stream
120+
impl<S, Fut, F, Item> Sink<Item> for TryFilter<S, Fut, F>
121+
where S: TryStream + Sink<Item>,
122+
Fut: Future<Output = bool>,
123+
F: FnMut(&S::Ok) -> Fut,
124+
{
125+
type SinkError = S::SinkError;
126+
127+
delegate_sink!(stream, Item);
128+
}

0 commit comments

Comments
 (0)