@@ -3,6 +3,7 @@ pub use futures::stream::Stream;
3
3
4
4
use core:: iter:: IntoIterator ;
5
5
use core:: pin:: Pin ;
6
+ use core:: task:: { Context , Poll } ;
6
7
7
8
use pin_utils:: pin_mut;
8
9
@@ -255,9 +256,8 @@ pub fn iter<I>(i: I) -> impl Stream<Item = I::Item>
255
256
where
256
257
I : IntoIterator ,
257
258
{
258
- use core:: task:: Poll ;
259
259
let mut iter = i. into_iter ( ) ;
260
- futures :: stream:: poll_fn ( move |_| -> Poll < Option < I :: Item > > { Poll :: Ready ( iter. next ( ) ) } )
260
+ crate :: stream:: poll_fn ( move |_| -> Poll < Option < I :: Item > > { Poll :: Ready ( iter. next ( ) ) } )
261
261
}
262
262
263
263
/// Concatenate all items of a stream into a single extendable
@@ -402,8 +402,7 @@ pub fn repeat<T>(item: T) -> impl Stream<Item = T>
402
402
where
403
403
T : Clone ,
404
404
{
405
- use core:: task:: Poll ;
406
- futures:: stream:: poll_fn ( move |_| -> Poll < Option < T > > { Poll :: Ready ( Some ( item. clone ( ) ) ) } )
405
+ crate :: stream:: poll_fn ( move |_| -> Poll < Option < T > > { Poll :: Ready ( Some ( item. clone ( ) ) ) } )
407
406
}
408
407
409
408
/// Flattens a stream of streams into just one continuous stream.
@@ -795,19 +794,18 @@ where
795
794
F : FnMut ( T ) -> Fut ,
796
795
Fut : Future < Output = Option < ( It , T ) > > ,
797
796
{
798
- use core:: task:: Poll ;
799
797
enum State < T , Fut > {
800
798
Paused ( T ) ,
801
799
Running ( Pin < Box < Fut > > ) ,
802
800
}
803
801
let mut state = Some ( State :: Paused ( init) ) ;
804
- futures :: stream:: poll_fn ( move |waker | -> Poll < Option < It > > {
802
+ crate :: stream:: poll_fn ( move |context | -> Poll < Option < It > > {
805
803
let mut future = match state. take ( ) {
806
804
Some ( State :: Running ( fut) ) => fut,
807
805
Some ( State :: Paused ( st) ) => Box :: pin ( f ( st) ) ,
808
806
None => panic ! ( "this stream must not be polled any more" ) ,
809
807
} ;
810
- match future. as_mut ( ) . poll ( waker ) {
808
+ match future. as_mut ( ) . poll ( context ) {
811
809
Poll :: Pending => {
812
810
state = Some ( State :: Running ( future) ) ;
813
811
Poll :: Pending
@@ -821,6 +819,47 @@ where
821
819
} )
822
820
}
823
821
822
+ /// Creates a new stream wrapping a function returning `Poll<Option<T>>`.
823
+ ///
824
+ /// Polling the returned stream calls the wrapped function.
825
+ ///
826
+ /// # Examples
827
+ ///
828
+ /// ```
829
+ /// use futures_async_combinators::stream::poll_fn;
830
+ /// use core::task::Poll;
831
+ ///
832
+ /// let mut counter = 1usize;
833
+ ///
834
+ /// let read_stream = poll_fn(move |_| -> Poll<Option<String>> {
835
+ /// if counter == 0 { return Poll::Ready(None); }
836
+ /// counter -= 1;
837
+ /// Poll::Ready(Some("Hello, World!".to_owned()))
838
+ /// });
839
+ /// ```
840
+ pub fn poll_fn < T , F > ( f : F ) -> impl Stream < Item = T >
841
+ where
842
+ F : FnMut ( & mut Context < ' _ > ) -> Poll < Option < T > > ,
843
+ {
844
+ pub struct PollFn < F > {
845
+ f : F ,
846
+ }
847
+
848
+ impl < F > Unpin for PollFn < F > { }
849
+
850
+ impl < T , F > Stream for PollFn < F >
851
+ where
852
+ F : FnMut ( & mut Context < ' _ > ) -> Poll < Option < T > > ,
853
+ {
854
+ type Item = T ;
855
+
856
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < T > > {
857
+ ( & mut self . f ) ( cx)
858
+ }
859
+ }
860
+ PollFn { f }
861
+ }
862
+
824
863
#[ cfg( test) ]
825
864
mod tests {
826
865
use crate :: future:: ready;
0 commit comments