Skip to content

Commit 7549e90

Browse files
authored
Add TryFlattenUnordered and improve FlattenUnordered (#2577)
1 parent 2be33de commit 7549e90

File tree

9 files changed

+325
-90
lines changed

9 files changed

+325
-90
lines changed

futures-util/benches/flatten_unordered.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::test::Bencher;
55

66
use futures::channel::oneshot;
77
use futures::executor::block_on;
8-
use futures::future::{self, FutureExt};
8+
use futures::future;
99
use futures::stream::{self, StreamExt};
1010
use futures::task::Poll;
1111
use std::collections::VecDeque;
@@ -35,15 +35,14 @@ fn oneshot_streams(b: &mut Bencher) {
3535
});
3636

3737
let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
38-
async {
38+
Box::pin(async {
3939
if let Some(next) = vals.next() {
4040
let val = next.await.unwrap();
4141
Some((val, vals))
4242
} else {
4343
None
4444
}
45-
}
46-
.boxed()
45+
})
4746
})
4847
.flatten_unordered(None);
4948

futures-util/src/future/try_select.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {
1212

1313
impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
1414

15+
type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
16+
type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;
17+
1518
/// Waits for either one of two differently-typed futures to complete.
1619
///
1720
/// This function will return a new future which awaits for either one of both
@@ -52,18 +55,17 @@ where
5255
A: TryFuture + Unpin,
5356
B: TryFuture + Unpin,
5457
{
55-
super::assert_future::<
56-
Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
57-
_,
58-
>(TrySelect { inner: Some((future1, future2)) })
58+
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
59+
inner: Some((future1, future2)),
60+
})
5961
}
6062

6163
impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
6264
where
6365
A: TryFuture,
6466
B: TryFuture,
6567
{
66-
type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
68+
type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;
6769

6870
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
6971
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");

futures-util/src/stream/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1818
#[allow(clippy::module_inception)]
1919
mod stream;
2020
pub use self::stream::{
21-
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
22-
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
23-
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
24-
TryForEach, Unzip, Zip,
21+
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
22+
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
23+
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
24+
TryFold, TryForEach, Unzip, Zip,
2525
};
2626

2727
#[cfg(feature = "std")]
@@ -39,7 +39,10 @@ pub use self::stream::Forward;
3939

4040
#[cfg(not(futures_no_atomic_cas))]
4141
#[cfg(feature = "alloc")]
42-
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent, TryForEachConcurrent};
42+
pub use self::stream::{
43+
BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent,
44+
TryForEachConcurrent,
45+
};
4346

4447
#[cfg(not(futures_no_atomic_cas))]
4548
#[cfg(feature = "sink")]
@@ -61,7 +64,7 @@ pub use self::try_stream::IntoAsyncRead;
6164

6265
#[cfg(not(futures_no_atomic_cas))]
6366
#[cfg(feature = "alloc")]
64-
pub use self::try_stream::{TryBufferUnordered, TryBuffered};
67+
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryFlattenUnordered};
6568

6669
#[cfg(feature = "sink")]
6770
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]

0 commit comments

Comments
 (0)