Skip to content

Commit f9e28cd

Browse files
author
Stjepan Glavina
committed
Make all executors scoped
1 parent e714ec4 commit f9e28cd

File tree

3 files changed

+83
-34
lines changed

3 files changed

+83
-34
lines changed

examples/priority.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,23 @@ enum Priority {
1818
/// An executor with task priorities.
1919
///
2020
/// Tasks with lower priorities only get polled when there are no tasks with higher priorities.
21-
struct PriorityExecutor {
22-
ex: [Executor; 3],
21+
struct PriorityExecutor<'a> {
22+
ex: [Executor<'a>; 3],
2323
}
2424

25-
impl PriorityExecutor {
25+
impl<'a> PriorityExecutor<'a> {
2626
/// Creates a new executor.
27-
const fn new() -> PriorityExecutor {
27+
const fn new() -> PriorityExecutor<'a> {
2828
PriorityExecutor {
2929
ex: [Executor::new(), Executor::new(), Executor::new()],
3030
}
3131
}
3232

3333
/// Spawns a task with the given priority.
34-
fn spawn<T: Send + 'static>(
34+
fn spawn<T: Send + 'a>(
3535
&self,
3636
priority: Priority,
37-
future: impl Future<Output = T> + Send + 'static,
37+
future: impl Future<Output = T> + Send + 'a,
3838
) -> Task<T> {
3939
self.ex[priority as usize].spawn(future)
4040
}
@@ -59,7 +59,7 @@ impl PriorityExecutor {
5959
}
6060

6161
fn main() {
62-
static EX: PriorityExecutor = PriorityExecutor::new();
62+
static EX: PriorityExecutor<'_> = PriorityExecutor::new();
6363

6464
// Spawn a thread running the executor forever.
6565
thread::spawn(|| future::block_on(EX.run()));

src/lib.rs

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,21 @@ pub use async_task::Task;
6161
/// }));
6262
/// ```
6363
#[derive(Debug)]
64-
pub struct Executor {
64+
pub struct Executor<'a> {
65+
/// The executor state.
6566
state: once_cell::sync::OnceCell<Arc<State>>,
67+
68+
/// Makes the `'a` lifetime invariant.
69+
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
6670
}
6771

68-
impl UnwindSafe for Executor {}
69-
impl RefUnwindSafe for Executor {}
72+
unsafe impl Send for Executor<'_> {}
73+
unsafe impl Sync for Executor<'_> {}
7074

71-
impl Executor {
75+
impl UnwindSafe for Executor<'_> {}
76+
impl RefUnwindSafe for Executor<'_> {}
77+
78+
impl<'a> Executor<'a> {
7279
/// Creates a new executor.
7380
///
7481
/// # Examples
@@ -78,9 +85,10 @@ impl Executor {
7885
///
7986
/// let ex = Executor::new();
8087
/// ```
81-
pub const fn new() -> Executor {
88+
pub const fn new() -> Executor<'a> {
8289
Executor {
8390
state: once_cell::sync::OnceCell::new(),
91+
_marker: PhantomData,
8492
}
8593
}
8694

@@ -97,10 +105,7 @@ impl Executor {
97105
/// println!("Hello world");
98106
/// });
99107
/// ```
100-
pub fn spawn<T: Send + 'static>(
101-
&self,
102-
future: impl Future<Output = T> + Send + 'static,
103-
) -> Task<T> {
108+
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
104109
let mut active = self.state().active.lock().unwrap();
105110

106111
// Remove the task from the set of active tasks when the future finishes.
@@ -112,7 +117,7 @@ impl Executor {
112117
};
113118

114119
// Create the task and register it in the set of active tasks.
115-
let (runnable, task) = async_task::spawn(future, self.schedule());
120+
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
116121
active.insert(runnable.waker());
117122

118123
runnable.schedule();
@@ -226,7 +231,7 @@ impl Executor {
226231
}
227232
}
228233

229-
impl Drop for Executor {
234+
impl Drop for Executor<'_> {
230235
fn drop(&mut self) {
231236
if let Some(state) = self.state.get() {
232237
let mut active = state.active.lock().unwrap();
@@ -242,8 +247,8 @@ impl Drop for Executor {
242247
}
243248
}
244249

245-
impl Default for Executor {
246-
fn default() -> Executor {
250+
impl<'a> Default for Executor<'a> {
251+
fn default() -> Executor<'a> {
247252
Executor::new()
248253
}
249254
}
@@ -265,18 +270,18 @@ impl Default for Executor {
265270
/// }));
266271
/// ```
267272
#[derive(Debug)]
268-
pub struct LocalExecutor {
273+
pub struct LocalExecutor<'a> {
269274
/// The inner executor.
270-
inner: once_cell::unsync::OnceCell<Executor>,
275+
inner: once_cell::unsync::OnceCell<Executor<'a>>,
271276

272-
/// Make sure the type is `!Send` and `!Sync`.
277+
/// Makes the type `!Send` and `!Sync`.
273278
_marker: PhantomData<Rc<()>>,
274279
}
275280

276-
impl UnwindSafe for LocalExecutor {}
277-
impl RefUnwindSafe for LocalExecutor {}
281+
impl UnwindSafe for LocalExecutor<'_> {}
282+
impl RefUnwindSafe for LocalExecutor<'_> {}
278283

279-
impl LocalExecutor {
284+
impl<'a> LocalExecutor<'a> {
280285
/// Creates a single-threaded executor.
281286
///
282287
/// # Examples
@@ -286,7 +291,7 @@ impl LocalExecutor {
286291
///
287292
/// let local_ex = LocalExecutor::new();
288293
/// ```
289-
pub const fn new() -> LocalExecutor {
294+
pub const fn new() -> LocalExecutor<'a> {
290295
LocalExecutor {
291296
inner: once_cell::unsync::OnceCell::new(),
292297
_marker: PhantomData,
@@ -306,7 +311,7 @@ impl LocalExecutor {
306311
/// println!("Hello world");
307312
/// });
308313
/// ```
309-
pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
314+
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
310315
let mut active = self.inner().state().active.lock().unwrap();
311316

312317
// Remove the task from the set of active tasks when the future finishes.
@@ -318,7 +323,7 @@ impl LocalExecutor {
318323
};
319324

320325
// Create the task and register it in the set of active tasks.
321-
let (runnable, task) = async_task::spawn_local(future, self.schedule());
326+
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
322327
active.insert(runnable.waker());
323328

324329
runnable.schedule();
@@ -399,13 +404,13 @@ impl LocalExecutor {
399404
}
400405

401406
/// Returns a reference to the inner executor.
402-
fn inner(&self) -> &Executor {
407+
fn inner(&self) -> &Executor<'a> {
403408
self.inner.get_or_init(|| Executor::new())
404409
}
405410
}
406411

407-
impl Default for LocalExecutor {
408-
fn default() -> LocalExecutor {
412+
impl<'a> Default for LocalExecutor<'a> {
413+
fn default() -> LocalExecutor<'a> {
409414
LocalExecutor::new()
410415
}
411416
}

tests/drop.rs

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1+
use std::mem;
12
use std::panic::catch_unwind;
23
use std::sync::atomic::{AtomicUsize, Ordering};
34
use std::sync::Mutex;
45
use std::task::{Poll, Waker};
56

6-
use async_executor::Executor;
7+
use async_executor::{Task,Executor};
78
use futures_lite::future;
89
use once_cell::sync::Lazy;
910

1011
#[test]
11-
fn smoke() {
12+
fn executor_cancels_everything() {
1213
static DROP: AtomicUsize = AtomicUsize::new(0);
1314
static WAKER: Lazy<Mutex<Option<Waker>>> = Lazy::new(|| Default::default());
1415

@@ -37,6 +38,49 @@ fn smoke() {
3738
assert_eq!(DROP.load(Ordering::SeqCst), 1);
3839
}
3940

41+
#[test]
42+
fn leaked_executor_leaks_everything() {
43+
static DROP: AtomicUsize = AtomicUsize::new(0);
44+
static WAKER: Lazy<Mutex<Option<Waker>>> = Lazy::new(|| Default::default());
45+
46+
let ex = Executor::new();
47+
48+
let task = ex.spawn(async {
49+
let _guard = CallOnDrop(|| {
50+
DROP.fetch_add(1, Ordering::SeqCst);
51+
});
52+
53+
future::poll_fn(|cx| {
54+
*WAKER.lock().unwrap() = Some(cx.waker().clone());
55+
Poll::Pending::<()>
56+
})
57+
.await;
58+
});
59+
60+
future::block_on(ex.tick());
61+
assert!(WAKER.lock().unwrap().is_some());
62+
assert_eq!(DROP.load(Ordering::SeqCst), 0);
63+
64+
mem::forget(ex);
65+
assert_eq!(DROP.load(Ordering::SeqCst), 0);
66+
67+
assert!(future::block_on(future::poll_once(task)).is_none());
68+
assert_eq!(DROP.load(Ordering::SeqCst), 0);
69+
}
70+
71+
#[test]
72+
fn await_task_after_dropping_executor() {
73+
let s: String = "hello".into();
74+
75+
let ex = Executor::new();
76+
let task: Task<&str> = ex.spawn(async { &*s });
77+
assert!(ex.try_tick());
78+
79+
drop(ex);
80+
assert_eq!(future::block_on(task), "hello");
81+
drop(s);
82+
}
83+
4084
struct CallOnDrop<F: Fn()>(F);
4185

4286
impl<F: Fn()> Drop for CallOnDrop<F> {

0 commit comments

Comments
 (0)