Skip to content

Commit 8aefbed

Browse files
committed
Implement early-exit stream selection with a trait
1 parent b97d95e commit 8aefbed

File tree

4 files changed

+161
-110
lines changed

4 files changed

+161
-110
lines changed

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub use self::select::{select, select_early_exit, Select};
104104

105105
mod select_with_strategy;
106106
pub use self::select_with_strategy::{
107-
select_with_strategy, ExitStrategy, PollNext, SelectWithStrategy,
107+
select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy,
108108
};
109109

110110
mod unfold;

futures-util/src/stream/select.rs

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,54 @@
11
use super::assert_stream;
2-
use crate::stream::{select_with_strategy, ExitStrategy, PollNext, SelectWithStrategy};
2+
use crate::stream::{
3+
select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy,
4+
};
35
use core::pin::Pin;
46
use futures_core::stream::{FusedStream, Stream};
57
use futures_core::task::{Context, Poll};
68
use pin_project_lite::pin_project;
79

10+
type PollNextFn = fn(&mut PollNext) -> PollNext;
11+
812
pin_project! {
913
/// Stream for the [`select()`] function.
1014
#[derive(Debug)]
1115
#[must_use = "streams do nothing unless polled"]
12-
pub struct Select<St1, St2> {
16+
pub struct Select<St1, St2, Exit> {
1317
#[pin]
14-
inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>,
18+
inner: SelectWithStrategy<St1, St2, PollNextFn, PollNext, Exit>,
19+
}
20+
}
21+
22+
#[derive(Debug)]
23+
pub struct ExitWhenBothFinished {}
24+
25+
impl ExitStrategy for ExitWhenBothFinished {
26+
#[inline]
27+
fn is_terminated(closed_streams: ClosedStreams) -> bool {
28+
match closed_streams {
29+
ClosedStreams::Both => true,
30+
_ => false,
31+
}
1532
}
1633
}
1734

35+
#[derive(Debug)]
36+
pub struct ExitWhenEitherFinished {}
37+
38+
impl ExitStrategy for ExitWhenEitherFinished {
39+
#[inline]
40+
fn is_terminated(closed_streams: ClosedStreams) -> bool {
41+
match closed_streams {
42+
ClosedStreams::None => false,
43+
_ => true,
44+
}
45+
}
46+
}
47+
48+
fn round_robin(last: &mut PollNext) -> PollNext {
49+
last.toggle()
50+
}
51+
1852
/// This function will attempt to pull items from both streams. Each
1953
/// stream will be polled in a round-robin fashion, and whenever a stream is
2054
/// ready to yield an item that item is yielded.
@@ -44,42 +78,31 @@ pin_project! {
4478
/// }
4579
/// # });
4680
/// ```
47-
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
81+
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2, ExitWhenBothFinished>
4882
where
4983
St1: Stream,
5084
St2: Stream<Item = St1::Item>,
5185
{
52-
select_with_exit(stream1, stream2, ExitStrategy::WhenBothFinish)
86+
assert_stream::<St1::Item, _>(Select {
87+
inner: select_with_strategy(stream1, stream2, round_robin),
88+
})
5389
}
5490

5591
/// Same as `select`, but finishes when either stream finishes
56-
pub fn select_early_exit<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
57-
where
58-
St1: Stream,
59-
St2: Stream<Item = St1::Item>,
60-
{
61-
select_with_exit(stream1, stream2, ExitStrategy::WhenEitherFinish)
62-
}
63-
64-
fn select_with_exit<St1, St2>(
92+
pub fn select_early_exit<St1, St2>(
6593
stream1: St1,
6694
stream2: St2,
67-
exit_strategy: ExitStrategy,
68-
) -> Select<St1, St2>
95+
) -> Select<St1, St2, ExitWhenEitherFinished>
6996
where
7097
St1: Stream,
7198
St2: Stream<Item = St1::Item>,
7299
{
73-
fn round_robin(last: &mut PollNext) -> PollNext {
74-
last.toggle()
75-
}
76-
77100
assert_stream::<St1::Item, _>(Select {
78-
inner: select_with_strategy(stream1, stream2, round_robin, exit_strategy),
101+
inner: select_with_strategy(stream1, stream2, round_robin),
79102
})
80103
}
81104

82-
impl<St1, St2> Select<St1, St2> {
105+
impl<St1, St2, Exit> Select<St1, St2, Exit> {
83106
/// Acquires a reference to the underlying streams that this combinator is
84107
/// pulling from.
85108
pub fn get_ref(&self) -> (&St1, &St2) {
@@ -114,20 +137,22 @@ impl<St1, St2> Select<St1, St2> {
114137
}
115138
}
116139

117-
impl<St1, St2> FusedStream for Select<St1, St2>
140+
impl<St1, St2, Exit> FusedStream for Select<St1, St2, Exit>
118141
where
119142
St1: Stream,
120143
St2: Stream<Item = St1::Item>,
144+
Exit: ExitStrategy,
121145
{
122146
fn is_terminated(&self) -> bool {
123147
self.inner.is_terminated()
124148
}
125149
}
126150

127-
impl<St1, St2> Stream for Select<St1, St2>
151+
impl<St1, St2, Exit> Stream for Select<St1, St2, Exit>
128152
where
129153
St1: Stream,
130154
St2: Stream<Item = St1::Item>,
155+
Exit: ExitStrategy,
131156
{
132157
type Item = St1::Item;
133158

0 commit comments

Comments
 (0)