Skip to content

Commit 91431c8

Browse files
committed
fix(port_std): replace std::sync::Mutex with a remote-park-friendly implementation on Windows
The test suite was intermittently failing on Windows. The execution stalled in some cases. When I attached a debugger, I observed that two threads (only one of which I speculate was suspended) were attempting to acquire a lock, but there was no thread holding the lock. This suggests `std::sync::Mutex` can deadlock if one of the waiters is suspended by a remote park operation.
1 parent dad16b7 commit 91431c8

File tree

3 files changed

+106
-11
lines changed

3 files changed

+106
-11
lines changed

src/r3_port_std/src/threading_unix.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use std::{
1414
thread,
1515
};
1616

17-
pub use self::thread::ThreadId;
17+
pub use std::{
18+
sync::{Mutex, MutexGuard},
19+
thread::ThreadId,
20+
};
1821

1922
thread_local! {
2023
static EXIT_JMP_BUF: Cell<Option<JmpBuf>> = Cell::new(None);

src/r3_port_std/src/threading_windows.rs

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{
22
mem::MaybeUninit,
33
sync::{
44
atomic::{AtomicIsize, Ordering},
5-
mpsc, Arc, Mutex,
5+
mpsc, Arc,
66
},
77
thread,
88
};
@@ -17,7 +17,7 @@ pub unsafe fn exit_thread() -> ! {
1717
unreachable!();
1818
}
1919

20-
pub use self::thread::ThreadId;
20+
pub use std::thread::ThreadId;
2121

2222
/// [`std::thread::JoinHandle`] with extra functionalities.
2323
#[derive(Debug)]
@@ -208,3 +208,95 @@ fn panic_last_error() -> ! {
208208
errhandlingapi::GetLastError()
209209
});
210210
}
211+
212+
// `std::sync::Mutex` isn't remote-park-friendly
213+
pub use mutex::{Mutex, MutexGuard};
214+
215+
mod mutex {
216+
use std::{
217+
cell::UnsafeCell,
218+
fmt,
219+
sync::atomic::{AtomicBool, Ordering},
220+
};
221+
use winapi::um::{synchapi, winbase::INFINITE};
222+
223+
/// Remote-park-friendly [`std::sync::Mutex`].
224+
pub struct Mutex<T: ?Sized> {
225+
locked: AtomicBool,
226+
data: UnsafeCell<T>,
227+
}
228+
229+
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
230+
231+
pub struct MutexGuard<'a, T: ?Sized> {
232+
data: &'a mut T,
233+
locked: &'a AtomicBool,
234+
}
235+
236+
impl<T> Mutex<T> {
237+
#[inline]
238+
pub const fn new(x: T) -> Self {
239+
Self {
240+
data: UnsafeCell::new(x),
241+
locked: AtomicBool::new(false),
242+
}
243+
}
244+
}
245+
246+
impl<T: ?Sized> Mutex<T> {
247+
#[inline]
248+
pub fn lock(&self) -> Result<MutexGuard<'_, T>, ()> {
249+
while self
250+
.locked
251+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
252+
.is_err()
253+
{
254+
unsafe {
255+
synchapi::WaitOnAddress(
256+
self.locked.as_mut_ptr().cast(), // location to watch
257+
(&true) as *const bool as *mut _, // undesired value
258+
std::mem::size_of::<bool>(), // value size
259+
INFINITE, // timeout
260+
);
261+
}
262+
}
263+
264+
// Poisoning is not supported by this `Mutex`
265+
266+
Ok(MutexGuard {
267+
data: unsafe { &mut *self.data.get() },
268+
locked: &self.locked,
269+
})
270+
}
271+
}
272+
273+
impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
274+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275+
f.write_str("Mutex")
276+
}
277+
}
278+
279+
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
280+
#[inline]
281+
fn drop(&mut self) {
282+
self.locked.store(false, Ordering::Release);
283+
unsafe { synchapi::WakeByAddressSingle(self.locked.as_mut_ptr().cast()) };
284+
}
285+
}
286+
287+
impl<T: ?Sized> std::ops::Deref for MutexGuard<'_, T> {
288+
type Target = T;
289+
290+
#[inline]
291+
fn deref(&self) -> &Self::Target {
292+
self.data
293+
}
294+
}
295+
296+
impl<T: ?Sized> std::ops::DerefMut for MutexGuard<'_, T> {
297+
#[inline]
298+
fn deref_mut(&mut self) -> &mut Self::Target {
299+
self.data
300+
}
301+
}
302+
}

src/r3_port_std/src/ums.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use once_cell::sync::OnceCell;
33
use slab::Slab;
44
use std::{
55
panic::{catch_unwind, AssertUnwindSafe},
6-
sync::{mpsc, Arc, Mutex, MutexGuard},
6+
sync::{mpsc, Arc},
77
thread::Result,
88
};
99

@@ -18,7 +18,7 @@ mod tests;
1818
/// `Sched: `[`Scheduler`].
1919
#[derive(Debug)]
2020
pub struct ThreadGroup<Sched: ?Sized> {
21-
state: Arc<Mutex<State<Sched>>>,
21+
state: Arc<threading::Mutex<State<Sched>>>,
2222
}
2323

2424
impl<Sched: ?Sized> Clone for ThreadGroup<Sched> {
@@ -37,8 +37,8 @@ pub struct ThreadGroupJoinHandle {
3737

3838
/// RAII guard returned by [`ThreadGroup::lock`].
3939
pub struct ThreadGroupLockGuard<'a, Sched: ?Sized> {
40-
state_ref: &'a Arc<Mutex<State<Sched>>>,
41-
guard: MutexGuard<'a, State<Sched>>,
40+
state_ref: &'a Arc<threading::Mutex<State<Sched>>>,
41+
guard: threading::MutexGuard<'a, State<Sched>>,
4242
}
4343

4444
/// Identifies a thread in [`ThreadGroup`].
@@ -82,7 +82,7 @@ struct ThreadLocalBlock {
8282
/// The current thread ID.
8383
thread_id: ThreadId,
8484
/// The thread group the current worker thread belongs to.
85-
state: Arc<Mutex<State<dyn Scheduler>>>,
85+
state: Arc<threading::Mutex<State<dyn Scheduler>>>,
8686
}
8787

8888
impl<Sched: Scheduler> ThreadGroup<Sched> {
@@ -91,7 +91,7 @@ impl<Sched: Scheduler> ThreadGroup<Sched> {
9191
pub fn new(sched: Sched) -> (Self, ThreadGroupJoinHandle) {
9292
let (send, recv) = mpsc::channel();
9393

94-
let state = Arc::new(Mutex::new(State {
94+
let state = Arc::new(threading::Mutex::new(State {
9595
threads: Slab::new(),
9696
num_threads: 0,
9797
cur_thread_id: None,
@@ -264,7 +264,7 @@ impl State<dyn Scheduler> {
264264
///
265265
/// Panics if the current thread is not a worker thread of some [`ThreadGroup`].
266266
pub fn yield_now() {
267-
let thread_group: Arc<Mutex<State<dyn Scheduler>>> = TLB
267+
let thread_group: Arc<threading::Mutex<State<dyn Scheduler>>> = TLB
268268
.with(|cell| cell.get().map(|tlb| Arc::clone(&tlb.state)))
269269
.expect("current thread does not belong to a thread group");
270270

@@ -303,7 +303,7 @@ pub unsafe fn exit_thread() -> ! {
303303

304304
/// Mark the specified thread as exited.
305305
fn finalize_thread(
306-
thread_group: Arc<Mutex<State<dyn Scheduler>>>,
306+
thread_group: Arc<threading::Mutex<State<dyn Scheduler>>>,
307307
thread_id: ThreadId,
308308
result: Result<()>,
309309
) {

0 commit comments

Comments
 (0)