Skip to content

Commit ebc7878

Browse files
committed
use ParkThread in LocalPool
- split all state but park into separate struct - various "poll" methods need to call park(Poll) as it might change the readiness of futures (expired timers, IO ready, ...) or spawn new futures (although ParkThread::park(Poll) won't)
1 parent 3b59167 commit ebc7878

File tree

1 file changed

+162
-104
lines changed

1 file changed

+162
-104
lines changed

futures-executor/src/local_pool.rs

Lines changed: 162 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,60 @@
11
use crate::enter;
2+
use crate::park::{Park, ParkDuration};
3+
use crate::park_thread::ParkThread;
24
use futures_core::future::{Future, FutureObj, LocalFutureObj};
35
use futures_core::stream::{Stream};
46
use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError};
5-
use futures_util::task::{waker_ref, ArcWake};
67
use futures_util::stream::FuturesUnordered;
78
use futures_util::stream::StreamExt;
89
use pin_utils::pin_mut;
910
use std::cell::{RefCell};
1011
use std::ops::{Deref, DerefMut};
1112
use std::prelude::v1::*;
1213
use std::rc::{Rc, Weak};
13-
use std::sync::Arc;
14-
use std::thread::{self, Thread};
14+
15+
// state outside park
16+
#[derive(Debug)]
17+
struct State {
18+
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
19+
incoming: Rc<Incoming>,
20+
}
21+
22+
impl State {
23+
// Make maximal progress on the entire pool of spawned task, returning `Ready`
24+
// if the pool is empty and `Pending` if no further progress can be made.
25+
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
26+
// state for the FuturesUnordered, which will never be used
27+
loop {
28+
let ret = self.poll_pool_once(cx);
29+
30+
// we queued up some new tasks; add them and poll again
31+
if !self.incoming.borrow().is_empty() {
32+
continue;
33+
}
34+
35+
// no queued tasks; we may be done
36+
match ret {
37+
Poll::Pending => return Poll::Pending,
38+
Poll::Ready(None) => return Poll::Ready(()),
39+
_ => {}
40+
}
41+
}
42+
}
43+
44+
// Try make minimal progress on the pool of spawned tasks
45+
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
46+
// empty the incoming queue of newly-spawned tasks
47+
{
48+
let mut incoming = self.incoming.borrow_mut();
49+
for task in incoming.drain(..) {
50+
self.pool.push(task)
51+
}
52+
}
53+
54+
// try to execute the next ready future
55+
self.pool.poll_next_unpin(cx)
56+
}
57+
}
1558

1659
/// A single-threaded task pool for polling futures to completion.
1760
///
@@ -25,9 +68,9 @@ use std::thread::{self, Thread};
2568
/// single-threaded, it supports a special form of task spawning for non-`Send`
2669
/// futures, via [`spawn_local_obj`](futures_core::task::LocalSpawn::spawn_local_obj).
2770
#[derive(Debug)]
28-
pub struct LocalPool {
29-
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
30-
incoming: Rc<Incoming>,
71+
pub struct LocalPool<P = ParkThread> {
72+
park: P,
73+
state: State,
3174
}
3275

3376
/// A handle to a [`LocalPool`](LocalPool) that implements
@@ -39,66 +82,44 @@ pub struct LocalSpawner {
3982

4083
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
4184

42-
pub(crate) struct ThreadNotify {
43-
thread: Thread
44-
}
45-
46-
thread_local! {
47-
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
48-
thread: thread::current(),
49-
});
50-
}
51-
52-
impl ArcWake for ThreadNotify {
53-
fn wake_by_ref(arc_self: &Arc<Self>) {
54-
arc_self.thread.unpark();
85+
impl LocalPool<ParkThread> {
86+
/// Create a new, empty pool of tasks.
87+
pub fn new() -> Self
88+
{
89+
Self::new_with_park(ParkThread::default())
5590
}
5691
}
5792

58-
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
59-
// turn.
60-
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
61-
let _enter = enter()
62-
.expect("cannot execute `LocalPool` executor from within \
63-
another executor");
64-
65-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
66-
let waker = waker_ref(thread_notify);
67-
let mut cx = Context::from_waker(&waker);
68-
loop {
69-
if let Poll::Ready(t) = f(&mut cx) {
70-
return t;
71-
}
72-
thread::park();
73-
}
74-
})
75-
}
76-
77-
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
78-
let _enter = enter()
79-
.expect("cannot execute `LocalPool` executor from within \
80-
another executor");
81-
82-
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
83-
let waker = waker_ref(thread_notify);
84-
let mut cx = Context::from_waker(&waker);
85-
f(&mut cx)
86-
})
87-
}
88-
89-
impl LocalPool {
93+
impl<P> LocalPool<P>
94+
where
95+
P: Park,
96+
P::Error: std::fmt::Debug,
97+
{
9098
/// Create a new, empty pool of tasks.
91-
pub fn new() -> LocalPool {
99+
pub fn new_with_park(park: P) -> Self {
92100
LocalPool {
93-
pool: FuturesUnordered::new(),
94-
incoming: Default::default(),
101+
park,
102+
state: State {
103+
pool: FuturesUnordered::new(),
104+
incoming: Default::default(),
105+
},
95106
}
96107
}
97108

109+
/// Returns a reference to the underlying Park instance.
110+
pub fn get_park(&self) -> &P {
111+
&self.park
112+
}
113+
114+
/// Returns a mutable reference to the underlying Park instance.
115+
pub fn get_park_mut(&mut self) -> &mut P {
116+
&mut self.park
117+
}
118+
98119
/// Get a clonable handle to the pool as a [`Spawn`].
99120
pub fn spawner(&self) -> LocalSpawner {
100121
LocalSpawner {
101-
incoming: Rc::downgrade(&self.incoming)
122+
incoming: Rc::downgrade(&self.state.incoming)
102123
}
103124
}
104125

@@ -122,7 +143,7 @@ impl LocalPool {
122143
/// The function will block the calling thread until *all* tasks in the pool
123144
/// are complete, including any spawned while running existing tasks.
124145
pub fn run(&mut self) {
125-
run_executor(|cx| self.poll_pool(cx))
146+
self.run_executor(|state, cx| state.poll_pool(cx))
126147
}
127148

128149
/// Runs all the tasks in the pool until the given future completes.
@@ -151,7 +172,7 @@ impl LocalPool {
151172
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
152173
pin_mut!(future);
153174

154-
run_executor(|cx| {
175+
self.run_executor(|state, cx| {
155176
{
156177
// if our main task is done, so are we
157178
let result = future.as_mut().poll(cx);
@@ -160,7 +181,7 @@ impl LocalPool {
160181
}
161182
}
162183

163-
let _ = self.poll_pool(cx);
184+
let _ = state.poll_pool(cx);
164185
Poll::Pending
165186
})
166187
}
@@ -194,15 +215,35 @@ impl LocalPool {
194215
/// further use of one of the pool's run or poll methods.
195216
/// Though only one task will be completed, progress may be made on multiple tasks.
196217
pub fn try_run_one(&mut self) -> bool {
197-
poll_executor(|ctx| {
198-
let ret = self.poll_pool_once(ctx);
218+
let mut enter = enter()
219+
.expect("cannot execute `LocalPool` executor from within \
220+
another executor");
221+
222+
{
223+
// first round
224+
let waker = self.park.waker();
225+
let mut cx = Context::from_waker(&waker);
226+
227+
let result = self.state.poll_pool_once(&mut cx);
199228

200229
// return if we really have executed a future
201-
match ret {
202-
Poll::Ready(Some(_)) => true,
203-
_ => false
230+
if let Poll::Ready(Some(_)) = result {
231+
return true;
204232
}
205-
})
233+
}
234+
235+
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");
236+
237+
let waker = self.park.waker();
238+
let mut cx = Context::from_waker(&waker);
239+
240+
let result = self.state.poll_pool_once(&mut cx);
241+
242+
// return whether we really have executed a future
243+
match result {
244+
Poll::Ready(Some(_)) => true,
245+
_ => false
246+
}
206247
}
207248

208249
/// Runs all tasks in the pool and returns if no more progress can be made
@@ -231,58 +272,61 @@ impl LocalPool {
231272
/// of the pool's run or poll methods. While the function is running, all tasks
232273
/// in the pool will try to make progress.
233274
pub fn run_until_stalled(&mut self) {
234-
poll_executor(|ctx| {
235-
loop {
236-
let result = self.poll_pool_once(ctx);
237-
238-
// if there are no more ready futures exit
239-
match result {
240-
Poll::Pending | Poll::Ready(None) => return,
241-
_ => continue
242-
}
243-
}
244-
})
245-
}
275+
let mut enter = enter()
276+
.expect("cannot execute `LocalPool` executor from within \
277+
another executor");
278+
279+
{
280+
// first round
281+
let waker = self.park.waker();
282+
let mut cx = Context::from_waker(&waker);
283+
284+
// ignore first result, we need to run park anyway and try again
285+
let _ = self.state.poll_pool_once(&mut cx);
286+
}
246287

247-
// Make maximal progress on the entire pool of spawned task, returning `Ready`
248-
// if the pool is empty and `Pending` if no further progress can be made.
249-
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
250-
// state for the FuturesUnordered, which will never be used
251288
loop {
252-
let ret = self.poll_pool_once(cx);
289+
self.park.park(&mut enter, ParkDuration::Poll).expect("park failed");
253290

254-
// we queued up some new tasks; add them and poll again
255-
if !self.incoming.borrow().is_empty() {
256-
continue;
257-
}
291+
let waker = self.park.waker();
292+
let mut cx = Context::from_waker(&waker);
258293

259-
// no queued tasks; we may be done
260-
match ret {
261-
Poll::Pending => return Poll::Pending,
262-
Poll::Ready(None) => return Poll::Ready(()),
263-
_ => {}
294+
let result = self.state.poll_pool_once(&mut cx);
295+
296+
// if there are no more ready futures exit
297+
match result {
298+
Poll::Pending | Poll::Ready(None) => return,
299+
_ => continue
264300
}
265301
}
266302
}
267303

268-
// Try make minimal progress on the pool of spawned tasks
269-
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
270-
// empty the incoming queue of newly-spawned tasks
271-
{
272-
let mut incoming = self.incoming.borrow_mut();
273-
for task in incoming.drain(..) {
274-
self.pool.push(task)
304+
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
305+
// turn.
306+
fn run_executor<T, F: FnMut(&mut State, &mut Context<'_>) -> Poll<T>>(&mut self, mut f: F) -> T {
307+
let mut enter = enter()
308+
.expect("cannot execute `LocalPool` executor from within \
309+
another executor");
310+
311+
loop {
312+
let waker = self.park.waker();
313+
let mut cx = Context::from_waker(&waker);
314+
315+
if let Poll::Ready(t) = f(&mut self.state, &mut cx) {
316+
return t;
275317
}
318+
self.park.park(&mut enter, ParkDuration::Block).expect("park failed");
276319
}
277-
278-
// try to execute the next ready future
279-
self.pool.poll_next_unpin(cx)
280320
}
281321
}
282322

283-
impl Default for LocalPool {
323+
impl<P> Default for LocalPool<P>
324+
where
325+
P: Default + Park,
326+
P::Error: std::fmt::Debug,
327+
{
284328
fn default() -> Self {
285-
Self::new()
329+
Self::new_with_park(P::default())
286330
}
287331
}
288332

@@ -294,7 +338,21 @@ impl Default for LocalPool {
294338
/// spawned tasks.
295339
pub fn block_on<F: Future>(f: F) -> F::Output {
296340
pin_mut!(f);
297-
run_executor(|cx| f.as_mut().poll(cx))
341+
342+
let mut enter = enter()
343+
.expect("cannot execute `block_on` executor from within \
344+
another executor");
345+
346+
let mut park = ParkThread::new();
347+
let waker = park.waker().clone();
348+
let mut cx = Context::from_waker(&waker);
349+
350+
loop {
351+
if let Poll::Ready(t) = f.as_mut().poll(&mut cx) {
352+
return t;
353+
}
354+
park.park(&mut enter, ParkDuration::Block).unwrap_or_else(|i| match i {});
355+
}
298356
}
299357

300358
/// Turn a stream into a blocking iterator.
@@ -332,7 +390,7 @@ impl<S: Stream + Unpin> BlockingStream<S> {
332390
impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
333391
type Item = S::Item;
334392
fn next(&mut self) -> Option<Self::Item> {
335-
LocalPool::new().run_until(self.stream.next())
393+
LocalPool::<ParkThread>::new().run_until(self.stream.next())
336394
}
337395
}
338396

0 commit comments

Comments
 (0)