Skip to content

Commit ef6415a

Browse files
committed
Implement stream::poll_fn
1 parent 926d49a commit ef6415a

File tree

1 file changed

+46
-7
lines changed

1 file changed

+46
-7
lines changed

src/stream.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub use futures::stream::Stream;
33

44
use core::iter::IntoIterator;
55
use core::pin::Pin;
6+
use core::task::{Context, Poll};
67

78
use pin_utils::pin_mut;
89

@@ -255,9 +256,8 @@ pub fn iter<I>(i: I) -> impl Stream<Item = I::Item>
255256
where
256257
I: IntoIterator,
257258
{
258-
use core::task::Poll;
259259
let mut iter = i.into_iter();
260-
futures::stream::poll_fn(move |_| -> Poll<Option<I::Item>> { Poll::Ready(iter.next()) })
260+
crate::stream::poll_fn(move |_| -> Poll<Option<I::Item>> { Poll::Ready(iter.next()) })
261261
}
262262

263263
/// Concatenate all items of a stream into a single extendable
@@ -402,8 +402,7 @@ pub fn repeat<T>(item: T) -> impl Stream<Item = T>
402402
where
403403
T: Clone,
404404
{
405-
use core::task::Poll;
406-
futures::stream::poll_fn(move |_| -> Poll<Option<T>> { Poll::Ready(Some(item.clone())) })
405+
crate::stream::poll_fn(move |_| -> Poll<Option<T>> { Poll::Ready(Some(item.clone())) })
407406
}
408407

409408
/// Flattens a stream of streams into just one continuous stream.
@@ -795,19 +794,18 @@ where
795794
F: FnMut(T) -> Fut,
796795
Fut: Future<Output = Option<(It, T)>>,
797796
{
798-
use core::task::Poll;
799797
enum State<T, Fut> {
800798
Paused(T),
801799
Running(Pin<Box<Fut>>),
802800
}
803801
let mut state = Some(State::Paused(init));
804-
futures::stream::poll_fn(move |waker| -> Poll<Option<It>> {
802+
crate::stream::poll_fn(move |context| -> Poll<Option<It>> {
805803
let mut future = match state.take() {
806804
Some(State::Running(fut)) => fut,
807805
Some(State::Paused(st)) => Box::pin(f(st)),
808806
None => panic!("this stream must not be polled any more"),
809807
};
810-
match future.as_mut().poll(waker) {
808+
match future.as_mut().poll(context) {
811809
Poll::Pending => {
812810
state = Some(State::Running(future));
813811
Poll::Pending
@@ -821,6 +819,47 @@ where
821819
})
822820
}
823821

822+
/// Creates a new stream wrapping a function returning `Poll<Option<T>>`.
823+
///
824+
/// Polling the returned stream calls the wrapped function.
825+
///
826+
/// # Examples
827+
///
828+
/// ```
829+
/// use futures_async_combinators::stream::poll_fn;
830+
/// use core::task::Poll;
831+
///
832+
/// let mut counter = 1usize;
833+
///
834+
/// let read_stream = poll_fn(move |_| -> Poll<Option<String>> {
835+
/// if counter == 0 { return Poll::Ready(None); }
836+
/// counter -= 1;
837+
/// Poll::Ready(Some("Hello, World!".to_owned()))
838+
/// });
839+
/// ```
840+
pub fn poll_fn<T, F>(f: F) -> impl Stream<Item = T>
841+
where
842+
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
843+
{
844+
pub struct PollFn<F> {
845+
f: F,
846+
}
847+
848+
impl<F> Unpin for PollFn<F> {}
849+
850+
impl<T, F> Stream for PollFn<F>
851+
where
852+
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
853+
{
854+
type Item = T;
855+
856+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
857+
(&mut self.f)(cx)
858+
}
859+
}
860+
PollFn { f }
861+
}
862+
824863
#[cfg(test)]
825864
mod tests {
826865
use crate::future::ready;

0 commit comments

Comments
 (0)