From 9d75276e4b0d7802e68f7cce4dbce4c6fe6a9133 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 14:54:11 -0700
Subject: [PATCH 01/13] Add test for nested threadpool deadlock condition.

---
 rayon-core/src/test.rs | 42 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 41 insertions(+), 1 deletion(-)

diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index 25b8487f7..b2b3e1e08 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -2,7 +2,9 @@
 use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Barrier};
+use std::sync::{Arc, Barrier, Mutex};
+use std::thread;
+use std::time::{Duration, Instant};

 #[test]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
@@ -198,3 +200,41 @@ fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {

     Ok(())
 }
+
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn nested_thread_pools_deadlock() {
+    let global_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+    let lock_pool = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
+    let mutex = Arc::new(Mutex::new(0));
+    let start_time = Instant::now();
+
+    global_pool.scope(|s| {
+        for i in 0..5 {
+            let mutex = mutex.clone();
+            let lock_pool = lock_pool.clone();
+            s.spawn(move |_| {
+                let mut acquired = false;
+                while start_time.elapsed() < Duration::from_secs(2) {
+                    if let Ok(mut value) = mutex.try_lock() {
+                        println!("Thread {i} acquired the mutex");
+                        lock_pool.scope(|lock_s| {
+                            lock_s.spawn(|_| {
+                                thread::sleep(Duration::from_millis(100));
+                            });
+                        });
+                        *value += 1;
+                        acquired = true;
+                        break;
+                    }
+                    // Sleep for a short duration to avoid busy waiting.
+                    thread::sleep(Duration::from_millis(10));
+                }
+                if !acquired {
+                    panic!("Thread {i} failed to acquire the mutex within 2 seconds.");
+                }
+            });
+        }
+    });
+}
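The deadlock patch 01 exercises comes from rayon's work-stealing: a worker that blocks waiting for a nested pool's scope() to finish may steal and run another job from its own pool in the meantime. If that stolen job tries to take the mutex the worker still holds, neither job can make progress. A minimal sketch of the hazard (hypothetical code, not part of the series; pool shapes and names are illustrative):

    use std::sync::{Arc, Mutex};

    // Worker A locks `m`, then waits on `inner.scope(..)`. While waiting, A may
    // steal the second spawn from `outer`; that job blocks on `m.lock()`, which
    // A itself still holds, so A can never finish the scope it is waiting on.
    fn deadlock_sketch(outer: &rayon::ThreadPool, inner: &rayon::ThreadPool) {
        let m = Arc::new(Mutex::new(()));
        outer.scope(|s| {
            for _ in 0..2 {
                let m = Arc::clone(&m);
                s.spawn(move |_| {
                    let _guard = m.lock().unwrap();
                    inner.scope(|inner_s| inner_s.spawn(|_| ()));
                });
            }
        });
    }

The test above uses try_lock() with a two-second deadline instead of a blocking lock() so that a regression fails the suite rather than hanging it.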
From 87a1bf26e269c5ff18dad410a90f0c20785cddab Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 15:43:11 -0700
Subject: [PATCH 02/13] Add full_blocking feature.

---
 rayon-core/src/lib.rs      | 30 ++++++++++++++++++++++++++++++
 rayon-core/src/registry.rs | 34 ++++++++++++++++++++++++++++++++--
 rayon-core/src/test.rs     |  2 +-
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 03456b3ee..8d10c0457 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -200,6 +200,12 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
     /// Closure invoked on worker thread exit.
     exit_handler: Option<Box<ExitHandler>>,

+    /// Affects the behavior of nested thread pools. If true, jobs from a parent thread pool will
+    /// block until jobs in this thread pool are completed. If false, jobs from a parent thread
+    /// pool are free to complete other jobs while jobs are processed in this thread pool. This is
+    /// useful for avoiding deadlock when using mutexes. Default is false.
+    full_blocking: bool,
+
     /// Closure invoked to spawn threads.
     spawn_handler: S,

@@ -245,6 +251,7 @@ impl Default for ThreadPoolBuilder {
             exit_handler: None,
             spawn_handler: DefaultSpawn,
             breadth_first: false,
+            full_blocking: false,
         }
     }
 }
@@ -455,6 +462,7 @@ impl<S> ThreadPoolBuilder<S> {
             start_handler: self.start_handler,
             exit_handler: self.exit_handler,
             breadth_first: self.breadth_first,
+            full_blocking: self.full_blocking,
         }
     }

@@ -672,6 +680,26 @@ impl<S> ThreadPoolBuilder<S> {
         self.exit_handler = Some(Box::new(exit_handler));
         self
     }
+
+    /// Changes the behavior of nested thread pools.
+    ///
+    ///
+    /// If false, when a job is created on this thread pool by a job running in a separate thread
+    /// pool, the parent thread to start executing a new job in the parent thread pool.
+    ///
+    /// If true, when a job is created on this thread pool by a job running in a separate thread
+    /// pool, the parent thread will block until the jobs in this thread pool are completed. This
+    /// is useful for avoiding deadlock when using mutexes.
+    ///
+    /// Default is false.
+    pub fn full_blocking(mut self) -> Self {
+        self.full_blocking = true;
+        self
+    }
+    
+    fn get_full_blocking(&self) -> bool {
+        self.full_blocking
+    }
 }

 #[allow(deprecated)]
@@ -811,6 +839,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
             ref exit_handler,
             spawn_handler: _,
             ref breadth_first,
+            ref full_blocking,
         } = *self;

         // Just print `Some()` or `None` to the debug
@@ -835,6 +864,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
             .field("start_handler", &start_handler)
             .field("exit_handler", &exit_handler)
             .field("breadth_first", &breadth_first)
+            .field("full_blocking", &full_blocking)
             .finish()
     }
 }
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index d30f815bd..87b13a173 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -135,6 +135,7 @@ pub(super) struct Registry {
     panic_handler: Option<Box<PanicHandler>>,
     start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,
+    full_blocking: bool,

     // When this latch reaches 0, it means that all work on this
     // registry must be complete. This is ensured in the following ways:
@@ -267,6 +268,7 @@ impl Registry {
             panic_handler: builder.take_panic_handler(),
             start_handler: builder.take_start_handler(),
             exit_handler: builder.take_exit_handler(),
+            full_blocking: builder.get_full_blocking(),
         });

         // If we return early or panic, make sure to terminate existing threads.
@@ -493,7 +495,11 @@ impl Registry {
         if worker_thread.is_null() {
             self.in_worker_cold(op)
         } else if (*worker_thread).registry().id() != self.id() {
-            self.in_worker_cross(&*worker_thread, op)
+            if self.full_blocking {
+                self.in_worker_cross_blocking(&*worker_thread, op)
+            } else {
+                self.in_worker_cross(&*worker_thread, op)
+            }
         } else {
             // Perfectly valid to give them a `&T`: this is the
             // current thread, so we know the data structure won't be
@@ -502,7 +508,7 @@ impl Registry {
             }
         }
     }
-
+    
     #[cold]
     unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
     where
@@ -552,6 +558,30 @@ impl Registry {
         job.into_result()
     }

+    #[cold]
+    unsafe fn in_worker_cross_blocking<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
+    where
+        OP: FnOnce(&WorkerThread, bool) -> R + Send,
+        R: Send,
+    {
+        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
+
+        LOCK_LATCH.with(|l| {
+            let job = StackJob::new(
+                |injected| {
+                    let worker_thread = WorkerThread::current();
+                    assert!(injected && !worker_thread.is_null());
+                    op(&*worker_thread, true)
+                },
+                LatchRef::new(l),
+            );
+            self.inject(job.as_job_ref());
+            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
+
+            job.into_result()
+        })
+    }
+
     /// Increments the terminate counter. This increment should be
     /// balanced by a call to `terminate`, which will decrement. This
     /// is used when spawning asynchronous work, which needs to
diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index b2b3e1e08..50e406129 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -206,7 +206,7 @@
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_thread_pools_deadlock() {
     let global_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
-    let lock_pool = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
+    let lock_pool = Arc::new(ThreadPoolBuilder::new().full_blocking().num_threads(1).build().unwrap());
     let mutex = Arc::new(Mutex::new(0));
     let start_time = Instant::now();
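Mechanically, the new in_worker_cross_blocking path injects the job into the target pool and parks the calling thread on a thread-local LockLatch until the job completes (the wait_and_reset() call above), rather than returning the caller to its own pool's work-stealing loop as in_worker_cross does. From the caller's side, a usage sketch of the API this patch adds (everything apart from full_blocking() is ordinary rayon usage; the names here are illustrative):

    use rayon::prelude::*;
    use rayon::ThreadPoolBuilder;
    use std::sync::{Arc, Mutex};

    fn main() {
        // Work run on a full_blocking pool simply blocks the calling worker;
        // the caller cannot steal a job that might need the mutex it holds.
        let lock_pool = ThreadPoolBuilder::new()
            .full_blocking()
            .num_threads(1)
            .build()
            .unwrap();

        let data = Arc::new(Mutex::new(vec![1, 2, 3]));
        let guard = data.lock().unwrap();
        let doubled: Vec<i32> = lock_pool.install(|| guard.par_iter().map(|x| x * 2).collect());
        drop(guard);
        assert_eq!(doubled, vec![2, 4, 6]);
    }

The trade-off is throughput for safety: a parent-pool worker blocked this way contributes nothing while it waits, which is why the behavior is opt-in per pool rather than the default.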
From b4761dd9dc8f117907d3a9289f3c94563dee7869 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:01:25 -0700
Subject: [PATCH 03/13] Run cargo fmt.

---
 rayon-core/src/lib.rs      |  2 +-
 rayon-core/src/registry.rs |  2 +-
 rayon-core/src/test.rs     | 11 ++++++++---
 3 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 8d10c0457..1e97203bc 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -696,7 +696,7 @@ impl<S> ThreadPoolBuilder<S> {
         self.full_blocking = true;
         self
     }
-    
+
     fn get_full_blocking(&self) -> bool {
         self.full_blocking
     }
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 87b13a173..9787b9af9 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -508,7 +508,7 @@ impl Registry {
             }
         }
     }
-    
+
     #[cold]
     unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
     where
diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index 50e406129..1a0a61c8e 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -201,12 +201,17 @@ fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
     Ok(())
 }

-
 #[test]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_thread_pools_deadlock() {
-    let global_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
-    let lock_pool = Arc::new(ThreadPoolBuilder::new().full_blocking().num_threads(1).build().unwrap());
+    let global_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+    let lock_pool = Arc::new(
+        ThreadPoolBuilder::new()
+            .full_blocking()
+            .num_threads(1)
+            .build()
+            .unwrap(),
+    );
     let mutex = Arc::new(Mutex::new(0));
     let start_time = Instant::now();

From 2e0eeaa490d3ff78f8de82635b6c4fc6c20fc835 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:03:22 -0700
Subject: [PATCH 04/13] Remove unnecessary value from test.

---
 rayon-core/src/test.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index 1a0a61c8e..61ea8e426 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -212,7 +212,7 @@ fn nested_thread_pools_deadlock() {
             .build()
             .unwrap(),
     );
-    let mutex = Arc::new(Mutex::new(0));
+    let mutex = Arc::new(Mutex::new(()));
     let start_time = Instant::now();

     global_pool.scope(|s| {
@@ -229,7 +229,6 @@ fn nested_thread_pools_deadlock() {
                             thread::sleep(Duration::from_millis(100));
                         });
                     });
-                    *value += 1;
                     acquired = true;
                     break;
                 }

From f8d92bb96883a1fd0765b36dff198e95bd5b7621 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:05:21 -0700
Subject: [PATCH 05/13] Add some comments in the test.
---
 rayon-core/src/test.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index 61ea8e426..273e6926f 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -205,6 +205,7 @@ fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_thread_pools_deadlock() {
     let global_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+    // The lock thread pool must be full_blocking for this test to pass.
     let lock_pool = Arc::new(
         ThreadPoolBuilder::new()
             .full_blocking()
@@ -219,6 +220,8 @@ fn nested_thread_pools_deadlock() {
         for i in 0..5 {
             let mutex = mutex.clone();
             let lock_pool = lock_pool.clone();
+            // Create 5 jobs that try to acquire the lock.
+            // If all 5 jobs are unable to acquire the lock within 2 seconds, a deadlock occurred.
             s.spawn(move |_| {
                 let mut acquired = false;
                 while start_time.elapsed() < Duration::from_secs(2) {

From 3a024deded2b77519a17554e77ae386133db8929 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:17:33 -0700
Subject: [PATCH 06/13] Clean up compiler warnings.

---
 rayon-core/src/registry.rs | 4 ++--
 rayon-core/src/test.rs     | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 9787b9af9..7db367224 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -496,7 +496,7 @@ impl Registry {
             self.in_worker_cold(op)
         } else if (*worker_thread).registry().id() != self.id() {
             if self.full_blocking {
-                self.in_worker_cross_blocking(&*worker_thread, op)
+                self.in_worker_cross_blocking(op)
             } else {
                 self.in_worker_cross(&*worker_thread, op)
             }
@@ -559,7 +559,7 @@ impl Registry {
     }

     #[cold]
-    unsafe fn in_worker_cross_blocking<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
+    unsafe fn in_worker_cross_blocking<OP, R>(&self, op: OP) -> R
     where
         OP: FnOnce(&WorkerThread, bool) -> R + Send,
         R: Send,
diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index 273e6926f..bca4730d8 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -225,7 +225,7 @@ fn nested_thread_pools_deadlock() {
             s.spawn(move |_| {
                 let mut acquired = false;
                 while start_time.elapsed() < Duration::from_secs(2) {
-                    if let Ok(mut value) = mutex.try_lock() {
+                    if let Ok(_guard) = mutex.try_lock() {
                         println!("Thread {i} acquired the mutex");
                         lock_pool.scope(|lock_s| {
                             lock_s.spawn(|_| {

From e7621741644d58c0359b0f64dd97ed426e83dee9 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:19:28 -0700
Subject: [PATCH 07/13] Minor style cleanups.

---
 rayon-core/src/lib.rs  | 1 -
 rayon-core/src/test.rs | 1 -
 2 files changed, 2 deletions(-)

diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 1e97203bc..99d0b4346 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -683,7 +683,6 @@ impl<S> ThreadPoolBuilder<S> {

     /// Changes the behavior of nested thread pools.
     ///
-    ///
     /// If false, when a job is created on this thread pool by a job running in a separate thread
     /// pool, the parent thread to start executing a new job in the parent thread pool.
     ///
diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index bca4730d8..e91685e86 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -235,7 +235,6 @@ fn nested_thread_pools_deadlock() {
                         acquired = true;
                         break;
                     }
-                    // Sleep for a short duration to avoid busy waiting.
                    thread::sleep(Duration::from_millis(10));
                 }
                 if !acquired {

From f11ccb5b45b9930408be8ce5b208b18742a0ac34 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:28:33 -0700
Subject: [PATCH 08/13] Fix documentation grammar.

---
 rayon-core/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 99d0b4346..1d04ece02 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -684,7 +684,7 @@ impl<S> ThreadPoolBuilder<S> {
     /// Changes the behavior of nested thread pools.
     ///
     /// If false, when a job is created on this thread pool by a job running in a separate thread
-    /// pool, the parent thread to start executing a new job in the parent thread pool.
+    /// pool, the parent thread is allowed to start executing a new job in the parent thread pool.
     ///
     /// If true, when a job is created on this thread pool by a job running in a separate thread
     /// pool, the parent thread will block until the jobs in this thread pool are completed. This

From cf1493b778a644c146570e5d90d8c2a8c4ca7d51 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:30:50 -0700
Subject: [PATCH 09/13] Improve documentation.

---
 rayon-core/src/lib.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 1d04ece02..811bf4ad8 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -200,10 +200,7 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
     /// Closure invoked on worker thread exit.
     exit_handler: Option<Box<ExitHandler>>,

-    /// Affects the behavior of nested thread pools. If true, jobs from a parent thread pool will
-    /// block until jobs in this thread pool are completed. If false, jobs from a parent thread
-    /// pool are free to complete other jobs while jobs are processed in this thread pool. This is
-    /// useful for avoiding deadlock when using mutexes. Default is false.
+    /// Affects the blocking/work-stealing behavior when using nested thread pools.
     full_blocking: bool,

     /// Closure invoked to spawn threads.
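Before the issue-592 reproduction below, note what the blocking path actually waits on: the LockLatch used by in_worker_cross_blocking in patch 02 is essentially a Mutex/Condvar pair. A simplified model (rayon-core's real implementation lives in src/latch.rs; this sketch only mirrors the two calls the patch relies on):

    use std::sync::{Condvar, Mutex};

    // Simplified LockLatch model: the injecting thread parks in wait_and_reset()
    // until the worker that finishes the injected job calls set().
    struct LockLatch {
        m: Mutex<bool>,
        v: Condvar,
    }

    impl LockLatch {
        fn new() -> Self {
            LockLatch { m: Mutex::new(false), v: Condvar::new() }
        }

        fn set(&self) {
            *self.m.lock().unwrap() = true;
            self.v.notify_all();
        }

        // Resets the flag after waking so the thread-local latch can be
        // reused by the next cross-pool call made from this thread.
        fn wait_and_reset(&self) {
            let mut guard = self.m.lock().unwrap();
            while !*guard {
                guard = self.v.wait(guard).unwrap();
            }
            *guard = false;
        }
    }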
From 8b66e6349c1fd2e3f983b6739a40578f2bd02df8 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:53:48 -0700
Subject: [PATCH 10/13] Add a test matching issue 592.

---
 tests/issue592.rs | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)
 create mode 100644 tests/issue592.rs

diff --git a/tests/issue592.rs b/tests/issue592.rs
new file mode 100644
index 000000000..ec717b5e4
--- /dev/null
+++ b/tests/issue592.rs
@@ -0,0 +1,31 @@
+use std::sync::{Arc, Mutex};
+use rayon::ThreadPoolBuilder;
+use rayon::iter::IntoParallelRefIterator;
+use rayon::iter::ParallelIterator;
+
+fn mutex_and_par(mutex: Arc<Mutex<Vec<i32>>>, blocking_pool: &rayon::ThreadPool) {
+    // Lock the mutex and collect items using the custom thread pool
+    let vec = mutex.lock().unwrap();
+    let result: Vec<i32> = blocking_pool.install(|| vec.par_iter().cloned().collect());
+    println!("{:?}", result);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn test_issue592() {
+    // Initialize a collection and a mutex
+    let collection = vec![1, 2, 3, 4, 5];
+    let mutex = Arc::new(Mutex::new(collection));
+
+    // Create a custom thread pool for the test
+    let blocking_pool = ThreadPoolBuilder::new().full_blocking().num_threads(4).build().unwrap();
+
+    // Call the function with the custom thread pool within a parallel iterator's for_each method
+    let dummy_collection: Vec<i32> = (1..=100).collect();
+    dummy_collection.par_iter().for_each(|_| {
+        mutex_and_par(mutex.clone(), &blocking_pool);
+    });
+
+    // Additional assertions can be added here to verify the behavior
+}
+

From db5b2de42cfe9c63d0508afd976af8ffe1ff75b9 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 16:55:02 -0700
Subject: [PATCH 11/13] Improve the issue 592 test.

---
 tests/issue592.rs | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/tests/issue592.rs b/tests/issue592.rs
index ec717b5e4..8b5409938 100644
--- a/tests/issue592.rs
+++ b/tests/issue592.rs
@@ -4,28 +4,22 @@
 use rayon::iter::ParallelIterator;

 fn mutex_and_par(mutex: Arc<Mutex<Vec<i32>>>, blocking_pool: &rayon::ThreadPool) {
-    // Lock the mutex and collect items using the custom thread pool
+    // Lock the mutex and collect items using the full blocking thread pool
     let vec = mutex.lock().unwrap();
     let result: Vec<i32> = blocking_pool.install(|| vec.par_iter().cloned().collect());
     println!("{:?}", result);
 }

 #[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn test_issue592() {
-    // Initialize a collection and a mutex
     let collection = vec![1, 2, 3, 4, 5];
     let mutex = Arc::new(Mutex::new(collection));

-    // Create a custom thread pool for the test
     let blocking_pool = ThreadPoolBuilder::new().full_blocking().num_threads(4).build().unwrap();

-    // Call the function with the custom thread pool within a parallel iterator's for_each method
     let dummy_collection: Vec<i32> = (1..=100).collect();
     dummy_collection.par_iter().for_each(|_| {
         mutex_and_par(mutex.clone(), &blocking_pool);
     });
-
-    // Additional assertions can be added here to verify the behavior
 }

From 93e8b925aad258498e52fbaa0ec976ead96ce462 Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 17:43:01 -0700
Subject: [PATCH 12/13] Move the test to a more suitable location.
---
 rayon-core/src/test.rs             | 47 +-----------------------------
 rayon-core/src/thread_pool/test.rs | 45 ++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index e91685e86..69ff13f69 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -2,9 +2,7 @@
 use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Barrier, Mutex};
-use std::thread;
-use std::time::{Duration, Instant};
+use std::sync::{Arc, Barrier};

 #[test]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
@@ -201,46 +199,3 @@ fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
     Ok(())
 }

-#[test]
-#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
-fn nested_thread_pools_deadlock() {
-    let global_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
-    // The lock thread pool must be full_blocking for this test to pass.
-    let lock_pool = Arc::new(
-        ThreadPoolBuilder::new()
-            .full_blocking()
-            .num_threads(1)
-            .build()
-            .unwrap(),
-    );
-    let mutex = Arc::new(Mutex::new(()));
-    let start_time = Instant::now();
-
-    global_pool.scope(|s| {
-        for i in 0..5 {
-            let mutex = mutex.clone();
-            let lock_pool = lock_pool.clone();
-            // Create 5 jobs that try to acquire the lock.
-            // If all 5 jobs are unable to acquire the lock within 2 seconds, a deadlock occurred.
-            s.spawn(move |_| {
-                let mut acquired = false;
-                while start_time.elapsed() < Duration::from_secs(2) {
-                    if let Ok(_guard) = mutex.try_lock() {
-                        println!("Thread {i} acquired the mutex");
-                        lock_pool.scope(|lock_s| {
-                            lock_s.spawn(|_| {
-                                thread::sleep(Duration::from_millis(100));
-                            });
-                        });
-                        acquired = true;
-                        break;
-                    }
-                    thread::sleep(Duration::from_millis(10));
-                }
-                if !acquired {
-                    panic!("Thread {i} failed to acquire the mutex within 2 seconds.");
-                }
-            });
-        }
-    });
-}
diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs
index 88b36282d..811125aaf 100644
--- a/rayon-core/src/thread_pool/test.rs
+++ b/rayon-core/src/thread_pool/test.rs
@@ -3,6 +3,8 @@
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::channel;
 use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::{Duration, Instant};

 use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};

@@ -416,3 +418,46 @@ fn yield_local_to_spawn() {
     // for it to finish if a different thread stole it first.
     assert_eq!(22, rx.recv().unwrap());
 }
+
+#[test]
+fn nested_thread_pools_deadlock() {
+    let global_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+    // The lock thread pool must be full_blocking for this test to pass.
+    let lock_pool = Arc::new(
+        ThreadPoolBuilder::new()
+            .full_blocking()
+            .num_threads(1)
+            .build()
+            .unwrap(),
+    );
+    let mutex = Arc::new(Mutex::new(()));
+    let start_time = Instant::now();
+
+    global_pool.scope(|s| {
+        for i in 0..5 {
+            let mutex = mutex.clone();
+            let lock_pool = lock_pool.clone();
+            // Create 5 jobs that try to acquire the lock.
+            // If all 5 jobs are unable to acquire the lock within 2 seconds, a deadlock occurred.
+            s.spawn(move |_| {
+                let mut acquired = false;
+                while start_time.elapsed() < Duration::from_secs(2) {
+                    if let Ok(_guard) = mutex.try_lock() {
+                        println!("Thread {i} acquired the mutex");
+                        lock_pool.scope(|lock_s| {
+                            lock_s.spawn(|_| {
+                                thread::sleep(Duration::from_millis(100));
+                            });
+                        });
+                        acquired = true;
+                        break;
+                    }
+                    thread::sleep(Duration::from_millis(10));
+                }
+                if !acquired {
+                    panic!("Thread {i} failed to acquire the mutex within 2 seconds.");
+                }
+            });
+        }
+    });
+}

From 2e70c71dae093323d0070bf873539d7c2c6133ea Mon Sep 17 00:00:00 2001
From: Nathan Huckleberry
Date: Tue, 18 Jun 2024 17:43:34 -0700
Subject: [PATCH 13/13] Remove unnecessary changes.

---
 rayon-core/src/test.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rayon-core/src/test.rs b/rayon-core/src/test.rs
index 69ff13f69..25b8487f7 100644
--- a/rayon-core/src/test.rs
+++ b/rayon-core/src/test.rs
@@ -198,4 +198,3 @@ fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {

     Ok(())
 }
-