1
1
use super :: assert_stream;
2
- use crate :: stream:: { Fuse , StreamExt } ;
3
2
use core:: { fmt, pin:: Pin } ;
4
3
use futures_core:: stream:: { FusedStream , Stream } ;
5
4
use futures_core:: task:: { Context , Poll } ;
@@ -18,13 +17,15 @@ impl PollNext {
18
17
/// Toggle the value and return the old one.
19
18
pub fn toggle ( & mut self ) -> Self {
20
19
let old = * self ;
20
+ * self = self . other ( ) ;
21
+ old
22
+ }
21
23
24
+ fn other ( & self ) -> PollNext {
22
25
match self {
23
- PollNext :: Left => * self = PollNext :: Right ,
24
- PollNext :: Right => * self = PollNext :: Left ,
26
+ PollNext :: Left => PollNext :: Right ,
27
+ PollNext :: Right => PollNext :: Left ,
25
28
}
26
-
27
- old
28
29
}
29
30
}
30
31
@@ -34,14 +35,41 @@ impl Default for PollNext {
34
35
}
35
36
}
36
37
38
+ enum InternalState {
39
+ Start ,
40
+ LeftFinished ,
41
+ RightFinished ,
42
+ BothFinished ,
43
+ }
44
+
45
+ impl InternalState {
46
+ fn finish ( & mut self , ps : PollNext ) {
47
+ match ( & self , ps) {
48
+ ( InternalState :: Start , PollNext :: Left ) => {
49
+ * self = InternalState :: LeftFinished ;
50
+ }
51
+ ( InternalState :: Start , PollNext :: Right ) => {
52
+ * self = InternalState :: RightFinished ;
53
+ }
54
+ ( InternalState :: LeftFinished , PollNext :: Right )
55
+ | ( InternalState :: RightFinished , PollNext :: Left ) => {
56
+ * self = InternalState :: BothFinished ;
57
+ }
58
+ _ => { }
59
+ }
60
+ }
61
+ }
62
+
37
63
pin_project ! {
38
64
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
39
65
#[ must_use = "streams do nothing unless polled" ]
66
+ #[ project = SelectWithStrategyProj ]
40
67
pub struct SelectWithStrategy <St1 , St2 , Clos , State > {
41
68
#[ pin]
42
- stream1: Fuse < St1 > ,
69
+ stream1: St1 ,
43
70
#[ pin]
44
- stream2: Fuse <St2 >,
71
+ stream2: St2 ,
72
+ internal_state: InternalState ,
45
73
state: State ,
46
74
clos: Clos ,
47
75
}
@@ -120,9 +148,10 @@ where
120
148
State : Default ,
121
149
{
122
150
assert_stream :: < St1 :: Item , _ > ( SelectWithStrategy {
123
- stream1 : stream1 . fuse ( ) ,
124
- stream2 : stream2 . fuse ( ) ,
151
+ stream1,
152
+ stream2,
125
153
state : Default :: default ( ) ,
154
+ internal_state : InternalState :: Start ,
126
155
clos : which,
127
156
} )
128
157
}
@@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
131
160
/// Acquires a reference to the underlying streams that this combinator is
132
161
/// pulling from.
133
162
pub fn get_ref ( & self ) -> ( & St1 , & St2 ) {
134
- ( self . stream1 . get_ref ( ) , self . stream2 . get_ref ( ) )
163
+ ( & self . stream1 , & self . stream2 )
135
164
}
136
165
137
166
/// Acquires a mutable reference to the underlying streams that this
@@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
140
169
/// Note that care must be taken to avoid tampering with the state of the
141
170
/// stream which may otherwise confuse this combinator.
142
171
pub fn get_mut ( & mut self ) -> ( & mut St1 , & mut St2 ) {
143
- ( self . stream1 . get_mut ( ) , self . stream2 . get_mut ( ) )
172
+ ( & mut self . stream1 , & mut self . stream2 )
144
173
}
145
174
146
175
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -150,15 +179,15 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
150
179
/// stream which may otherwise confuse this combinator.
151
180
pub fn get_pin_mut ( self : Pin < & mut Self > ) -> ( Pin < & mut St1 > , Pin < & mut St2 > ) {
152
181
let this = self . project ( ) ;
153
- ( this. stream1 . get_pin_mut ( ) , this. stream2 . get_pin_mut ( ) )
182
+ ( this. stream1 , this. stream2 )
154
183
}
155
184
156
185
/// Consumes this combinator, returning the underlying streams.
157
186
///
158
187
/// Note that this may discard intermediate state of this combinator, so
159
188
/// care should be taken to avoid losing resources when this is called.
160
189
pub fn into_inner ( self ) -> ( St1 , St2 ) {
161
- ( self . stream1 . into_inner ( ) , self . stream2 . into_inner ( ) )
190
+ ( self . stream1 , self . stream2 )
162
191
}
163
192
}
164
193
@@ -169,47 +198,88 @@ where
169
198
Clos : FnMut ( & mut State ) -> PollNext ,
170
199
{
171
200
fn is_terminated ( & self ) -> bool {
172
- self . stream1 . is_terminated ( ) && self . stream2 . is_terminated ( )
201
+ match self . internal_state {
202
+ InternalState :: BothFinished => true ,
203
+ _ => false ,
204
+ }
173
205
}
174
206
}
175
207
176
- impl < St1 , St2 , Clos , State > Stream for SelectWithStrategy < St1 , St2 , Clos , State >
208
+ #[ inline]
209
+ fn poll_side < St1 , St2 , Clos , State > (
210
+ select : & mut SelectWithStrategyProj < ' _ , St1 , St2 , Clos , State > ,
211
+ side : PollNext ,
212
+ cx : & mut Context < ' _ > ,
213
+ ) -> Poll < Option < St1 :: Item > >
177
214
where
178
215
St1 : Stream ,
179
216
St2 : Stream < Item = St1 :: Item > ,
180
- Clos : FnMut ( & mut State ) -> PollNext ,
181
217
{
182
- type Item = St1 :: Item ;
183
-
184
- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < St1 :: Item > > {
185
- let this = self . project ( ) ;
186
-
187
- match ( this. clos ) ( this. state ) {
188
- PollNext :: Left => poll_inner ( this. stream1 , this. stream2 , cx) ,
189
- PollNext :: Right => poll_inner ( this. stream2 , this. stream1 , cx) ,
190
- }
218
+ match side {
219
+ PollNext :: Left => select. stream1 . as_mut ( ) . poll_next ( cx) ,
220
+ PollNext :: Right => select. stream2 . as_mut ( ) . poll_next ( cx) ,
191
221
}
192
222
}
193
223
194
- fn poll_inner < St1 , St2 > (
195
- a : Pin < & mut St1 > ,
196
- b : Pin < & mut St2 > ,
224
+ #[ inline]
225
+ fn poll_inner < St1 , St2 , Clos , State > (
226
+ select : & mut SelectWithStrategyProj < ' _ , St1 , St2 , Clos , State > ,
227
+ side : PollNext ,
197
228
cx : & mut Context < ' _ > ,
198
229
) -> Poll < Option < St1 :: Item > >
199
230
where
200
231
St1 : Stream ,
201
232
St2 : Stream < Item = St1 :: Item > ,
202
233
{
203
- let a_done = match a . poll_next ( cx) {
234
+ match poll_side ( select , side , cx) {
204
235
Poll :: Ready ( Some ( item) ) => return Poll :: Ready ( Some ( item) ) ,
205
- Poll :: Ready ( None ) => true ,
206
- Poll :: Pending => false ,
236
+ Poll :: Ready ( None ) => {
237
+ select. internal_state . finish ( side) ;
238
+ }
239
+ Poll :: Pending => ( ) ,
207
240
} ;
241
+ let other = side. other ( ) ;
242
+ match poll_side ( select, other, cx) {
243
+ Poll :: Ready ( None ) => {
244
+ select. internal_state . finish ( other) ;
245
+ Poll :: Ready ( None )
246
+ }
247
+ a => a,
248
+ }
249
+ }
250
+
251
+ impl < St1 , St2 , Clos , State > Stream for SelectWithStrategy < St1 , St2 , Clos , State >
252
+ where
253
+ St1 : Stream ,
254
+ St2 : Stream < Item = St1 :: Item > ,
255
+ Clos : FnMut ( & mut State ) -> PollNext ,
256
+ {
257
+ type Item = St1 :: Item ;
258
+
259
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < St1 :: Item > > {
260
+ let mut this = self . project ( ) ;
208
261
209
- match b. poll_next ( cx) {
210
- Poll :: Ready ( Some ( item) ) => Poll :: Ready ( Some ( item) ) ,
211
- Poll :: Ready ( None ) if a_done => Poll :: Ready ( None ) ,
212
- Poll :: Ready ( None ) | Poll :: Pending => Poll :: Pending ,
262
+ match this. internal_state {
263
+ InternalState :: Start => {
264
+ let next_side = ( this. clos ) ( this. state ) ;
265
+ poll_inner ( & mut this, next_side, cx)
266
+ }
267
+ InternalState :: LeftFinished => match this. stream2 . poll_next ( cx) {
268
+ Poll :: Ready ( None ) => {
269
+ * this. internal_state = InternalState :: BothFinished ;
270
+ Poll :: Ready ( None )
271
+ }
272
+ a => a,
273
+ } ,
274
+ InternalState :: RightFinished => match this. stream1 . poll_next ( cx) {
275
+ Poll :: Ready ( None ) => {
276
+ * this. internal_state = InternalState :: BothFinished ;
277
+ Poll :: Ready ( None )
278
+ }
279
+ a => a,
280
+ } ,
281
+ InternalState :: BothFinished => Poll :: Ready ( None ) ,
282
+ }
213
283
}
214
284
}
215
285
0 commit comments