@@ -106,17 +106,9 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
106
106
} )
107
107
}
108
108
109
- fn poll_executor < T , F : FnMut ( & mut Context < ' _ > ) -> T > ( mut f : F ) -> T {
110
- let _enter = enter ( ) . expect (
111
- "cannot execute `LocalPool` executor from within \
112
- another executor",
113
- ) ;
114
-
115
- CURRENT_THREAD_NOTIFY . with ( |thread_notify| {
116
- let waker = waker_ref ( thread_notify) ;
117
- let mut cx = Context :: from_waker ( & waker) ;
118
- f ( & mut cx)
119
- } )
109
+ /// Check for a wakeup, but don't consume it.
110
+ fn woken ( ) -> bool {
111
+ CURRENT_THREAD_NOTIFY . with ( |thread_notify| thread_notify. unparked . load ( Ordering :: SeqCst ) )
120
112
}
121
113
122
114
impl LocalPool {
@@ -212,20 +204,26 @@ impl LocalPool {
212
204
/// further use of one of the pool's run or poll methods.
213
205
/// Though only one task will be completed, progress may be made on multiple tasks.
214
206
pub fn try_run_one ( & mut self ) -> bool {
215
- poll_executor ( |ctx | {
207
+ run_executor ( |cx | {
216
208
loop {
217
- let ret = self . poll_pool_once ( ctx) ;
218
-
219
- // return if we have executed a future
220
- if let Poll :: Ready ( Some ( _) ) = ret {
221
- return true ;
209
+ self . drain_incoming ( ) ;
210
+
211
+ match self . pool . poll_next_unpin ( cx) {
212
+ // Success!
213
+ Poll :: Ready ( Some ( ( ) ) ) => return Poll :: Ready ( true ) ,
214
+ // The pool was empty.
215
+ Poll :: Ready ( None ) => return Poll :: Ready ( false ) ,
216
+ Poll :: Pending => ( ) ,
222
217
}
223
218
224
- // if there are no new incoming futures
225
- // then there is no feature that can make progress
226
- // and we can return without having completed a single future
227
- if self . incoming . borrow ( ) . is_empty ( ) {
228
- return false ;
219
+ if !self . incoming . borrow ( ) . is_empty ( ) {
220
+ // New tasks were spawned; try again.
221
+ continue ;
222
+ } else if woken ( ) {
223
+ // The pool yielded to us, but there's more progress to be made.
224
+ return Poll :: Pending ;
225
+ } else {
226
+ return Poll :: Ready ( false ) ;
229
227
}
230
228
}
231
229
} )
@@ -257,44 +255,52 @@ impl LocalPool {
257
255
/// of the pool's run or poll methods. While the function is running, all tasks
258
256
/// in the pool will try to make progress.
259
257
pub fn run_until_stalled ( & mut self ) {
260
- poll_executor ( |ctx| {
261
- let _ = self . poll_pool ( ctx) ;
258
+ run_executor ( |cx| match self . poll_pool ( cx) {
259
+ // The pool is empty.
260
+ Poll :: Ready ( ( ) ) => Poll :: Ready ( ( ) ) ,
261
+ Poll :: Pending => {
262
+ if woken ( ) {
263
+ Poll :: Pending
264
+ } else {
265
+ // We're stalled for now.
266
+ Poll :: Ready ( ( ) )
267
+ }
268
+ }
262
269
} ) ;
263
270
}
264
271
265
- // Make maximal progress on the entire pool of spawned task, returning `Ready`
266
- // if the pool is empty and `Pending` if no further progress can be made.
272
+ /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
273
+ /// Repeat until either the pool is empty, or it returns `Pending`.
274
+ ///
275
+ /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
276
+ ///
277
+ /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
278
+ /// mean that the pool can't make progress.
267
279
fn poll_pool ( & mut self , cx : & mut Context < ' _ > ) -> Poll < ( ) > {
268
- // state for the FuturesUnordered, which will never be used
269
280
loop {
270
- let ret = self . poll_pool_once ( cx ) ;
281
+ self . drain_incoming ( ) ;
271
282
272
- // we queued up some new tasks; add them and poll again
283
+ let pool_ret = self . pool . poll_next_unpin ( cx) ;
284
+
285
+ // We queued up some new tasks; add them and poll again.
273
286
if !self . incoming . borrow ( ) . is_empty ( ) {
274
287
continue ;
275
288
}
276
289
277
- // no queued tasks; we may be done
278
- match ret {
279
- Poll :: Pending => return Poll :: Pending ,
290
+ match pool_ret {
291
+ Poll :: Ready ( Some ( ( ) ) ) => continue ,
280
292
Poll :: Ready ( None ) => return Poll :: Ready ( ( ) ) ,
281
- _ => { }
293
+ Poll :: Pending => return Poll :: Pending ,
282
294
}
283
295
}
284
296
}
285
297
286
- // Try make minimal progress on the pool of spawned tasks
287
- fn poll_pool_once ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Option < ( ) > > {
288
- // empty the incoming queue of newly-spawned tasks
289
- {
290
- let mut incoming = self . incoming . borrow_mut ( ) ;
291
- for task in incoming. drain ( ..) {
292
- self . pool . push ( task)
293
- }
298
+ /// Empty the incoming queue of newly-spawned tasks.
299
+ fn drain_incoming ( & mut self ) {
300
+ let mut incoming = self . incoming . borrow_mut ( ) ;
301
+ for task in incoming. drain ( ..) {
302
+ self . pool . push ( task)
294
303
}
295
-
296
- // try to execute the next ready future
297
- self . pool . poll_next_unpin ( cx)
298
304
}
299
305
}
300
306
0 commit comments