@@ -36,6 +36,7 @@ impl Default for PollNext {
36
36
}
37
37
}
38
38
39
+ #[ derive( PartialEq , Eq , Clone , Copy ) ]
39
40
enum InternalState {
40
41
Start ,
41
42
LeftFinished ,
@@ -61,6 +62,29 @@ impl InternalState {
61
62
}
62
63
}
63
64
65
+ /// Decides whether to exit when both streams are completed, or only one
66
+ /// is completed. If you need to exit when a specific stream has finished,
67
+ /// feel free to add a case here.
68
+ #[ derive( Clone , Copy , Debug ) ]
69
+ pub enum ExitStrategy {
70
+ /// Select stream finishes when both substreams finish
71
+ WhenBothFinish ,
72
+ /// Select stream finishes when either substream finishes
73
+ WhenEitherFinish ,
74
+ }
75
+
76
+ impl ExitStrategy {
77
+ #[ inline]
78
+ fn is_finished ( self , state : InternalState ) -> bool {
79
+ match ( state, self ) {
80
+ ( InternalState :: BothFinished , _) => true ,
81
+ ( InternalState :: Start , ExitStrategy :: WhenEitherFinish ) => false ,
82
+ ( _, ExitStrategy :: WhenBothFinish ) => false ,
83
+ _ => true ,
84
+ }
85
+ }
86
+ }
87
+
64
88
pin_project ! {
65
89
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
66
90
#[ must_use = "streams do nothing unless polled" ]
@@ -73,6 +97,7 @@ pin_project! {
73
97
internal_state: InternalState ,
74
98
state: State ,
75
99
clos: Clos ,
100
+ exit_strategy: ExitStrategy ,
76
101
}
77
102
}
78
103
@@ -95,7 +120,7 @@ pin_project! {
95
120
///
96
121
/// ```rust
97
122
/// # futures::executor::block_on(async {
98
- /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
123
+ /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt, ExitStrategy };
99
124
///
100
125
/// let left = repeat(1);
101
126
/// let right = repeat(2);
@@ -106,7 +131,7 @@ pin_project! {
106
131
/// // use a function pointer instead of a closure.
107
132
/// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
108
133
///
109
- /// let mut out = select_with_strategy(left, right, prio_left);
134
+ /// let mut out = select_with_strategy(left, right, prio_left, ExitStrategy::WhenBothFinish );
110
135
///
111
136
/// for _ in 0..100 {
112
137
/// // Whenever we poll out, we will always get `1`.
@@ -121,26 +146,54 @@ pin_project! {
121
146
///
122
147
/// ```rust
123
148
/// # futures::executor::block_on(async {
124
- /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
149
+ /// use futures::stream::{ repeat, select_with_strategy, FusedStream, PollNext, StreamExt, ExitStrategy };
125
150
///
126
- /// let left = repeat(1);
127
- /// let right = repeat(2);
151
+ /// // Finishes when both streams finish
152
+ /// {
153
+ /// let left = repeat(1).take(10);
154
+ /// let right = repeat(2);
128
155
///
129
- /// let rrobin = |last: &mut PollNext| last.toggle();
156
+ /// let rrobin = |last: &mut PollNext| last.toggle();
130
157
///
131
- /// let mut out = select_with_strategy(left, right, rrobin);
158
+ /// let mut out = select_with_strategy(left, right, rrobin, ExitStrategy::WhenBothFinish );
132
159
///
133
- /// for _ in 0..100 {
134
- /// // We should be alternating now.
135
- /// assert_eq!(1, out.select_next_some().await);
136
- /// assert_eq!(2, out.select_next_some().await);
160
+ /// for _ in 0..10 {
161
+ /// // We should be alternating now.
162
+ /// assert_eq!(1, out.select_next_some().await);
163
+ /// assert_eq!(2, out.select_next_some().await);
164
+ /// }
165
+ /// for _ in 0..100 {
166
+ /// // First stream has finished
167
+ /// assert_eq!(2, out.select_next_some().await);
168
+ /// }
169
+ /// assert!(!out.is_terminated());
170
+ /// }
171
+ ///
172
+ /// // Finishes when either stream finishes
173
+ /// {
174
+ /// let left = repeat(1).take(10);
175
+ /// let right = repeat(2);
176
+ ///
177
+ /// let rrobin = |last: &mut PollNext| last.toggle();
178
+ ///
179
+ /// let mut out = select_with_strategy(left, right, rrobin, ExitStrategy::WhenEitherFinish);
180
+ ///
181
+ /// for _ in 0..10 {
182
+ /// // We should be alternating now.
183
+ /// assert_eq!(1, out.select_next_some().await);
184
+ /// assert_eq!(2, out.select_next_some().await);
185
+ /// }
186
+ /// assert_eq!(None, out.next().await);
187
+ /// assert!(out.is_terminated());
137
188
/// }
138
189
/// # });
139
190
/// ```
191
+ ///
140
192
pub fn select_with_strategy < St1 , St2 , Clos , State > (
141
193
stream1 : St1 ,
142
194
stream2 : St2 ,
143
195
which : Clos ,
196
+ exit_strategy : ExitStrategy ,
144
197
) -> SelectWithStrategy < St1 , St2 , Clos , State >
145
198
where
146
199
St1 : Stream ,
@@ -154,6 +207,7 @@ where
154
207
state : Default :: default ( ) ,
155
208
internal_state : InternalState :: Start ,
156
209
clos : which,
210
+ exit_strategy,
157
211
} )
158
212
}
159
213
@@ -199,10 +253,7 @@ where
199
253
Clos : FnMut ( & mut State ) -> PollNext ,
200
254
{
201
255
fn is_terminated ( & self ) -> bool {
202
- match self . internal_state {
203
- InternalState :: BothFinished => true ,
204
- _ => false ,
205
- }
256
+ self . exit_strategy . is_finished ( self . internal_state )
206
257
}
207
258
}
208
259
@@ -227,6 +278,7 @@ fn poll_inner<St1, St2, Clos, State>(
227
278
select : & mut SelectWithStrategyProj < ' _ , St1 , St2 , Clos , State > ,
228
279
side : PollNext ,
229
280
cx : & mut Context < ' _ > ,
281
+ exit_strat : ExitStrategy ,
230
282
) -> Poll < Option < St1 :: Item > >
231
283
where
232
284
St1 : Stream ,
@@ -236,6 +288,9 @@ where
236
288
Poll :: Ready ( Some ( item) ) => return Poll :: Ready ( Some ( item) ) ,
237
289
Poll :: Ready ( None ) => {
238
290
select. internal_state . finish ( side) ;
291
+ if exit_strat. is_finished ( * select. internal_state ) {
292
+ return Poll :: Ready ( None ) ;
293
+ }
239
294
}
240
295
Poll :: Pending => ( ) ,
241
296
} ;
@@ -259,11 +314,16 @@ where
259
314
260
315
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < St1 :: Item > > {
261
316
let mut this = self . project ( ) ;
317
+ let exit_strategy: ExitStrategy = * this. exit_strategy ;
318
+
319
+ if exit_strategy. is_finished ( * this. internal_state ) {
320
+ return Poll :: Ready ( None ) ;
321
+ }
262
322
263
323
match this. internal_state {
264
324
InternalState :: Start => {
265
325
let next_side = ( this. clos ) ( this. state ) ;
266
- poll_inner ( & mut this, next_side, cx)
326
+ poll_inner ( & mut this, next_side, cx, exit_strategy )
267
327
}
268
328
InternalState :: LeftFinished => match this. stream2 . poll_next ( cx) {
269
329
Poll :: Ready ( None ) => {
0 commit comments