Skip to content

Commit 440cfdb

Browse files
petrosaggtaiki-e
authored andcommitted
futures-util: add StreamExt::count method
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent fb271ed commit 440cfdb

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::{FusedFuture, Future};
4+
use futures_core::ready;
5+
use futures_core::stream::{FusedStream, Stream};
6+
use futures_core::task::{Context, Poll};
7+
use pin_project_lite::pin_project;
8+
9+
pin_project! {
10+
/// Future for the [`count`](super::StreamExt::count) method.
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct Count<St> {
13+
#[pin]
14+
stream: St,
15+
count: usize
16+
}
17+
}
18+
19+
impl<St> fmt::Debug for Count<St>
20+
where
21+
St: fmt::Debug,
22+
{
23+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24+
f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
25+
}
26+
}
27+
28+
impl<St: Stream> Count<St> {
29+
pub(super) fn new(stream: St) -> Self {
30+
Self { stream, count: 0 }
31+
}
32+
}
33+
34+
impl<St: FusedStream> FusedFuture for Count<St> {
35+
fn is_terminated(&self) -> bool {
36+
self.stream.is_terminated()
37+
}
38+
}
39+
40+
impl<St: Stream> Future for Count<St> {
41+
type Output = usize;
42+
43+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
44+
let mut this = self.project();
45+
46+
Poll::Ready(loop {
47+
match ready!(this.stream.as_mut().poll_next(cx)) {
48+
Some(_) => *this.count += 1,
49+
None => break *this.count,
50+
}
51+
})
52+
}
53+
}

futures-util/src/stream/stream/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ mod concat;
4040
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
4141
pub use self::concat::Concat;
4242

43+
mod count;
44+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45+
pub use self::count::Count;
46+
4347
mod cycle;
4448
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
4549
pub use self::cycle::Cycle;
@@ -576,6 +580,38 @@ pub trait StreamExt: Stream {
576580
assert_future::<Self::Item, _>(Concat::new(self))
577581
}
578582

583+
/// Drives the stream to completion, counting the number of items.
584+
///
585+
/// # Overflow Behavior
586+
///
587+
/// The method does no guarding against overflows, so counting elements of a
588+
/// stream with more than [`usize::MAX`] elements either produces the wrong
589+
/// result or panics. If debug assertions are enabled, a panic is guaranteed.
590+
///
591+
/// # Panics
592+
///
593+
/// This function might panic if the iterator has more than [`usize::MAX`]
594+
/// elements.
595+
///
596+
/// # Examples
597+
///
598+
/// ```
599+
/// # futures::executor::block_on(async {
600+
/// use futures::stream::{self, StreamExt};
601+
///
602+
/// let stream = stream::iter(1..=10);
603+
/// let count = stream.count().await;
604+
///
605+
/// assert_eq!(count, 10);
606+
/// # });
607+
/// ```
608+
fn count(self) -> Count<Self>
609+
where
610+
Self: Sized,
611+
{
612+
assert_future::<usize, _>(Count::new(self))
613+
}
614+
579615
/// Repeats a stream endlessly.
580616
///
581617
/// The stream never terminates. Note that you likely want to avoid

0 commit comments

Comments
 (0)