Skip to content

Commit c5a09be

Browse files
414owentaiki-e
authored andcommitted
Remove Fuses from select, and only poll non-terminated streams (#2583)
1 parent 00bee71 commit c5a09be

File tree

2 files changed

+106
-36
lines changed

2 files changed

+106
-36
lines changed

futures-util/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ memchr = { version = "2.2", optional = true }
4545
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
4646
tokio-io = { version = "0.1.9", optional = true }
4747
pin-utils = "0.1.0"
48-
pin-project-lite = "0.2.4"
48+
pin-project-lite = "0.2.6"
4949

5050
[dev-dependencies]
5151
futures = { path = "../futures", features = ["async-await", "thread-pool"] }

futures-util/src/stream/select_with_strategy.rs

Lines changed: 105 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use super::assert_stream;
2-
use crate::stream::{Fuse, StreamExt};
32
use core::{fmt, pin::Pin};
43
use futures_core::stream::{FusedStream, Stream};
54
use futures_core::task::{Context, Poll};
@@ -18,13 +17,15 @@ impl PollNext {
1817
/// Toggle the value and return the old one.
1918
pub fn toggle(&mut self) -> Self {
2019
let old = *self;
20+
*self = self.other();
21+
old
22+
}
2123

24+
fn other(&self) -> PollNext {
2225
match self {
23-
PollNext::Left => *self = PollNext::Right,
24-
PollNext::Right => *self = PollNext::Left,
26+
PollNext::Left => PollNext::Right,
27+
PollNext::Right => PollNext::Left,
2528
}
26-
27-
old
2829
}
2930
}
3031

@@ -34,14 +35,41 @@ impl Default for PollNext {
3435
}
3536
}
3637

38+
enum InternalState {
39+
Start,
40+
LeftFinished,
41+
RightFinished,
42+
BothFinished,
43+
}
44+
45+
impl InternalState {
46+
fn finish(&mut self, ps: PollNext) {
47+
match (&self, ps) {
48+
(InternalState::Start, PollNext::Left) => {
49+
*self = InternalState::LeftFinished;
50+
}
51+
(InternalState::Start, PollNext::Right) => {
52+
*self = InternalState::RightFinished;
53+
}
54+
(InternalState::LeftFinished, PollNext::Right)
55+
| (InternalState::RightFinished, PollNext::Left) => {
56+
*self = InternalState::BothFinished;
57+
}
58+
_ => {}
59+
}
60+
}
61+
}
62+
3763
pin_project! {
3864
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
3965
#[must_use = "streams do nothing unless polled"]
66+
#[project = SelectWithStrategyProj]
4067
pub struct SelectWithStrategy<St1, St2, Clos, State> {
4168
#[pin]
42-
stream1: Fuse<St1>,
69+
stream1: St1,
4370
#[pin]
44-
stream2: Fuse<St2>,
71+
stream2: St2,
72+
internal_state: InternalState,
4573
state: State,
4674
clos: Clos,
4775
}
@@ -120,9 +148,10 @@ where
120148
State: Default,
121149
{
122150
assert_stream::<St1::Item, _>(SelectWithStrategy {
123-
stream1: stream1.fuse(),
124-
stream2: stream2.fuse(),
151+
stream1,
152+
stream2,
125153
state: Default::default(),
154+
internal_state: InternalState::Start,
126155
clos: which,
127156
})
128157
}
@@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
131160
/// Acquires a reference to the underlying streams that this combinator is
132161
/// pulling from.
133162
pub fn get_ref(&self) -> (&St1, &St2) {
134-
(self.stream1.get_ref(), self.stream2.get_ref())
163+
(&self.stream1, &self.stream2)
135164
}
136165

137166
/// Acquires a mutable reference to the underlying streams that this
@@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
140169
/// Note that care must be taken to avoid tampering with the state of the
141170
/// stream which may otherwise confuse this combinator.
142171
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
143-
(self.stream1.get_mut(), self.stream2.get_mut())
172+
(&mut self.stream1, &mut self.stream2)
144173
}
145174

146175
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -150,15 +179,15 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
150179
/// stream which may otherwise confuse this combinator.
151180
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
152181
let this = self.project();
153-
(this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
182+
(this.stream1, this.stream2)
154183
}
155184

156185
/// Consumes this combinator, returning the underlying streams.
157186
///
158187
/// Note that this may discard intermediate state of this combinator, so
159188
/// care should be taken to avoid losing resources when this is called.
160189
pub fn into_inner(self) -> (St1, St2) {
161-
(self.stream1.into_inner(), self.stream2.into_inner())
190+
(self.stream1, self.stream2)
162191
}
163192
}
164193

@@ -169,47 +198,88 @@ where
169198
Clos: FnMut(&mut State) -> PollNext,
170199
{
171200
fn is_terminated(&self) -> bool {
172-
self.stream1.is_terminated() && self.stream2.is_terminated()
201+
match self.internal_state {
202+
InternalState::BothFinished => true,
203+
_ => false,
204+
}
173205
}
174206
}
175207

176-
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
208+
#[inline]
209+
fn poll_side<St1, St2, Clos, State>(
210+
select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
211+
side: PollNext,
212+
cx: &mut Context<'_>,
213+
) -> Poll<Option<St1::Item>>
177214
where
178215
St1: Stream,
179216
St2: Stream<Item = St1::Item>,
180-
Clos: FnMut(&mut State) -> PollNext,
181217
{
182-
type Item = St1::Item;
183-
184-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
185-
let this = self.project();
186-
187-
match (this.clos)(this.state) {
188-
PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
189-
PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
190-
}
218+
match side {
219+
PollNext::Left => select.stream1.as_mut().poll_next(cx),
220+
PollNext::Right => select.stream2.as_mut().poll_next(cx),
191221
}
192222
}
193223

194-
fn poll_inner<St1, St2>(
195-
a: Pin<&mut St1>,
196-
b: Pin<&mut St2>,
224+
#[inline]
225+
fn poll_inner<St1, St2, Clos, State>(
226+
select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
227+
side: PollNext,
197228
cx: &mut Context<'_>,
198229
) -> Poll<Option<St1::Item>>
199230
where
200231
St1: Stream,
201232
St2: Stream<Item = St1::Item>,
202233
{
203-
let a_done = match a.poll_next(cx) {
234+
match poll_side(select, side, cx) {
204235
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
205-
Poll::Ready(None) => true,
206-
Poll::Pending => false,
236+
Poll::Ready(None) => {
237+
select.internal_state.finish(side);
238+
}
239+
Poll::Pending => (),
207240
};
241+
let other = side.other();
242+
match poll_side(select, other, cx) {
243+
Poll::Ready(None) => {
244+
select.internal_state.finish(other);
245+
Poll::Ready(None)
246+
}
247+
a => a,
248+
}
249+
}
250+
251+
impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
252+
where
253+
St1: Stream,
254+
St2: Stream<Item = St1::Item>,
255+
Clos: FnMut(&mut State) -> PollNext,
256+
{
257+
type Item = St1::Item;
258+
259+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
260+
let mut this = self.project();
208261

209-
match b.poll_next(cx) {
210-
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
211-
Poll::Ready(None) if a_done => Poll::Ready(None),
212-
Poll::Ready(None) | Poll::Pending => Poll::Pending,
262+
match this.internal_state {
263+
InternalState::Start => {
264+
let next_side = (this.clos)(this.state);
265+
poll_inner(&mut this, next_side, cx)
266+
}
267+
InternalState::LeftFinished => match this.stream2.poll_next(cx) {
268+
Poll::Ready(None) => {
269+
*this.internal_state = InternalState::BothFinished;
270+
Poll::Ready(None)
271+
}
272+
a => a,
273+
},
274+
InternalState::RightFinished => match this.stream1.poll_next(cx) {
275+
Poll::Ready(None) => {
276+
*this.internal_state = InternalState::BothFinished;
277+
Poll::Ready(None)
278+
}
279+
a => a,
280+
},
281+
InternalState::BothFinished => Poll::Ready(None),
282+
}
213283
}
214284
}
215285

0 commit comments

Comments
 (0)