Skip to content

Commit 9a34f07

Browse files
committed
Implement early-exit stream selection with a trait
1 parent 7aefa94 commit 9a34f07

File tree

4 files changed

+149
-109
lines changed

4 files changed

+149
-109
lines changed

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub use self::select::{select, select_early_exit, Select};
104104

105105
mod select_with_strategy;
106106
pub use self::select_with_strategy::{
107-
select_with_strategy, ExitStrategy, PollNext, SelectWithStrategy,
107+
select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy,
108108
};
109109

110110
mod unfold;

futures-util/src/stream/select.rs

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::assert_stream;
2-
use crate::stream::{select_with_strategy, ExitStrategy, PollNext, SelectWithStrategy};
2+
use crate::stream::{
3+
select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy,
4+
};
35
use core::pin::Pin;
46
use futures_core::stream::{FusedStream, Stream};
57
use futures_core::task::{Context, Poll};
@@ -9,12 +11,42 @@ pin_project! {
911
/// Stream for the [`select()`] function.
1012
#[derive(Debug)]
1113
#[must_use = "streams do nothing unless polled"]
12-
pub struct Select<St1, St2> {
14+
pub struct Select<St1, St2, Exit> {
1315
#[pin]
14-
inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>,
16+
inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext, Exit>,
1517
}
1618
}
1719

20+
#[derive(Debug)]
21+
pub struct ExitWhenBothFinished {}
22+
23+
impl ExitStrategy for ExitWhenBothFinished {
24+
#[inline]
25+
fn is_terminated(closed_streams: ClosedStreams) -> bool {
26+
match closed_streams {
27+
ClosedStreams::Both => true,
28+
_ => false,
29+
}
30+
}
31+
}
32+
33+
#[derive(Debug)]
34+
pub struct ExitWhenEitherFinished {}
35+
36+
impl ExitStrategy for ExitWhenEitherFinished {
37+
#[inline]
38+
fn is_terminated(closed_streams: ClosedStreams) -> bool {
39+
match closed_streams {
40+
ClosedStreams::None => false,
41+
_ => true,
42+
}
43+
}
44+
}
45+
46+
fn round_robin(last: &mut PollNext) -> PollNext {
47+
last.toggle()
48+
}
49+
1850
/// This function will attempt to pull items from both streams. Each
1951
/// stream will be polled in a round-robin fashion, and whenever a stream is
2052
/// ready to yield an item that item is yielded.
@@ -44,42 +76,31 @@ pin_project! {
4476
/// }
4577
/// # });
4678
/// ```
47-
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
79+
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2, ExitWhenBothFinished>
4880
where
4981
St1: Stream,
5082
St2: Stream<Item = St1::Item>,
5183
{
52-
select_with_exit(stream1, stream2, ExitStrategy::WhenBothFinish)
84+
assert_stream::<St1::Item, _>(Select {
85+
inner: select_with_strategy(stream1, stream2, round_robin),
86+
})
5387
}
5488

5589
/// Same as `select`, but finishes when either stream finishes
56-
pub fn select_early_exit<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
57-
where
58-
St1: Stream,
59-
St2: Stream<Item = St1::Item>,
60-
{
61-
select_with_exit(stream1, stream2, ExitStrategy::WhenEitherFinish)
62-
}
63-
64-
fn select_with_exit<St1, St2>(
90+
pub fn select_early_exit<St1, St2>(
6591
stream1: St1,
6692
stream2: St2,
67-
exit_strategy: ExitStrategy,
68-
) -> Select<St1, St2>
93+
) -> Select<St1, St2, ExitWhenEitherFinished>
6994
where
7095
St1: Stream,
7196
St2: Stream<Item = St1::Item>,
7297
{
73-
fn round_robin(last: &mut PollNext) -> PollNext {
74-
last.toggle()
75-
}
76-
7798
assert_stream::<St1::Item, _>(Select {
78-
inner: select_with_strategy(stream1, stream2, round_robin, exit_strategy),
99+
inner: select_with_strategy(stream1, stream2, round_robin),
79100
})
80101
}
81102

82-
impl<St1, St2> Select<St1, St2> {
103+
impl<St1, St2, Exit> Select<St1, St2, Exit> {
83104
/// Acquires a reference to the underlying streams that this combinator is
84105
/// pulling from.
85106
pub fn get_ref(&self) -> (&St1, &St2) {
@@ -114,20 +135,22 @@ impl<St1, St2> Select<St1, St2> {
114135
}
115136
}
116137

117-
impl<St1, St2> FusedStream for Select<St1, St2>
138+
impl<St1, St2, Exit> FusedStream for Select<St1, St2, Exit>
118139
where
119140
St1: Stream,
120141
St2: Stream<Item = St1::Item>,
142+
Exit: ExitStrategy,
121143
{
122144
fn is_terminated(&self) -> bool {
123145
self.inner.is_terminated()
124146
}
125147
}
126148

127-
impl<St1, St2> Stream for Select<St1, St2>
149+
impl<St1, St2, Exit> Stream for Select<St1, St2, Exit>
128150
where
129151
St1: Stream,
130152
St2: Stream<Item = St1::Item>,
153+
Exit: ExitStrategy,
131154
{
132155
type Item = St1::Item;
133156

0 commit comments

Comments
 (0)