Skip to content

Commit 76a3030

Browse files
bors[bot]cuviper
andauthored
1019: Add a fallback when threading is unsupported r=cuviper a=cuviper Fixes rayon-rs#861. Co-authored-by: Josh Stone <cuviper@gmail.com>
2 parents 94c16e0 + 26c249f commit 76a3030

23 files changed

+249
-23
lines changed

.github/workflows/ci.yaml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,24 @@ jobs:
6565
- run: cargo test --verbose --package rayon
6666
- run: cargo test --verbose --package rayon-core
6767

68-
# wasm won't actually work without threading, but it builds
68+
# wasm32-unknown-unknown builds, and even has the runtime fallback for
69+
# unsupported threading, but we don't have an environment to execute in.
70+
# wasm32-wasi can test the fallback by running in wasmtime.
6971
wasm:
7072
name: WebAssembly
7173
runs-on: ubuntu-latest
74+
env:
75+
CARGO_TARGET_WASM32_WASI_RUNNER: /home/runner/.wasmtime/bin/wasmtime
7276
steps:
7377
- uses: actions/checkout@v3
7478
- uses: dtolnay/rust-toolchain@stable
7579
with:
76-
target: wasm32-unknown-unknown
80+
targets: wasm32-unknown-unknown,wasm32-wasi
7781
- run: cargo check --verbose --target wasm32-unknown-unknown
82+
- run: cargo check --verbose --target wasm32-wasi
83+
- run: curl https://wasmtime.dev/install.sh -sSf | bash
84+
- run: cargo test --verbose --target wasm32-wasi --package rayon
85+
- run: cargo test --verbose --target wasm32-wasi --package rayon-core
7886

7987
fmt:
8088
name: Format

rayon-core/src/broadcast/test.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ fn broadcast_global() {
1212
}
1313

1414
#[test]
15+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
1516
fn spawn_broadcast_global() {
1617
let (tx, rx) = crossbeam_channel::unbounded();
1718
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@@ -22,13 +23,15 @@ fn spawn_broadcast_global() {
2223
}
2324

2425
#[test]
26+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
2527
fn broadcast_pool() {
2628
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
2729
let v = pool.broadcast(|ctx| ctx.index());
2830
assert!(v.into_iter().eq(0..7));
2931
}
3032

3133
#[test]
34+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
3235
fn spawn_broadcast_pool() {
3336
let (tx, rx) = crossbeam_channel::unbounded();
3437
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -40,13 +43,15 @@ fn spawn_broadcast_pool() {
4043
}
4144

4245
#[test]
46+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
4347
fn broadcast_self() {
4448
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
4549
let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
4650
assert!(v.into_iter().eq(0..7));
4751
}
4852

4953
#[test]
54+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
5055
fn spawn_broadcast_self() {
5156
let (tx, rx) = crossbeam_channel::unbounded();
5257
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -58,6 +63,7 @@ fn spawn_broadcast_self() {
5863
}
5964

6065
#[test]
66+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
6167
fn broadcast_mutual() {
6268
let count = AtomicUsize::new(0);
6369
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
@@ -73,6 +79,7 @@ fn broadcast_mutual() {
7379
}
7480

7581
#[test]
82+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
7683
fn spawn_broadcast_mutual() {
7784
let (tx, rx) = crossbeam_channel::unbounded();
7885
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
@@ -90,6 +97,7 @@ fn spawn_broadcast_mutual() {
9097
}
9198

9299
#[test]
100+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
93101
fn broadcast_mutual_sleepy() {
94102
let count = AtomicUsize::new(0);
95103
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
@@ -108,6 +116,7 @@ fn broadcast_mutual_sleepy() {
108116
}
109117

110118
#[test]
119+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
111120
fn spawn_broadcast_mutual_sleepy() {
112121
let (tx, rx) = crossbeam_channel::unbounded();
113122
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
@@ -130,6 +139,7 @@ fn spawn_broadcast_mutual_sleepy() {
130139
}
131140

132141
#[test]
142+
#[cfg_attr(not(panic = "unwind"), ignore)]
133143
fn broadcast_panic_one() {
134144
let count = AtomicUsize::new(0);
135145
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -146,6 +156,7 @@ fn broadcast_panic_one() {
146156
}
147157

148158
#[test]
159+
#[cfg_attr(not(panic = "unwind"), ignore)]
149160
fn spawn_broadcast_panic_one() {
150161
let (tx, rx) = crossbeam_channel::unbounded();
151162
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
@@ -166,6 +177,7 @@ fn spawn_broadcast_panic_one() {
166177
}
167178

168179
#[test]
180+
#[cfg_attr(not(panic = "unwind"), ignore)]
169181
fn broadcast_panic_many() {
170182
let count = AtomicUsize::new(0);
171183
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -182,6 +194,7 @@ fn broadcast_panic_many() {
182194
}
183195

184196
#[test]
197+
#[cfg_attr(not(panic = "unwind"), ignore)]
185198
fn spawn_broadcast_panic_many() {
186199
let (tx, rx) = crossbeam_channel::unbounded();
187200
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
@@ -202,6 +215,7 @@ fn spawn_broadcast_panic_many() {
202215
}
203216

204217
#[test]
218+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
205219
fn broadcast_sleep_race() {
206220
let test_duration = time::Duration::from_secs(1);
207221
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -214,3 +228,35 @@ fn broadcast_sleep_race() {
214228
});
215229
}
216230
}
231+
232+
#[test]
233+
fn broadcast_after_spawn_broadcast() {
234+
let (tx, rx) = crossbeam_channel::unbounded();
235+
236+
// Queue a non-blocking spawn_broadcast.
237+
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
238+
239+
// This blocking broadcast runs after all prior broadcasts.
240+
crate::broadcast(|_| {});
241+
242+
// The spawn_broadcast **must** have run by now on all threads.
243+
let mut v: Vec<_> = rx.try_iter().collect();
244+
v.sort_unstable();
245+
assert!(v.into_iter().eq(0..crate::current_num_threads()));
246+
}
247+
248+
#[test]
249+
fn broadcast_after_spawn() {
250+
let (tx, rx) = crossbeam_channel::bounded(1);
251+
252+
// Queue a regular spawn on a thread-local deque.
253+
crate::registry::in_worker(move |_, _| {
254+
crate::spawn(move || tx.send(22).unwrap());
255+
});
256+
257+
// Broadcast runs after the local deque is empty.
258+
crate::broadcast(|_| {});
259+
260+
// The spawn **must** have run by now.
261+
assert_eq!(22, rx.try_recv().unwrap());
262+
}

rayon-core/src/join/test.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ fn sort() {
4747
}
4848

4949
#[test]
50+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
5051
fn sort_in_pool() {
5152
let rng = seeded_rng();
5253
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
@@ -77,6 +78,7 @@ fn panic_propagate_both() {
7778
}
7879

7980
#[test]
81+
#[cfg_attr(not(panic = "unwind"), ignore)]
8082
fn panic_b_still_executes() {
8183
let mut x = false;
8284
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
@@ -86,6 +88,7 @@ fn panic_b_still_executes() {
8688
}
8789

8890
#[test]
91+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
8992
fn join_context_both() {
9093
// If we're not in a pool, both should be marked stolen as they're injected.
9194
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
@@ -94,6 +97,7 @@ fn join_context_both() {
9497
}
9598

9699
#[test]
100+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
97101
fn join_context_neither() {
98102
// If we're already in a 1-thread pool, neither job should be stolen.
99103
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -104,6 +108,7 @@ fn join_context_neither() {
104108
}
105109

106110
#[test]
111+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
107112
fn join_context_second() {
108113
use std::sync::Barrier;
109114

@@ -127,6 +132,7 @@ fn join_context_second() {
127132
}
128133

129134
#[test]
135+
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
130136
fn join_counter_overflow() {
131137
const MAX: u32 = 500_000;
132138

rayon-core/src/lib.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,23 @@
2626
//! [`join()`]: struct.ThreadPool.html#method.join
2727
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
2828
//!
29-
//! ## Restricting multiple versions
29+
//! # Global fallback when threading is unsupported
30+
//!
31+
//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
32+
//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
33+
//! targets are notable examples of this. Rather than panicking on the unsupported error when
34+
//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
35+
//!
36+
//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
37+
//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
38+
//! there is no other thread to share the work. However, since the pool is not running independent
39+
//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
40+
//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41+
//! anything like thread preemption or `async` task switching.
42+
//!
43+
//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
44+
//!
45+
//! # Restricting multiple versions
3046
//!
3147
//! In order to ensure proper coordination between threadpools, and especially
3248
//! to make sure there's only one global threadpool, `rayon-core` is actively
@@ -707,6 +723,10 @@ impl ThreadPoolBuildError {
707723
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
708724
ThreadPoolBuildError { kind }
709725
}
726+
727+
fn is_unsupported(&self) -> bool {
728+
matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
729+
}
710730
}
711731

712732
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =

rayon-core/src/registry.rs

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl ThreadBuilder {
5050
/// Executes the main loop for this thread. This will not return until the
5151
/// thread pool is dropped.
5252
pub fn run(self) {
53-
unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
53+
unsafe { main_loop(self) }
5454
}
5555
}
5656

@@ -164,7 +164,7 @@ static THE_REGISTRY_SET: Once = Once::new();
164164
/// initialization has not already occurred, use the default
165165
/// configuration.
166166
pub(super) fn global_registry() -> &'static Arc<Registry> {
167-
set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
167+
set_global_registry(default_global_registry)
168168
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
169169
.expect("The global thread pool has not been initialized.")
170170
}
@@ -198,6 +198,46 @@ where
198198
result
199199
}
200200

201+
fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
202+
let result = Registry::new(ThreadPoolBuilder::new());
203+
204+
// If we're running in an environment that doesn't support threads at all, we can fall back to
205+
// using the current thread alone. This is crude, and probably won't work for non-blocking
206+
// calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
207+
//
208+
// Notably, this allows current WebAssembly targets to work even though their threading support
209+
// is stubbed out, and we won't have to change anything if they do add real threading.
210+
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
211+
if unsupported && WorkerThread::current().is_null() {
212+
let builder = ThreadPoolBuilder::new()
213+
.num_threads(1)
214+
.spawn_handler(|thread| {
215+
// Rather than starting a new thread, we're just taking over the current thread
216+
// *without* running the main loop, so we can still return from here.
217+
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
218+
let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
219+
let registry = &*worker_thread.registry;
220+
let index = worker_thread.index;
221+
222+
unsafe {
223+
WorkerThread::set_current(worker_thread);
224+
225+
// let registry know we are ready to do work
226+
Latch::set(&registry.thread_infos[index].primed);
227+
}
228+
229+
Ok(())
230+
});
231+
232+
let fallback_result = Registry::new(builder);
233+
if fallback_result.is_ok() {
234+
return fallback_result;
235+
}
236+
}
237+
238+
result
239+
}
240+
201241
struct Terminator<'a>(&'a Arc<Registry>);
202242

203243
impl<'a> Drop for Terminator<'a> {
@@ -655,6 +695,19 @@ thread_local! {
655695
static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
656696
}
657697

698+
impl From<ThreadBuilder> for WorkerThread {
699+
fn from(thread: ThreadBuilder) -> Self {
700+
Self {
701+
worker: thread.worker,
702+
stealer: thread.stealer,
703+
fifo: JobFifo::new(),
704+
index: thread.index,
705+
rng: XorShift64Star::new(),
706+
registry: thread.registry,
707+
}
708+
}
709+
}
710+
658711
impl Drop for WorkerThread {
659712
fn drop(&mut self) {
660713
// Undo `set_current`
@@ -851,22 +904,11 @@ impl WorkerThread {
851904

852905
/// ////////////////////////////////////////////////////////////////////////
853906
854-
unsafe fn main_loop(
855-
worker: Worker<JobRef>,
856-
stealer: Stealer<JobRef>,
857-
registry: Arc<Registry>,
858-
index: usize,
859-
) {
860-
let worker_thread = &WorkerThread {
861-
worker,
862-
stealer,
863-
fifo: JobFifo::new(),
864-
index,
865-
rng: XorShift64Star::new(),
866-
registry,
867-
};
907+
unsafe fn main_loop(thread: ThreadBuilder) {
908+
let worker_thread = &WorkerThread::from(thread);
868909
WorkerThread::set_current(worker_thread);
869910
let registry = &*worker_thread.registry;
911+
let index = worker_thread.index;
870912

871913
// let registry know we are ready to do work
872914
Latch::set(&registry.thread_infos[index].primed);
@@ -924,7 +966,7 @@ where
924966
// invalidated until we return.
925967
op(&*owner_thread, false)
926968
} else {
927-
global_registry().in_worker_cold(op)
969+
global_registry().in_worker(op)
928970
}
929971
}
930972
}

0 commit comments

Comments
 (0)