Skip to content

Commit 0b29f22

Browse files
najamelantaiki-e
authored andcommitted
Introduce stream::select_with_strategy (#2450)
1 parent 64f0050 commit 0b29f22

File tree

3 files changed

+266
-46
lines changed

3 files changed

+266
-46
lines changed

futures-util/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ pub use self::poll_fn::{poll_fn, PollFn};
9191
mod select;
9292
pub use self::select::{select, Select};
9393

94+
mod select_with_strategy;
95+
pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithStrategy};
96+
9497
mod unfold;
9598
pub use self::unfold::{unfold, Unfold};
9699

futures-util/src/stream/select.rs

Lines changed: 33 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::assert_stream;
2-
use crate::stream::{Fuse, StreamExt};
2+
use crate::stream::{select_with_strategy, PollNext, SelectWithStrategy};
33
use core::pin::Pin;
44
use futures_core::stream::{FusedStream, Stream};
55
use futures_core::task::{Context, Poll};
@@ -11,40 +11,58 @@ pin_project! {
1111
#[must_use = "streams do nothing unless polled"]
1212
pub struct Select<St1, St2> {
1313
#[pin]
14-
stream1: Fuse<St1>,
15-
#[pin]
16-
stream2: Fuse<St2>,
17-
flag: bool,
14+
inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>,
1815
}
1916
}
2017

2118
/// This function will attempt to pull items from both streams. Each
2219
/// stream will be polled in a round-robin fashion, and whenever a stream is
2320
/// ready to yield an item that item is yielded.
2421
///
25-
/// After one of the two input stream completes, the remaining one will be
22+
/// After one of the two input streams completes, the remaining one will be
2623
/// polled exclusively. The returned stream completes when both input
2724
/// streams have completed.
2825
///
2926
/// Note that this function consumes both streams and returns a wrapped
3027
/// version of them.
28+
///
29+
/// ## Examples
30+
///
31+
/// ```rust
32+
/// # futures::executor::block_on(async {
33+
/// use futures::stream::{ repeat, select, StreamExt };
34+
///
35+
/// let left = repeat(1);
36+
/// let right = repeat(2);
37+
///
38+
/// let mut out = select(left, right);
39+
///
40+
/// for _ in 0..100 {
41+
/// // We should be alternating.
42+
/// assert_eq!(1, out.select_next_some().await);
43+
/// assert_eq!(2, out.select_next_some().await);
44+
/// }
45+
/// # });
46+
/// ```
3147
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
3248
where
3349
St1: Stream,
3450
St2: Stream<Item = St1::Item>,
3551
{
52+
fn round_robin(last: &mut PollNext) -> PollNext {
53+
last.toggle()
54+
}
55+
3656
assert_stream::<St1::Item, _>(Select {
37-
stream1: stream1.fuse(),
38-
stream2: stream2.fuse(),
39-
flag: false,
57+
inner: select_with_strategy(stream1, stream2, round_robin),
4058
})
4159
}
4260

4361
impl<St1, St2> Select<St1, St2> {
4462
/// Acquires a reference to the underlying streams that this combinator is
4563
/// pulling from.
4664
pub fn get_ref(&self) -> (&St1, &St2) {
47-
(self.stream1.get_ref(), self.stream2.get_ref())
65+
self.inner.get_ref()
4866
}
4967

5068
/// Acquires a mutable reference to the underlying streams that this
@@ -53,7 +71,7 @@ impl<St1, St2> Select<St1, St2> {
5371
/// Note that care must be taken to avoid tampering with the state of the
5472
/// stream which may otherwise confuse this combinator.
5573
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
56-
(self.stream1.get_mut(), self.stream2.get_mut())
74+
self.inner.get_mut()
5775
}
5876

5977
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -63,15 +81,15 @@ impl<St1, St2> Select<St1, St2> {
6381
/// stream which may otherwise confuse this combinator.
6482
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
6583
let this = self.project();
66-
(this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
84+
this.inner.get_pin_mut()
6785
}
6886

6987
/// Consumes this combinator, returning the underlying streams.
7088
///
7189
/// Note that this may discard intermediate state of this combinator, so
7290
/// care should be taken to avoid losing resources when this is called.
7391
pub fn into_inner(self) -> (St1, St2) {
74-
(self.stream1.into_inner(), self.stream2.into_inner())
92+
self.inner.into_inner()
7593
}
7694
}
7795

@@ -81,7 +99,7 @@ where
8199
St2: Stream<Item = St1::Item>,
82100
{
83101
fn is_terminated(&self) -> bool {
84-
self.stream1.is_terminated() && self.stream2.is_terminated()
102+
self.inner.is_terminated()
85103
}
86104
}
87105

@@ -94,37 +112,6 @@ where
94112

95113
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
96114
let this = self.project();
97-
if !*this.flag {
98-
poll_inner(this.flag, this.stream1, this.stream2, cx)
99-
} else {
100-
poll_inner(this.flag, this.stream2, this.stream1, cx)
101-
}
102-
}
103-
}
104-
105-
fn poll_inner<St1, St2>(
106-
flag: &mut bool,
107-
a: Pin<&mut St1>,
108-
b: Pin<&mut St2>,
109-
cx: &mut Context<'_>,
110-
) -> Poll<Option<St1::Item>>
111-
where
112-
St1: Stream,
113-
St2: Stream<Item = St1::Item>,
114-
{
115-
let a_done = match a.poll_next(cx) {
116-
Poll::Ready(Some(item)) => {
117-
// give the other stream a chance to go first next time
118-
*flag = !*flag;
119-
return Poll::Ready(Some(item));
120-
}
121-
Poll::Ready(None) => true,
122-
Poll::Pending => false,
123-
};
124-
125-
match b.poll_next(cx) {
126-
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
127-
Poll::Ready(None) if a_done => Poll::Ready(None),
128-
Poll::Ready(None) | Poll::Pending => Poll::Pending,
115+
this.inner.poll_next(cx)
129116
}
130117
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
use super::assert_stream;
2+
use crate::stream::{Fuse, StreamExt};
3+
use core::{fmt, pin::Pin};
4+
use futures_core::stream::{FusedStream, Stream};
5+
use futures_core::task::{Context, Poll};
6+
use pin_project_lite::pin_project;
7+
8+
/// Type to tell [`SelectWithStrategy`] which stream to poll next.
9+
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
10+
pub enum PollNext {
11+
/// Poll the first stream.
12+
Left,
13+
/// Poll the second stream.
14+
Right,
15+
}
16+
17+
impl PollNext {
18+
/// Toggle the value and return the old one.
19+
pub fn toggle(&mut self) -> Self {
20+
let old = *self;
21+
22+
match self {
23+
PollNext::Left => *self = PollNext::Right,
24+
PollNext::Right => *self = PollNext::Left,
25+
}
26+
27+
old
28+
}
29+
}
30+
31+
impl Default for PollNext {
32+
fn default() -> Self {
33+
PollNext::Left
34+
}
35+
}
36+
37+
pin_project! {
38+
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
39+
#[must_use = "streams do nothing unless polled"]
40+
pub struct SelectWithStrategy<St1, St2, Clos, State> {
41+
#[pin]
42+
stream1: Fuse<St1>,
43+
#[pin]
44+
stream2: Fuse<St2>,
45+
state: State,
46+
clos: Clos,
47+
}
48+
}
49+
50+
/// This function will attempt to pull items from both streams. You provide a
51+
/// closure to tell [`SelectWithStrategy`] which stream to poll. The closure can
52+
/// store state on `SelectWithStrategy` to which it will receive a `&mut` on every
53+
/// invocation. This allows basing the strategy on prior choices.
54+
///
55+
/// After one of the two input streams completes, the remaining one will be
56+
/// polled exclusively. The returned stream completes when both input
57+
/// streams have completed.
58+
///
59+
/// Note that this function consumes both streams and returns a wrapped
60+
/// version of them.
61+
///
62+
/// ## Examples
63+
///
64+
/// ### Priority
65+
/// This example shows how to always prioritize the left stream.
66+
///
67+
/// ```rust
68+
/// # futures::executor::block_on(async {
69+
/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
70+
///
71+
/// let left = repeat(1);
72+
/// let right = repeat(2);
73+
///
74+
/// // We don't need any state, so let's make it an empty tuple.
75+
/// // We must provide some type here, as there is no way for the compiler
76+
/// // to infer it. As we don't need to capture variables, we can just
77+
/// // use a function pointer instead of a closure.
78+
/// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
79+
///
80+
/// let mut out = select_with_strategy(left, right, prio_left);
81+
///
82+
/// for _ in 0..100 {
83+
/// // Whenever we poll out, we will alwas get `1`.
84+
/// assert_eq!(1, out.select_next_some().await);
85+
/// }
86+
/// # });
87+
/// ```
88+
///
89+
/// ### Round Robin
90+
/// This example shows how to select from both streams round robin.
91+
/// Note: this special case is provided by [`futures-util::stream::select`].
92+
///
93+
/// ```rust
94+
/// # futures::executor::block_on(async {
95+
/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
96+
///
97+
/// let left = repeat(1);
98+
/// let right = repeat(2);
99+
///
100+
/// // We don't need any state, so let's make it an empty tuple.
101+
/// let rrobin = |last: &mut PollNext| last.toggle();
102+
///
103+
/// let mut out = select_with_strategy(left, right, rrobin);
104+
///
105+
/// for _ in 0..100 {
106+
/// // We should be alternating now.
107+
/// assert_eq!(1, out.select_next_some().await);
108+
/// assert_eq!(2, out.select_next_some().await);
109+
/// }
110+
/// # });
111+
/// ```
112+
pub fn select_with_strategy<St1, St2, Clos, State>(
113+
stream1: St1,
114+
stream2: St2,
115+
which: Clos,
116+
) -> SelectWithStrategy<St1, St2, Clos, State>
117+
where
118+
St1: Stream,
119+
St2: Stream<Item = St1::Item>,
120+
Clos: FnMut(&mut State) -> PollNext,
121+
State: Default,
122+
{
123+
assert_stream::<St1::Item, _>(SelectWithStrategy {
124+
stream1: stream1.fuse(),
125+
stream2: stream2.fuse(),
126+
state: Default::default(),
127+
clos: which,
128+
})
129+
}
130+
131+
impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
132+
/// Acquires a reference to the underlying streams that this combinator is
133+
/// pulling from.
134+
pub fn get_ref(&self) -> (&St1, &St2) {
135+
(self.stream1.get_ref(), self.stream2.get_ref())
136+
}
137+
138+
/// Acquires a mutable reference to the underlying streams that this
139+
/// combinator is pulling from.
140+
///
141+
/// Note that care must be taken to avoid tampering with the state of the
142+
/// stream which may otherwise confuse this combinator.
143+
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
144+
(self.stream1.get_mut(), self.stream2.get_mut())
145+
}
146+
147+
/// Acquires a pinned mutable reference to the underlying streams that this
148+
/// combinator is pulling from.
149+
///
150+
/// Note that care must be taken to avoid tampering with the state of the
151+
/// stream which may otherwise confuse this combinator.
152+
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
153+
let this = self.project();
154+
(this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
155+
}
156+
157+
/// Consumes this combinator, returning the underlying streams.
158+
///
159+
/// Note that this may discard intermediate state of this combinator, so
160+
/// care should be taken to avoid losing resources when this is called.
161+
pub fn into_inner(self) -> (St1, St2) {
162+
(self.stream1.into_inner(), self.stream2.into_inner())
163+
}
164+
}
165+
166+
impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State>
167+
where
168+
St1: Stream,
169+
St2: Stream<Item = St1::Item>,
170+
Clos: FnMut(&mut State) -> PollNext,
171+
{
172+
fn is_terminated(&self) -> bool {
173+
self.stream1.is_terminated() && self.stream2.is_terminated()
174+
}
175+
}
176+
177+
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
178+
where
179+
St1: Stream,
180+
St2: Stream<Item = St1::Item>,
181+
Clos: FnMut(&mut State) -> PollNext,
182+
{
183+
type Item = St1::Item;
184+
185+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
186+
let this = self.project();
187+
188+
match (this.clos)(this.state) {
189+
PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
190+
PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
191+
}
192+
}
193+
}
194+
195+
fn poll_inner<St1, St2>(
196+
a: Pin<&mut St1>,
197+
b: Pin<&mut St2>,
198+
cx: &mut Context<'_>,
199+
) -> Poll<Option<St1::Item>>
200+
where
201+
St1: Stream,
202+
St2: Stream<Item = St1::Item>,
203+
{
204+
let a_done = match a.poll_next(cx) {
205+
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
206+
Poll::Ready(None) => true,
207+
Poll::Pending => false,
208+
};
209+
210+
match b.poll_next(cx) {
211+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
212+
Poll::Ready(None) if a_done => Poll::Ready(None),
213+
Poll::Ready(None) | Poll::Pending => Poll::Pending,
214+
}
215+
}
216+
217+
impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State>
218+
where
219+
St1: fmt::Debug,
220+
St2: fmt::Debug,
221+
State: fmt::Debug,
222+
{
223+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224+
f.debug_struct("SelectWithStrategy")
225+
.field("stream1", &self.stream1)
226+
.field("stream2", &self.stream2)
227+
.field("state", &self.state)
228+
.finish()
229+
}
230+
}

0 commit comments

Comments
 (0)