Skip to content

Commit 1b6968b

Browse files
zhy2020zhy
andauthored
Add stream::repeat_with() method (rust-lang#2279)
Co-authored-by: zhy <zhy@gmail.com>
1 parent 7397e91 commit 1b6968b

File tree

3 files changed

+103
-7
lines changed

3 files changed

+103
-7
lines changed

futures-util/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ pub use self::iter::{iter, Iter};
6565
mod repeat;
6666
pub use self::repeat::{repeat, Repeat};
6767

68+
mod repeat_with;
69+
pub use self::repeat_with::{repeat_with, RepeatWith};
70+
6871
mod empty;
6972
pub use self::empty::{empty, Empty};
7073

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use core::pin::Pin;
2+
use futures_core::stream::{FusedStream, Stream};
3+
use futures_core::task::{Context, Poll};
4+
5+
/// An stream that repeats elements of type `A` endlessly by
6+
/// applying the provided closure `F: FnMut() -> A`.
7+
///
8+
/// This `struct` is created by the [`repeat_with()`] function.
9+
/// See its documentation for more.
10+
#[derive(Debug, Clone)]
11+
#[must_use = "streams do nothing unless polled"]
12+
pub struct RepeatWith<F> {
13+
repeater: F,
14+
}
15+
16+
impl<A, F: FnMut() -> A> Unpin for RepeatWith<F> {}
17+
18+
impl<A, F: FnMut() -> A> Stream for RepeatWith<F> {
19+
type Item = A;
20+
21+
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
22+
Poll::Ready(Some((&mut self.repeater)()))
23+
}
24+
25+
fn size_hint(&self) -> (usize, Option<usize>) {
26+
(usize::max_value(), None)
27+
}
28+
}
29+
30+
impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> {
31+
fn is_terminated(&self) -> bool {
32+
false
33+
}
34+
}
35+
36+
/// Creates a new stream that repeats elements of type `A` endlessly by
37+
/// applying the provided closure, the repeater, `F: FnMut() -> A`.
38+
///
39+
/// The `repeat_with()` function calls the repeater over and over again.
40+
///
41+
/// Infinite stream like `repeat_with()` are often used with adapters like
42+
/// [`stream.take()`], in order to make them finite.
43+
///
44+
/// If the element type of the stream you need implements [`Clone`], and
45+
/// it is OK to keep the source element in memory, you should instead use
46+
/// the [`stream.repeat()`] function.
47+
///
48+
/// # Examples
49+
///
50+
/// Basic usage:
51+
///
52+
/// ```
53+
/// # futures::executor::block_on(async {
54+
/// use futures::stream::{self, StreamExt};
55+
///
56+
/// // let's assume we have some value of a type that is not `Clone`
57+
/// // or which don't want to have in memory just yet because it is expensive:
58+
/// #[derive(PartialEq, Debug)]
59+
/// struct Expensive;
60+
///
61+
/// // a particular value forever:
62+
/// let mut things = stream::repeat_with(|| Expensive);
63+
///
64+
/// assert_eq!(Some(Expensive), things.next().await);
65+
/// assert_eq!(Some(Expensive), things.next().await);
66+
/// assert_eq!(Some(Expensive), things.next().await);
67+
/// # });
68+
/// ```
69+
///
70+
/// Using mutation and going finite:
71+
///
72+
/// ```rust
73+
/// # futures::executor::block_on(async {
74+
/// use futures::stream::{self, StreamExt};
75+
///
76+
/// // From the zeroth to the third power of two:
77+
/// let mut curr = 1;
78+
/// let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp })
79+
/// .take(4);
80+
///
81+
/// assert_eq!(Some(1), pow2.next().await);
82+
/// assert_eq!(Some(2), pow2.next().await);
83+
/// assert_eq!(Some(4), pow2.next().await);
84+
/// assert_eq!(Some(8), pow2.next().await);
85+
///
86+
/// // ... and now we're done
87+
/// assert_eq!(None, pow2.next().await);
88+
/// # });
89+
/// ```
90+
pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> {
91+
RepeatWith { repeater }
92+
}

futures/src/lib.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,13 +406,14 @@ pub mod stream {
406406
pub use futures_core::stream::{BoxStream, LocalBoxStream};
407407

408408
pub use futures_util::stream::{
409-
empty, iter, once, pending, poll_fn, repeat, select, try_unfold, unfold, AndThen, Chain,
410-
Collect, Concat, Cycle, Empty, Enumerate, ErrInto, Filter, FilterMap, FlatMap, Flatten,
411-
Fold, ForEach, Forward, Fuse, Inspect, InspectErr, InspectOk, IntoStream, Iter, Map,
412-
MapErr, MapOk, Next, Once, OrElse, Peek, Peekable, Pending, PollFn, Repeat, Scan, Select,
413-
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
414-
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
415-
TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, Unfold, Unzip, Zip,
409+
empty, iter, once, pending, poll_fn, repeat, repeat_with, select, try_unfold, unfold,
410+
AndThen, Chain, Collect, Concat, Cycle, Empty, Enumerate, ErrInto, Filter, FilterMap,
411+
FlatMap, Flatten, Fold, ForEach, Forward, Fuse, Inspect, InspectErr, InspectOk, IntoStream,
412+
Iter, Map, MapErr, MapOk, Next, Once, OrElse, Peek, Peekable, Pending, PollFn, Repeat,
413+
RepeatWith, Scan, Select, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take,
414+
TakeUntil, TakeWhile, Then, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten,
415+
TryFold, TryForEach, TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, Unfold,
416+
Unzip, Zip,
416417
};
417418

418419
#[cfg(feature = "alloc")]

0 commit comments

Comments
 (0)