Skip to content

Commit c0375ae

Browse files
committed
Add stream::next implementation
1 parent d106ff9 commit c0375ae

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
#![feature(async_await, await_macro, futures_api)]
22

33
pub mod future;
4+
pub mod stream;

src/stream.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use futures::stream::Stream;
2+
use core::pin::Pin;
3+
4+
pub async fn next<St>(stream: &mut St) -> Option<St::Item>
5+
where St: Stream + Unpin,
6+
{
7+
use futures::future::poll_fn;
8+
use futures::task::Waker;
9+
let poll_next = |waker: &Waker| Pin::new(&mut *stream).poll_next(waker);
10+
let future_next = poll_fn(poll_next);
11+
await!(future_next)
12+
}
13+
14+
#[cfg(test)]
15+
mod tests {
16+
use futures::{stream, executor};
17+
use crate::stream::*;
18+
19+
#[test]
20+
fn test_next() {
21+
let mut stream = stream::iter(1..=3);
22+
23+
assert_eq!(executor::block_on(next(&mut stream)), Some(1));
24+
assert_eq!(executor::block_on(next(&mut stream)), Some(2));
25+
assert_eq!(executor::block_on(next(&mut stream)), Some(3));
26+
assert_eq!(executor::block_on(next(&mut stream)), None);
27+
}
28+
29+
}

0 commit comments

Comments
 (0)