Skip to content

Commit e3df22b

Browse files
committed
Add the ability to spawn futures
For Rust 1.36+ with `std::future::Future`, add a way to spawn tasks with a returned `Future`. The task is immediately queued for the thread pool to execute.
1 parent 678ffbb commit e3df22b

File tree

4 files changed

+154
-1
lines changed

4 files changed

+154
-1
lines changed

rayon-core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,6 @@ path = "tests/simple_panic.rs"
5555
[[test]]
5656
name = "scoped_threadpool"
5757
path = "tests/scoped_threadpool.rs"
58+
59+
[build-dependencies]
60+
autocfg = "0.1.5"

rayon-core/build.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
extern crate autocfg;
2+
13
// We need a build script to use `link = "rayon-core"`. But we're not
24
// *actually* linking to anything, just making sure that we're the only
35
// rayon-core in use.
46
fn main() {
7+
let ac = autocfg::new();
8+
ac.emit_path_cfg("std::future::Future", "has_future");
9+
510
// we don't need to rebuild for anything else
6-
println!("cargo:rerun-if-changed=build.rs");
11+
autocfg::rerun_path("build.rs");
712
}

rayon-core/src/future.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
#![allow(missing_docs)]
2+
3+
use crate::ThreadPool;
4+
use crate::{spawn, spawn_fifo};
5+
use crate::{Scope, ScopeFifo};
6+
7+
use std::future::Future;
8+
use std::mem;
9+
use std::pin::Pin;
10+
use std::sync::{Arc, Mutex};
11+
use std::task::{Context, Poll, Waker};
12+
13+
use job::JobResult;
14+
use unwind;
15+
16+
struct RayonFuture<T> {
17+
state: Arc<Mutex<State<T>>>,
18+
}
19+
20+
struct RayonFutureJob<T> {
21+
state: Arc<Mutex<State<T>>>,
22+
}
23+
24+
struct State<T> {
25+
result: JobResult<T>,
26+
waker: Option<Waker>,
27+
}
28+
29+
fn new<T>() -> (RayonFuture<T>, RayonFutureJob<T>) {
30+
let state = Arc::new(Mutex::new(State {
31+
result: JobResult::None,
32+
waker: None,
33+
}));
34+
(
35+
RayonFuture {
36+
state: state.clone(),
37+
},
38+
RayonFutureJob { state },
39+
)
40+
}
41+
42+
impl<T> Future for RayonFuture<T> {
43+
type Output = T;
44+
45+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
46+
let mut guard = self.state.lock().expect("rayon future lock");
47+
match mem::replace(&mut guard.result, JobResult::None) {
48+
JobResult::None => {
49+
guard.waker = Some(cx.waker().clone());
50+
Poll::Pending
51+
}
52+
JobResult::Ok(x) => Poll::Ready(x),
53+
JobResult::Panic(p) => {
54+
drop(guard); // don't poison the lock
55+
unwind::resume_unwinding(p);
56+
}
57+
}
58+
}
59+
}
60+
61+
impl<T> RayonFutureJob<T> {
62+
fn execute(self, func: impl FnOnce() -> T) {
63+
let result = unwind::halt_unwinding(func);
64+
let mut guard = self.state.lock().expect("rayon future lock");
65+
guard.result = match result {
66+
Ok(x) => JobResult::Ok(x),
67+
Err(p) => JobResult::Panic(p),
68+
};
69+
if let Some(waker) = guard.waker.take() {
70+
waker.wake();
71+
}
72+
}
73+
}
74+
75+
pub fn spawn_future<F, T>(func: F) -> impl Future<Output = T>
76+
where
77+
F: FnOnce() -> T + Send + 'static,
78+
T: Send + 'static,
79+
{
80+
let (future, job) = new();
81+
spawn(move || job.execute(func));
82+
future
83+
}
84+
85+
pub fn spawn_fifo_future<F, T>(func: F) -> impl Future<Output = T>
86+
where
87+
F: FnOnce() -> T + Send + 'static,
88+
T: Send + 'static,
89+
{
90+
let (future, job) = new();
91+
spawn_fifo(move || job.execute(func));
92+
future
93+
}
94+
95+
impl ThreadPool {
96+
pub fn spawn_future<F, T>(&self, func: F) -> impl Future<Output = T>
97+
where
98+
F: FnOnce() -> T + Send + 'static,
99+
T: Send + 'static,
100+
{
101+
let (future, job) = new();
102+
self.spawn(move || job.execute(func));
103+
future
104+
}
105+
106+
pub fn spawn_fifo_future<F, T>(&self, func: F) -> impl Future<Output = T>
107+
where
108+
F: FnOnce() -> T + Send + 'static,
109+
T: Send + 'static,
110+
{
111+
let (future, job) = new();
112+
self.spawn_fifo(move || job.execute(func));
113+
future
114+
}
115+
}
116+
117+
impl<'scope> Scope<'scope> {
118+
pub fn spawn_future<F, T>(&self, func: F) -> impl Future<Output = T>
119+
where
120+
F: FnOnce(&Self) -> T + Send + 'scope,
121+
T: Send + 'scope,
122+
{
123+
let (future, job) = new();
124+
self.spawn(|scope| job.execute(move || func(scope)));
125+
future
126+
}
127+
}
128+
129+
impl<'scope> ScopeFifo<'scope> {
130+
pub fn spawn_fifo_future<F, T>(&self, func: F) -> impl Future<Output = T>
131+
where
132+
F: FnOnce(&Self) -> T + Send + 'scope,
133+
T: Send + 'scope,
134+
{
135+
let (future, job) = new();
136+
self.spawn_fifo(|scope| job.execute(move || func(scope)));
137+
future
138+
}
139+
}

rayon-core/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ mod thread_pool;
6161
mod unwind;
6262
mod util;
6363

64+
#[cfg(has_future)]
65+
mod future;
66+
6467
mod compile_fail;
6568
mod test;
6669

@@ -75,6 +78,9 @@ pub use thread_pool::current_thread_has_pending_tasks;
7578
pub use thread_pool::current_thread_index;
7679
pub use thread_pool::ThreadPool;
7780

81+
#[cfg(has_future)]
82+
pub use future::{spawn_future, spawn_fifo_future};
83+
7884
use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
7985

8086
/// Returns the number of threads in the current registry. If this

0 commit comments

Comments
 (0)