Skip to content

Commit 3f298e6

Browse files
committed
use ParkThread in LocalPool
- split all state but park into separate struct - various "poll" methods need to call park(Poll) as it might change the readiness of futures (expired timers, IO ready, ...) or spawn new futures (although ParkThread::park(Poll) won't)
1 parent bdab64b commit 3f298e6

File tree

1 file changed

+162
-104
lines changed

1 file changed

+162
-104
lines changed

futures-executor/src/local_pool.rs

Lines changed: 162 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,59 @@
11
use crate::enter;
2+
use crate::park::{Park, ParkDuration};
3+
use crate::park_thread::ParkThread;
24
use futures_core::future::{Future, FutureObj, LocalFutureObj};
35
use futures_core::stream::{Stream};
46
use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError};
5-
use futures_util::task::{waker_ref, ArcWake};
67
use futures_util::stream::FuturesUnordered;
78
use futures_util::stream::StreamExt;
89
use futures_util::pin_mut;
910
use std::cell::{RefCell};
1011
use std::ops::{Deref, DerefMut};
1112
use std::rc::{Rc, Weak};
12-
use std::sync::Arc;
13-
use std::thread::{self, Thread};
13+
14+
// state outside park
15+
#[derive(Debug)]
16+
struct State {
17+
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
18+
incoming: Rc<Incoming>,
19+
}
20+
21+
impl State {
22+
// Make maximal progress on the entire pool of spawned task, returning `Ready`
23+
// if the pool is empty and `Pending` if no further progress can be made.
24+
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
25+
// state for the FuturesUnordered, which will never be used
26+
loop {
27+
let ret = self.poll_pool_once(cx);
28+
29+
// we queued up some new tasks; add them and poll again
30+
if !self.incoming.borrow().is_empty() {
31+
continue;
32+
}
33+
34+
// no queued tasks; we may be done
35+
match ret {
36+
Poll::Pending => return Poll::Pending,
37+
Poll::Ready(None) => return Poll::Ready(()),
38+
_ => {}
39+
}
40+
}
41+
}
42+
43+
// Try make minimal progress on the pool of spawned tasks
44+
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
45+
// empty the incoming queue of newly-spawned tasks
46+
{
47+
let mut incoming = self.incoming.borrow_mut();
48+
for task in incoming.drain(..) {
49+
self.pool.push(task)
50+
}
51+
}
52+
53+
// try to execute the next ready future
54+
self.pool.poll_next_unpin(cx)
55+
}
56+
}
1457

1558
/// A single-threaded task pool for polling futures to completion.
1659
///
@@ -24,9 +67,9 @@ use std::thread::{self, Thread};
2467
/// single-threaded, it supports a special form of task spawning for non-`Send`
2568
/// futures, via [`spawn_local_obj`](futures_core::task::LocalSpawn::spawn_local_obj).
2669
#[derive(Debug)]
27-
pub struct LocalPool {
28-
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
29-
incoming: Rc<Incoming>,
70+
pub struct LocalPool<P = ParkThread> {
71+
park: P,
72+
state: State,
3073
}
3174

3275
/// A handle to a [`LocalPool`](LocalPool) that implements
@@ -38,66 +81,44 @@ pub struct LocalSpawner {
3881

3982
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
4083

41-
pub(crate) struct ThreadNotify {
42-
thread: Thread
43-
}
44-
45-
thread_local! {
46-
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
47-
thread: thread::current(),
48-
});
49-
}
50-
51-
impl ArcWake for ThreadNotify {
52-
fn wake_by_ref(arc_self: &Arc<Self>) {
53-
arc_self.thread.unpark();
84+
impl LocalPool<ParkThread> {
85+
/// Create a new, empty pool of tasks.
86+
pub fn new() -> Self
87+
{
88+
Self::new_with_park(ParkThread::default())
5489
}
5590
}
5691

57-
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
58-
// turn.
59-
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
60-
let _enter = enter()
61-
.expect("cannot execute `LocalPool` executor from within \
62-
another executor");
63-
64-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
65-
let waker = waker_ref(thread_notify);
66-
let mut cx = Context::from_waker(&waker);
67-
loop {
68-
if let Poll::Ready(t) = f(&mut cx) {
69-
return t;
70-
}
71-
thread::park();
72-
}
73-
})
74-
}
75-
76-
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
77-
let _enter = enter()
78-
.expect("cannot execute `LocalPool` executor from within \
79-
another executor");
80-
81-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
82-
let waker = waker_ref(thread_notify);
83-
let mut cx = Context::from_waker(&waker);
84-
f(&mut cx)
85-
})
86-
}
87-
88-
impl LocalPool {
92+
impl<P> LocalPool<P>
93+
where
94+
P: Park,
95+
P::Error: std::fmt::Debug,
96+
{
8997
/// Create a new, empty pool of tasks.
90-
pub fn new() -> LocalPool {
98+
pub fn new_with_park(park: P) -> Self {
9199
LocalPool {
92-
pool: FuturesUnordered::new(),
93-
incoming: Default::default(),
100+
park,
101+
state: State {
102+
pool: FuturesUnordered::new(),
103+
incoming: Default::default(),
104+
},
94105
}
95106
}
96107

108+
/// Returns a reference to the underlying Park instance.
109+
pub fn get_park(&self) -> &P {
110+
&self.park
111+
}
112+
113+
/// Returns a mutable reference to the underlying Park instance.
114+
pub fn get_park_mut(&mut self) -> &mut P {
115+
&mut self.park
116+
}
117+
97118
/// Get a clonable handle to the pool as a [`Spawn`].
98119
pub fn spawner(&self) -> LocalSpawner {
99120
LocalSpawner {
100-
incoming: Rc::downgrade(&self.incoming)
121+
incoming: Rc::downgrade(&self.state.incoming)
101122
}
102123
}
103124

@@ -121,7 +142,7 @@ impl LocalPool {
121142
/// The function will block the calling thread until *all* tasks in the pool
122143
/// are complete, including any spawned while running existing tasks.
123144
pub fn run(&mut self) {
124-
run_executor(|cx| self.poll_pool(cx))
145+
self.run_executor(|state, cx| state.poll_pool(cx))
125146
}
126147

127148
/// Runs all the tasks in the pool until the given future completes.
@@ -150,7 +171,7 @@ impl LocalPool {
150171
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
151172
pin_mut!(future);
152173

153-
run_executor(|cx| {
174+
self.run_executor(|state, cx| {
154175
{
155176
// if our main task is done, so are we
156177
let result = future.as_mut().poll(cx);
@@ -159,7 +180,7 @@ impl LocalPool {
159180
}
160181
}
161182

162-
let _ = self.poll_pool(cx);
183+
let _ = state.poll_pool(cx);
163184
Poll::Pending
164185
})
165186
}
@@ -193,15 +214,35 @@ impl LocalPool {
193214
/// further use of one of the pool's run or poll methods.
194215
/// Though only one task will be completed, progress may be made on multiple tasks.
195216
pub fn try_run_one(&mut self) -> bool {
196-
poll_executor(|ctx| {
197-
let ret = self.poll_pool_once(ctx);
217+
let mut enter = enter()
218+
.expect("cannot execute `LocalPool` executor from within \
219+
another executor");
220+
221+
{
222+
// first round
223+
let waker = self.park.waker();
224+
let mut cx = Context::from_waker(&waker);
225+
226+
let result = self.state.poll_pool_once(&mut cx);
198227

199228
// return if we really have executed a future
200-
match ret {
201-
Poll::Ready(Some(_)) => true,
202-
_ => false
229+
if let Poll::Ready(Some(_)) = result {
230+
return true;
203231
}
204-
})
232+
}
233+
234+
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");
235+
236+
let waker = self.park.waker();
237+
let mut cx = Context::from_waker(&waker);
238+
239+
let result = self.state.poll_pool_once(&mut cx);
240+
241+
// return whether we really have executed a future
242+
match result {
243+
Poll::Ready(Some(_)) => true,
244+
_ => false
245+
}
205246
}
206247

207248
/// Runs all tasks in the pool and returns if no more progress can be made
@@ -230,58 +271,61 @@ impl LocalPool {
230271
/// of the pool's run or poll methods. While the function is running, all tasks
231272
/// in the pool will try to make progress.
232273
pub fn run_until_stalled(&mut self) {
233-
poll_executor(|ctx| {
234-
loop {
235-
let result = self.poll_pool_once(ctx);
236-
237-
// if there are no more ready futures exit
238-
match result {
239-
Poll::Pending | Poll::Ready(None) => return,
240-
_ => continue
241-
}
242-
}
243-
})
244-
}
274+
let mut enter = enter()
275+
.expect("cannot execute `LocalPool` executor from within \
276+
another executor");
277+
278+
{
279+
// first round
280+
let waker = self.park.waker();
281+
let mut cx = Context::from_waker(&waker);
282+
283+
// ignore first result, we need to run park anyway and try again
284+
let _ = self.state.poll_pool_once(&mut cx);
285+
}
245286

246-
// Make maximal progress on the entire pool of spawned task, returning `Ready`
247-
// if the pool is empty and `Pending` if no further progress can be made.
248-
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
249-
// state for the FuturesUnordered, which will never be used
250287
loop {
251-
let ret = self.poll_pool_once(cx);
288+
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");
252289

253-
// we queued up some new tasks; add them and poll again
254-
if !self.incoming.borrow().is_empty() {
255-
continue;
256-
}
290+
let waker = self.park.waker();
291+
let mut cx = Context::from_waker(&waker);
257292

258-
// no queued tasks; we may be done
259-
match ret {
260-
Poll::Pending => return Poll::Pending,
261-
Poll::Ready(None) => return Poll::Ready(()),
262-
_ => {}
293+
let result = self.state.poll_pool_once(&mut cx);
294+
295+
// if there are no more ready futures exit
296+
match result {
297+
Poll::Pending | Poll::Ready(None) => return,
298+
_ => continue
263299
}
264300
}
265301
}
266302

267-
// Try make minimal progress on the pool of spawned tasks
268-
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
269-
// empty the incoming queue of newly-spawned tasks
270-
{
271-
let mut incoming = self.incoming.borrow_mut();
272-
for task in incoming.drain(..) {
273-
self.pool.push(task)
303+
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
304+
// turn.
305+
fn run_executor<T, F: FnMut(&mut State, &mut Context<'_>) -> Poll<T>>(&mut self, mut f: F) -> T {
306+
let mut enter = enter()
307+
.expect("cannot execute `LocalPool` executor from within \
308+
another executor");
309+
310+
loop {
311+
let waker = self.park.waker();
312+
let mut cx = Context::from_waker(&waker);
313+
314+
if let Poll::Ready(t) = f(&mut self.state, &mut cx) {
315+
return t;
274316
}
317+
self.park.park(&mut enter, ParkDuration::Block).expect("park failed");
275318
}
276-
277-
// try to execute the next ready future
278-
self.pool.poll_next_unpin(cx)
279319
}
280320
}
281321

282-
impl Default for LocalPool {
322+
impl<P> Default for LocalPool<P>
323+
where
324+
P: Default + Park,
325+
P::Error: std::fmt::Debug,
326+
{
283327
fn default() -> Self {
284-
Self::new()
328+
Self::new_with_park(P::default())
285329
}
286330
}
287331

@@ -293,7 +337,21 @@ impl Default for LocalPool {
293337
/// spawned tasks.
294338
pub fn block_on<F: Future>(f: F) -> F::Output {
295339
pin_mut!(f);
296-
run_executor(|cx| f.as_mut().poll(cx))
340+
341+
let mut enter = enter()
342+
.expect("cannot execute `block_on` executor from within \
343+
another executor");
344+
345+
let mut park = ParkThread::new();
346+
let waker = park.waker().clone();
347+
let mut cx = Context::from_waker(&waker);
348+
349+
loop {
350+
if let Poll::Ready(t) = f.as_mut().poll(&mut cx) {
351+
return t;
352+
}
353+
park.park(&mut enter, ParkDuration::Block).unwrap_or_else(|i| match i {});
354+
}
297355
}
298356

299357
/// Turn a stream into a blocking iterator.
@@ -331,7 +389,7 @@ impl<S: Stream + Unpin> BlockingStream<S> {
331389
impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
332390
type Item = S::Item;
333391
fn next(&mut self) -> Option<Self::Item> {
334-
LocalPool::new().run_until(self.stream.next())
392+
LocalPool::<ParkThread>::new().run_until(self.stream.next())
335393
}
336394
}
337395

0 commit comments

Comments
 (0)