Skip to content

Commit b32fbc4

Browse files
ivan770taiki-e
authored andcommitted
Implement try_chunks (#2438)
1 parent 8abb21e commit b32fbc4

File tree

3 files changed

+189
-0
lines changed

3 files changed

+189
-0
lines changed

futures-util/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ pub use self::try_stream::IntoAsyncRead;
6262
#[cfg(feature = "alloc")]
6363
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
6464

65+
#[cfg(feature = "alloc")]
66+
pub use self::try_stream::{TryChunks, TryChunksError};
67+
6568
// Primitive streams
6669

6770
mod iter;

futures-util/src/stream/try_stream/mod.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use crate::fns::{
1212
use crate::future::assert_future;
1313
use crate::stream::assert_stream;
1414
use crate::stream::{Inspect, Map};
15+
#[cfg(feature = "alloc")]
16+
use alloc::vec::Vec;
1517
use core::pin::Pin;
1618
use futures_core::{
1719
future::{Future, TryFuture},
@@ -94,6 +96,12 @@ mod try_concat;
9496
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
9597
pub use self::try_concat::TryConcat;
9698

99+
#[cfg(feature = "alloc")]
100+
mod try_chunks;
101+
#[cfg(feature = "alloc")]
102+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
103+
pub use self::try_chunks::{TryChunks, TryChunksError};
104+
97105
mod try_fold;
98106
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
99107
pub use self::try_fold::TryFold;
@@ -576,6 +584,53 @@ pub trait TryStreamExt: TryStream {
576584
assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
577585
}
578586

587+
/// An adaptor for chunking up successful items of the stream inside a vector.
588+
///
589+
/// This combinator will attempt to pull successful items from this stream and buffer
590+
/// them into a local vector. At most `capacity` items will get buffered
591+
/// before they're yielded from the returned stream.
592+
///
593+
/// Note that the vectors returned from this iterator may not always have
594+
/// `capacity` elements. If the underlying stream ended and only a partial
595+
/// vector was created, it'll be returned. Additionally if an error happens
596+
/// from the underlying stream then the currently buffered items will be
597+
/// yielded.
598+
///
599+
/// This method is only available when the `std` or `alloc` feature of this
600+
/// library is activated, and it is activated by default.
601+
///
602+
/// This function is similar to
603+
/// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
604+
/// early if an error occurs.
605+
///
606+
/// # Examples
607+
///
608+
/// ```
609+
/// # futures::executor::block_on(async {
610+
/// use futures::stream::{self, TryChunksError, TryStreamExt};
611+
///
612+
/// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
613+
/// let mut stream = stream.try_chunks(2);
614+
///
615+
/// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
616+
/// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
617+
/// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
618+
/// # })
619+
/// ```
620+
///
621+
/// # Panics
622+
///
623+
/// This method will panic if `capacity` is zero.
624+
#[cfg(feature = "alloc")]
625+
fn try_chunks(self, capacity: usize) -> TryChunks<Self>
626+
where
627+
Self: Sized,
628+
{
629+
assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
630+
TryChunks::new(self, capacity),
631+
)
632+
}
633+
579634
/// Attempt to filter the values produced by this stream according to the
580635
/// provided asynchronous closure.
581636
///
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use crate::stream::{Fuse, IntoStream, StreamExt};
2+
3+
use alloc::vec::Vec;
4+
use core::pin::Pin;
5+
use core::{fmt, mem};
6+
use futures_core::ready;
7+
use futures_core::stream::{FusedStream, Stream, TryStream};
8+
use futures_core::task::{Context, Poll};
9+
#[cfg(feature = "sink")]
10+
use futures_sink::Sink;
11+
use pin_project_lite::pin_project;
12+
13+
pin_project! {
14+
/// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
15+
#[derive(Debug)]
16+
#[must_use = "streams do nothing unless polled"]
17+
pub struct TryChunks<St: TryStream> {
18+
#[pin]
19+
stream: Fuse<IntoStream<St>>,
20+
items: Vec<St::Ok>,
21+
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
22+
}
23+
}
24+
25+
impl<St: TryStream> TryChunks<St> {
26+
pub(super) fn new(stream: St, capacity: usize) -> Self {
27+
assert!(capacity > 0);
28+
29+
Self {
30+
stream: IntoStream::new(stream).fuse(),
31+
items: Vec::with_capacity(capacity),
32+
cap: capacity,
33+
}
34+
}
35+
36+
fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37+
let cap = self.cap;
38+
mem::replace(self.project().items, Vec::with_capacity(cap))
39+
}
40+
41+
delegate_access_inner!(stream, St, (. .));
42+
}
43+
44+
impl<St: TryStream> Stream for TryChunks<St> {
45+
#[allow(clippy::type_complexity)]
46+
type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
47+
48+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49+
let mut this = self.as_mut().project();
50+
loop {
51+
match ready!(this.stream.as_mut().try_poll_next(cx)) {
52+
// Push the item into the buffer and check whether it is full.
53+
// If so, replace our buffer with a new and empty one and return
54+
// the full one.
55+
Some(item) => match item {
56+
Ok(item) => {
57+
this.items.push(item);
58+
if this.items.len() >= *this.cap {
59+
return Poll::Ready(Some(Ok(self.take())));
60+
}
61+
}
62+
Err(e) => {
63+
return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
64+
}
65+
},
66+
67+
// Since the underlying stream ran out of values, return what we
68+
// have buffered, if we have anything.
69+
None => {
70+
let last = if this.items.is_empty() {
71+
None
72+
} else {
73+
let full_buf = mem::replace(this.items, Vec::new());
74+
Some(full_buf)
75+
};
76+
77+
return Poll::Ready(last.map(Ok));
78+
}
79+
}
80+
}
81+
}
82+
83+
fn size_hint(&self) -> (usize, Option<usize>) {
84+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
85+
let (lower, upper) = self.stream.size_hint();
86+
let lower = lower.saturating_add(chunk_len);
87+
let upper = match upper {
88+
Some(x) => x.checked_add(chunk_len),
89+
None => None,
90+
};
91+
(lower, upper)
92+
}
93+
}
94+
95+
impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
96+
fn is_terminated(&self) -> bool {
97+
self.stream.is_terminated() && self.items.is_empty()
98+
}
99+
}
100+
101+
// Forwarding impl of Sink from the underlying stream
102+
#[cfg(feature = "sink")]
103+
impl<S, Item> Sink<Item> for TryChunks<S>
104+
where
105+
S: TryStream + Sink<Item>,
106+
{
107+
type Error = <S as Sink<Item>>::Error;
108+
109+
delegate_sink!(stream, Item);
110+
}
111+
112+
/// Error indicating, that while chunk was collected inner stream produced an error.
113+
///
114+
/// Contains all items that were collected before an error occured, and the stream error itself.
115+
#[derive(PartialEq, Eq)]
116+
pub struct TryChunksError<T, E>(pub Vec<T>, pub E);
117+
118+
impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
119+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120+
self.1.fmt(f)
121+
}
122+
}
123+
124+
impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
125+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126+
self.1.fmt(f)
127+
}
128+
}
129+
130+
#[cfg(feature = "std")]
131+
impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}

0 commit comments

Comments
 (0)