Skip to content

Commit 2e30ec3

Browse files
authored
Remove Fuses from select, and only poll non-terminated streams (#2583)
1 parent 1495f64 commit 2e30ec3

File tree

2 files changed

+106
-36
lines changed

2 files changed

+106
-36
lines changed

futures-util/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ memchr = { version = "2.2", optional = true }
4141
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
4242
tokio-io = { version = "0.1.9", optional = true }
4343
pin-utils = "0.1.0"
44-
pin-project-lite = "0.2.4"
44+
pin-project-lite = "0.2.6"
4545

4646
[dev-dependencies]
4747
futures = { path = "../futures", features = ["async-await", "thread-pool"] }

futures-util/src/stream/select_with_strategy.rs

Lines changed: 105 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use super::assert_stream;
2-
use crate::stream::{Fuse, StreamExt};
32
use core::{fmt, pin::Pin};
43
use futures_core::stream::{FusedStream, Stream};
54
use futures_core::task::{Context, Poll};
@@ -19,13 +18,15 @@ impl PollNext {
1918
#[must_use]
2019
pub fn toggle(&mut self) -> Self {
2120
let old = *self;
21+
*self = self.other();
22+
old
23+
}
2224

25+
fn other(&self) -> PollNext {
2326
match self {
24-
PollNext::Left => *self = PollNext::Right,
25-
PollNext::Right => *self = PollNext::Left,
27+
PollNext::Left => PollNext::Right,
28+
PollNext::Right => PollNext::Left,
2629
}
27-
28-
old
2930
}
3031
}
3132

@@ -35,14 +36,41 @@ impl Default for PollNext {
3536
}
3637
}
3738

39+
enum InternalState {
40+
Start,
41+
LeftFinished,
42+
RightFinished,
43+
BothFinished,
44+
}
45+
46+
impl InternalState {
47+
fn finish(&mut self, ps: PollNext) {
48+
match (&self, ps) {
49+
(InternalState::Start, PollNext::Left) => {
50+
*self = InternalState::LeftFinished;
51+
}
52+
(InternalState::Start, PollNext::Right) => {
53+
*self = InternalState::RightFinished;
54+
}
55+
(InternalState::LeftFinished, PollNext::Right)
56+
| (InternalState::RightFinished, PollNext::Left) => {
57+
*self = InternalState::BothFinished;
58+
}
59+
_ => {}
60+
}
61+
}
62+
}
63+
3864
pin_project! {
3965
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
4066
#[must_use = "streams do nothing unless polled"]
67+
#[project = SelectWithStrategyProj]
4168
pub struct SelectWithStrategy<St1, St2, Clos, State> {
4269
#[pin]
43-
stream1: Fuse<St1>,
70+
stream1: St1,
4471
#[pin]
45-
stream2: Fuse<St2>,
72+
stream2: St2,
73+
internal_state: InternalState,
4674
state: State,
4775
clos: Clos,
4876
}
@@ -121,9 +149,10 @@ where
121149
State: Default,
122150
{
123151
assert_stream::<St1::Item, _>(SelectWithStrategy {
124-
stream1: stream1.fuse(),
125-
stream2: stream2.fuse(),
152+
stream1,
153+
stream2,
126154
state: Default::default(),
155+
internal_state: InternalState::Start,
127156
clos: which,
128157
})
129158
}
@@ -132,7 +161,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
132161
/// Acquires a reference to the underlying streams that this combinator is
133162
/// pulling from.
134163
pub fn get_ref(&self) -> (&St1, &St2) {
135-
(self.stream1.get_ref(), self.stream2.get_ref())
164+
(&self.stream1, &self.stream2)
136165
}
137166

138167
/// Acquires a mutable reference to the underlying streams that this
@@ -141,7 +170,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
141170
/// Note that care must be taken to avoid tampering with the state of the
142171
/// stream which may otherwise confuse this combinator.
143172
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
144-
(self.stream1.get_mut(), self.stream2.get_mut())
173+
(&mut self.stream1, &mut self.stream2)
145174
}
146175

147176
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -151,15 +180,15 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
151180
/// stream which may otherwise confuse this combinator.
152181
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
153182
let this = self.project();
154-
(this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
183+
(this.stream1, this.stream2)
155184
}
156185

157186
/// Consumes this combinator, returning the underlying streams.
158187
///
159188
/// Note that this may discard intermediate state of this combinator, so
160189
/// care should be taken to avoid losing resources when this is called.
161190
pub fn into_inner(self) -> (St1, St2) {
162-
(self.stream1.into_inner(), self.stream2.into_inner())
191+
(self.stream1, self.stream2)
163192
}
164193
}
165194

@@ -170,47 +199,88 @@ where
170199
Clos: FnMut(&mut State) -> PollNext,
171200
{
172201
fn is_terminated(&self) -> bool {
173-
self.stream1.is_terminated() && self.stream2.is_terminated()
202+
match self.internal_state {
203+
InternalState::BothFinished => true,
204+
_ => false,
205+
}
174206
}
175207
}
176208

177-
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
209+
#[inline]
210+
fn poll_side<St1, St2, Clos, State>(
211+
select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
212+
side: PollNext,
213+
cx: &mut Context<'_>,
214+
) -> Poll<Option<St1::Item>>
178215
where
179216
St1: Stream,
180217
St2: Stream<Item = St1::Item>,
181-
Clos: FnMut(&mut State) -> PollNext,
182218
{
183-
type Item = St1::Item;
184-
185-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
186-
let this = self.project();
187-
188-
match (this.clos)(this.state) {
189-
PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
190-
PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
191-
}
219+
match side {
220+
PollNext::Left => select.stream1.as_mut().poll_next(cx),
221+
PollNext::Right => select.stream2.as_mut().poll_next(cx),
192222
}
193223
}
194224

195-
fn poll_inner<St1, St2>(
196-
a: Pin<&mut St1>,
197-
b: Pin<&mut St2>,
225+
#[inline]
226+
fn poll_inner<St1, St2, Clos, State>(
227+
select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
228+
side: PollNext,
198229
cx: &mut Context<'_>,
199230
) -> Poll<Option<St1::Item>>
200231
where
201232
St1: Stream,
202233
St2: Stream<Item = St1::Item>,
203234
{
204-
let a_done = match a.poll_next(cx) {
235+
match poll_side(select, side, cx) {
205236
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
206-
Poll::Ready(None) => true,
207-
Poll::Pending => false,
237+
Poll::Ready(None) => {
238+
select.internal_state.finish(side);
239+
}
240+
Poll::Pending => (),
208241
};
242+
let other = side.other();
243+
match poll_side(select, other, cx) {
244+
Poll::Ready(None) => {
245+
select.internal_state.finish(other);
246+
Poll::Ready(None)
247+
}
248+
a => a,
249+
}
250+
}
251+
252+
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
253+
where
254+
St1: Stream,
255+
St2: Stream<Item = St1::Item>,
256+
Clos: FnMut(&mut State) -> PollNext,
257+
{
258+
type Item = St1::Item;
259+
260+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
261+
let mut this = self.project();
209262

210-
match b.poll_next(cx) {
211-
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
212-
Poll::Ready(None) if a_done => Poll::Ready(None),
213-
Poll::Ready(None) | Poll::Pending => Poll::Pending,
263+
match this.internal_state {
264+
InternalState::Start => {
265+
let next_side = (this.clos)(this.state);
266+
poll_inner(&mut this, next_side, cx)
267+
}
268+
InternalState::LeftFinished => match this.stream2.poll_next(cx) {
269+
Poll::Ready(None) => {
270+
*this.internal_state = InternalState::BothFinished;
271+
Poll::Ready(None)
272+
}
273+
a => a,
274+
},
275+
InternalState::RightFinished => match this.stream1.poll_next(cx) {
276+
Poll::Ready(None) => {
277+
*this.internal_state = InternalState::BothFinished;
278+
Poll::Ready(None)
279+
}
280+
a => a,
281+
},
282+
InternalState::BothFinished => Poll::Ready(None),
283+
}
214284
}
215285
}
216286

0 commit comments

Comments
 (0)