Skip to content

Commit 9d40d9d

Browse files
committed
use ParkThread in LocalPool
- split all state but park into separate struct - various "poll" methods need to call park(Poll) as it might change the readiness of futures (expired timers, IO ready, ...) or spawn new futures (although ParkThread::park(Poll) won't)
1 parent e631733 commit 9d40d9d

File tree

1 file changed

+162
-104
lines changed

1 file changed

+162
-104
lines changed

futures-executor/src/local_pool.rs

Lines changed: 162 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,59 @@
11
use crate::enter;
2+
use crate::park::{Park, ParkDuration};
3+
use crate::park_thread::ParkThread;
24
use futures_core::future::{Future, FutureObj, LocalFutureObj};
35
use futures_core::stream::{Stream};
46
use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError};
5-
use futures_util::task::{waker_ref, ArcWake};
67
use futures_util::stream::FuturesUnordered;
78
use futures_util::stream::StreamExt;
89
use futures_util::pin_mut;
910
use std::cell::{RefCell};
1011
use std::ops::{Deref, DerefMut};
1112
use std::rc::{Rc, Weak};
12-
use std::sync::Arc;
13-
use std::thread::{self, Thread};
13+
14+
// state outside park
15+
#[derive(Debug)]
16+
struct State {
17+
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
18+
incoming: Rc<Incoming>,
19+
}
20+
21+
impl State {
22+
// Make maximal progress on the entire pool of spawned task, returning `Ready`
23+
// if the pool is empty and `Pending` if no further progress can be made.
24+
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
25+
// state for the FuturesUnordered, which will never be used
26+
loop {
27+
let ret = self.poll_pool_once(cx);
28+
29+
// we queued up some new tasks; add them and poll again
30+
if !self.incoming.borrow().is_empty() {
31+
continue;
32+
}
33+
34+
// no queued tasks; we may be done
35+
match ret {
36+
Poll::Pending => return Poll::Pending,
37+
Poll::Ready(None) => return Poll::Ready(()),
38+
_ => {}
39+
}
40+
}
41+
}
42+
43+
// Try make minimal progress on the pool of spawned tasks
44+
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
45+
// empty the incoming queue of newly-spawned tasks
46+
{
47+
let mut incoming = self.incoming.borrow_mut();
48+
for task in incoming.drain(..) {
49+
self.pool.push(task)
50+
}
51+
}
52+
53+
// try to execute the next ready future
54+
self.pool.poll_next_unpin(cx)
55+
}
56+
}
1457

1558
/// A single-threaded task pool for polling futures to completion.
1659
///
@@ -24,9 +67,9 @@ use std::thread::{self, Thread};
2467
/// single-threaded, it supports a special form of task spawning for non-`Send`
2568
/// futures, via [`spawn_local_obj`](futures_core::task::LocalSpawn::spawn_local_obj).
2669
#[derive(Debug)]
27-
pub struct LocalPool {
28-
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
29-
incoming: Rc<Incoming>,
70+
pub struct LocalPool<P = ParkThread> {
71+
park: P,
72+
state: State,
3073
}
3174

3275
/// A handle to a [`LocalPool`](LocalPool) that implements
@@ -38,66 +81,44 @@ pub struct LocalSpawner {
3881

3982
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
4083

41-
pub(crate) struct ThreadNotify {
42-
thread: Thread
43-
}
44-
45-
thread_local! {
46-
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
47-
thread: thread::current(),
48-
});
49-
}
50-
51-
impl ArcWake for ThreadNotify {
52-
fn wake_by_ref(arc_self: &Arc<Self>) {
53-
arc_self.thread.unpark();
84+
impl LocalPool<ParkThread> {
85+
/// Create a new, empty pool of tasks.
86+
pub fn new() -> Self
87+
{
88+
Self::new_with_park(ParkThread::default())
5489
}
5590
}
5691

57-
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
58-
// turn.
59-
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
60-
let _enter = enter()
61-
.expect("cannot execute `LocalPool` executor from within \
62-
another executor");
63-
64-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
65-
let waker = waker_ref(thread_notify);
66-
let mut cx = Context::from_waker(&waker);
67-
loop {
68-
if let Poll::Ready(t) = f(&mut cx) {
69-
return t;
70-
}
71-
thread::park();
72-
}
73-
})
74-
}
75-
76-
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
77-
let _enter = enter()
78-
.expect("cannot execute `LocalPool` executor from within \
79-
another executor");
80-
81-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
82-
let waker = waker_ref(thread_notify);
83-
let mut cx = Context::from_waker(&waker);
84-
f(&mut cx)
85-
})
86-
}
87-
88-
impl LocalPool {
92+
impl<P> LocalPool<P>
93+
where
94+
P: Park,
95+
P::Error: std::fmt::Debug,
96+
{
8997
/// Create a new, empty pool of tasks.
90-
pub fn new() -> LocalPool {
98+
pub fn new_with_park(park: P) -> Self {
9199
LocalPool {
92-
pool: FuturesUnordered::new(),
93-
incoming: Default::default(),
100+
park,
101+
state: State {
102+
pool: FuturesUnordered::new(),
103+
incoming: Default::default(),
104+
},
94105
}
95106
}
96107

108+
/// Returns a reference to the underlying Park instance.
109+
pub fn get_park(&self) -> &P {
110+
&self.park
111+
}
112+
113+
/// Returns a mutable reference to the underlying Park instance.
114+
pub fn get_park_mut(&mut self) -> &mut P {
115+
&mut self.park
116+
}
117+
97118
/// Get a clonable handle to the pool as a [`Spawn`].
98119
pub fn spawner(&self) -> LocalSpawner {
99120
LocalSpawner {
100-
incoming: Rc::downgrade(&self.incoming)
121+
incoming: Rc::downgrade(&self.state.incoming)
101122
}
102123
}
103124

@@ -121,7 +142,7 @@ impl LocalPool {
121142
/// The function will block the calling thread until *all* tasks in the pool
122143
/// are complete, including any spawned while running existing tasks.
123144
pub fn run(&mut self) {
124-
run_executor(|cx| self.poll_pool(cx))
145+
self.run_executor(|state, cx| state.poll_pool(cx))
125146
}
126147

127148
/// Runs all the tasks in the pool until the given future completes.
@@ -149,7 +170,7 @@ impl LocalPool {
149170
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
150171
pin_mut!(future);
151172

152-
run_executor(|cx| {
173+
self.run_executor(|state, cx| {
153174
{
154175
// if our main task is done, so are we
155176
let result = future.as_mut().poll(cx);
@@ -158,7 +179,7 @@ impl LocalPool {
158179
}
159180
}
160181

161-
let _ = self.poll_pool(cx);
182+
let _ = state.poll_pool(cx);
162183
Poll::Pending
163184
})
164185
}
@@ -192,15 +213,35 @@ impl LocalPool {
192213
/// further use of one of the pool's run or poll methods.
193214
/// Though only one task will be completed, progress may be made on multiple tasks.
194215
pub fn try_run_one(&mut self) -> bool {
195-
poll_executor(|ctx| {
196-
let ret = self.poll_pool_once(ctx);
216+
let mut enter = enter()
217+
.expect("cannot execute `LocalPool` executor from within \
218+
another executor");
219+
220+
{
221+
// first round
222+
let waker = self.park.waker();
223+
let mut cx = Context::from_waker(&waker);
224+
225+
let result = self.state.poll_pool_once(&mut cx);
197226

198227
// return if we really have executed a future
199-
match ret {
200-
Poll::Ready(Some(_)) => true,
201-
_ => false
228+
if let Poll::Ready(Some(_)) = result {
229+
return true;
202230
}
203-
})
231+
}
232+
233+
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");
234+
235+
let waker = self.park.waker();
236+
let mut cx = Context::from_waker(&waker);
237+
238+
let result = self.state.poll_pool_once(&mut cx);
239+
240+
// return whether we really have executed a future
241+
match result {
242+
Poll::Ready(Some(_)) => true,
243+
_ => false
244+
}
204245
}
205246

206247
/// Runs all tasks in the pool and returns if no more progress can be made
@@ -229,58 +270,61 @@ impl LocalPool {
229270
/// of the pool's run or poll methods. While the function is running, all tasks
230271
/// in the pool will try to make progress.
231272
pub fn run_until_stalled(&mut self) {
232-
poll_executor(|ctx| {
233-
loop {
234-
let result = self.poll_pool_once(ctx);
235-
236-
// if there are no more ready futures exit
237-
match result {
238-
Poll::Pending | Poll::Ready(None) => return,
239-
_ => continue
240-
}
241-
}
242-
})
243-
}
273+
let mut enter = enter()
274+
.expect("cannot execute `LocalPool` executor from within \
275+
another executor");
276+
277+
{
278+
// first round
279+
let waker = self.park.waker();
280+
let mut cx = Context::from_waker(&waker);
281+
282+
// ignore first result, we need to run park anyway and try again
283+
let _ = self.state.poll_pool_once(&mut cx);
284+
}
244285

245-
// Make maximal progress on the entire pool of spawned task, returning `Ready`
246-
// if the pool is empty and `Pending` if no further progress can be made.
247-
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
248-
// state for the FuturesUnordered, which will never be used
249286
loop {
250-
let ret = self.poll_pool_once(cx);
287+
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");
251288

252-
// we queued up some new tasks; add them and poll again
253-
if !self.incoming.borrow().is_empty() {
254-
continue;
255-
}
289+
let waker = self.park.waker();
290+
let mut cx = Context::from_waker(&waker);
256291

257-
// no queued tasks; we may be done
258-
match ret {
259-
Poll::Pending => return Poll::Pending,
260-
Poll::Ready(None) => return Poll::Ready(()),
261-
_ => {}
292+
let result = self.state.poll_pool_once(&mut cx);
293+
294+
// if there are no more ready futures exit
295+
match result {
296+
Poll::Pending | Poll::Ready(None) => return,
297+
_ => continue
262298
}
263299
}
264300
}
265301

266-
// Try make minimal progress on the pool of spawned tasks
267-
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
268-
// empty the incoming queue of newly-spawned tasks
269-
{
270-
let mut incoming = self.incoming.borrow_mut();
271-
for task in incoming.drain(..) {
272-
self.pool.push(task)
302+
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
303+
// turn.
304+
fn run_executor<T, F: FnMut(&mut State, &mut Context<'_>) -> Poll<T>>(&mut self, mut f: F) -> T {
305+
let mut enter = enter()
306+
.expect("cannot execute `LocalPool` executor from within \
307+
another executor");
308+
309+
loop {
310+
let waker = self.park.waker();
311+
let mut cx = Context::from_waker(&waker);
312+
313+
if let Poll::Ready(t) = f(&mut self.state, &mut cx) {
314+
return t;
273315
}
316+
self.park.park(&mut enter, ParkDuration::Block).expect("park failed");
274317
}
275-
276-
// try to execute the next ready future
277-
self.pool.poll_next_unpin(cx)
278318
}
279319
}
280320

281-
impl Default for LocalPool {
321+
impl<P> Default for LocalPool<P>
322+
where
323+
P: Default + Park,
324+
P::Error: std::fmt::Debug,
325+
{
282326
fn default() -> Self {
283-
Self::new()
327+
Self::new_with_park(P::default())
284328
}
285329
}
286330

@@ -292,7 +336,21 @@ impl Default for LocalPool {
292336
/// spawned tasks.
293337
pub fn block_on<F: Future>(f: F) -> F::Output {
294338
pin_mut!(f);
295-
run_executor(|cx| f.as_mut().poll(cx))
339+
340+
let mut enter = enter()
341+
.expect("cannot execute `block_on` executor from within \
342+
another executor");
343+
344+
let mut park = ParkThread::new();
345+
let waker = park.waker().clone();
346+
let mut cx = Context::from_waker(&waker);
347+
348+
loop {
349+
if let Poll::Ready(t) = f.as_mut().poll(&mut cx) {
350+
return t;
351+
}
352+
park.park(&mut enter, ParkDuration::Block).unwrap_or_else(|i| match i {});
353+
}
296354
}
297355

298356
/// Turn a stream into a blocking iterator.
@@ -331,7 +389,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
331389
type Item = S::Item;
332390

333391
fn next(&mut self) -> Option<Self::Item> {
334-
LocalPool::new().run_until(self.stream.next())
392+
LocalPool::<ParkThread>::new().run_until(self.stream.next())
335393
}
336394

337395
fn size_hint(&self) -> (usize, Option<usize>) {

0 commit comments

Comments
 (0)