Skip to content

Commit f4b419c

Browse files
committed
Implement thread parking, RwLock, Once fallbacks
- Bring back "Generic" thread parker - Add checks for Win8 and WinXP APIs to decide which thread parker impl to use - Use `queue::RwLock` for rust9x. Now that the thread parker is available, we get this high-quality `RwLock` implementation on all API levels, without any extra fallback impls! - Same for `Once`
1 parent f9d7036 commit f4b419c

File tree

9 files changed

+422
-10
lines changed

9 files changed

+422
-10
lines changed

library/std/src/sys/pal/windows/c.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
#![unstable(issue = "none", feature = "windows_c")]
66
#![allow(clippy::style)]
77

8-
use core::ffi::{c_uint, c_ulong, c_ushort, c_void};
8+
#[allow(unused_imports)]
9+
use core::ffi::c_void;
10+
use core::ffi::{c_uint, c_ulong, c_ushort};
911
use core::{mem, ptr};
1012

1113
mod windows_sys;
@@ -150,7 +152,7 @@ compat_fn_with_fallback! {
150152
}
151153
}
152154

153-
#[cfg(not(target_vendor = "win7"))]
155+
#[cfg(not(any(target_vendor = "win7", target_vendor = "rust9x")))]
154156
// Use raw-dylib to import synchronization functions to workaround issues with the older mingw import library.
155157
#[cfg_attr(
156158
target_arch = "x86",
@@ -187,11 +189,31 @@ compat_fn_optional! {
187189
pub fn WakeByAddressSingle(address: *const c_void);
188190
}
189191

190-
#[cfg(any(target_vendor = "win7", target_vendor = "uwp"))]
192+
#[cfg(target_vendor = "rust9x")]
193+
compat_fn_with_fallback! {
194+
pub static SYNCH: &CStr = c"api-ms-win-core-synch-l1-2-0" => { load: true, unicows: false };
195+
196+
pub fn WaitOnAddress(
197+
address: *const c_void,
198+
compareaddress: *const c_void,
199+
addresssize: usize,
200+
dwmilliseconds: u32
201+
) -> BOOL {
202+
rtabort!("unimplemented")
203+
}
204+
pub fn WakeByAddressSingle(address: *const c_void) {
205+
rtabort!("unimplemented")
206+
}
207+
pub fn WakeByAddressAll(address: *const c_void) {
208+
rtabort!("unimplemented")
209+
}
210+
}
211+
212+
#[cfg(any(target_vendor = "win7", target_vendor = "rust9x", target_vendor = "uwp"))]
191213
compat_fn_with_fallback! {
192214
pub static NTDLL: &CStr = c"ntdll" => { load: true, unicows: false };
193215

194-
#[cfg(target_vendor = "win7")]
216+
#[cfg(any(target_vendor = "win7", target_vendor = "rust9x"))]
195217
pub fn NtCreateKeyedEvent(
196218
KeyedEventHandle: *mut HANDLE,
197219
DesiredAccess: u32,
@@ -200,7 +222,7 @@ compat_fn_with_fallback! {
200222
) -> NTSTATUS {
201223
panic!("keyed events not available")
202224
}
203-
#[cfg(target_vendor = "win7")]
225+
#[cfg(any(target_vendor = "win7", target_vendor = "rust9x"))]
204226
pub fn NtReleaseKeyedEvent(
205227
EventHandle: HANDLE,
206228
Key: *const c_void,
@@ -209,7 +231,7 @@ compat_fn_with_fallback! {
209231
) -> NTSTATUS {
210232
panic!("keyed events not available")
211233
}
212-
#[cfg(target_vendor = "win7")]
234+
#[cfg(any(target_vendor = "win7", target_vendor = "rust9x"))]
213235
pub fn NtWaitForKeyedEvent(
214236
EventHandle: HANDLE,
215237
Key: *const c_void,

library/std/src/sys/pal/windows/compat.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use crate::sys::c;
2626
#[cfg(target_vendor = "rust9x")]
2727
pub(crate) mod checks;
2828

29+
#[cfg(target_vendor = "rust9x")]
30+
pub(crate) mod thread_parking;
31+
2932
// This uses a static initializer to preload some imported functions.
3033
// The CRT (C runtime) executes static initializers before `main`
3134
// is called (for binaries) and before `DllMain` is called (for DLLs).
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
static mut THREAD_PARKING_IMPL: Option<ThreadParkingImpl> = None;
2+
3+
#[derive(Clone, Copy, PartialEq, Eq)]
4+
pub(crate) enum ThreadParkingImpl {
5+
// WaitOnAddress-based, Windows 8+
6+
Futex,
7+
// NtCreatedKeyedEvent-based, Windows XP+
8+
KeyedEvent,
9+
// Generic thread parker based on Mutex+Condvar
10+
Generic,
11+
}
12+
13+
// This CANNOT be called during global init because it has to `LoadLibrary` to figure out which impl
14+
// is available.
15+
pub(crate) fn thread_parking_impl() -> ThreadParkingImpl {
16+
if let Some(implementation) = unsafe { THREAD_PARKING_IMPL } {
17+
return implementation;
18+
}
19+
20+
let implementation = {
21+
if crate::sys::c::WaitOnAddress::available().is_some()
22+
&& crate::sys::c::WakeByAddressSingle::available().is_some()
23+
{
24+
ThreadParkingImpl::Futex
25+
} else if crate::sys::c::NtCreateKeyedEvent::available().is_some()
26+
&& crate::sys::c::NtReleaseKeyedEvent::available().is_some()
27+
&& crate::sys::c::NtWaitForKeyedEvent::available().is_some()
28+
{
29+
ThreadParkingImpl::KeyedEvent
30+
} else {
31+
ThreadParkingImpl::Generic
32+
}
33+
};
34+
35+
unsafe {
36+
THREAD_PARKING_IMPL = Some(implementation);
37+
}
38+
39+
implementation
40+
}

library/std/src/sys/sync/once/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
cfg_if::cfg_if! {
1111
if #[cfg(any(
12-
all(target_os = "windows", not(target_vendor="win7")),
12+
all(target_os = "windows", not(any(target_vendor = "win7", target_vendor = "rust9x"))),
1313
target_os = "linux",
1414
target_os = "android",
1515
all(target_arch = "wasm32", target_feature = "atomics"),

library/std/src/sys/sync/rwlock/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cfg_if::cfg_if! {
22
if #[cfg(any(
3-
all(target_os = "windows", not(target_vendor = "win7")),
3+
all(target_os = "windows", not(any(target_vendor = "win7", target_vendor = "rust9x"))),
44
target_os = "linux",
55
target_os = "android",
66
target_os = "freebsd",
@@ -14,7 +14,7 @@ cfg_if::cfg_if! {
1414
pub use futex::RwLock;
1515
} else if #[cfg(any(
1616
target_family = "unix",
17-
all(target_os = "windows", target_vendor = "win7"),
17+
all(target_os = "windows", any(target_vendor = "win7", target_vendor = "rust9x")),
1818
all(target_vendor = "fortanix", target_env = "sgx"),
1919
target_os = "xous",
2020
))] {
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
//! Parker implementation based on a Mutex and Condvar.
2+
3+
use crate::pin::Pin;
4+
use crate::sync::atomic::AtomicUsize;
5+
use crate::sync::atomic::Ordering::SeqCst;
6+
use crate::sync::{Condvar, Mutex};
7+
use crate::time::Duration;
8+
9+
const EMPTY: usize = 0;
10+
const PARKED: usize = 1;
11+
const NOTIFIED: usize = 2;
12+
13+
pub struct Parker {
14+
state: AtomicUsize,
15+
lock: Mutex<()>,
16+
cvar: Condvar,
17+
}
18+
19+
impl Parker {
20+
/// Construct the generic parker. The UNIX parker implementation
21+
/// requires this to happen in-place.
22+
pub unsafe fn new_in_place(parker: *mut Parker) {
23+
parker.write(Parker {
24+
state: AtomicUsize::new(EMPTY),
25+
lock: Mutex::new(()),
26+
cvar: Condvar::new(),
27+
});
28+
}
29+
30+
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
31+
pub unsafe fn park(self: Pin<&Self>) {
32+
// If we were previously notified then we consume this notification and
33+
// return quickly.
34+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
35+
return;
36+
}
37+
38+
// Otherwise we need to coordinate going to sleep
39+
let mut m = self.lock.lock().unwrap();
40+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
41+
Ok(_) => {}
42+
Err(NOTIFIED) => {
43+
// We must read here, even though we know it will be `NOTIFIED`.
44+
// This is because `unpark` may have been called again since we read
45+
// `NOTIFIED` in the `compare_exchange` above. We must perform an
46+
// acquire operation that synchronizes with that `unpark` to observe
47+
// any writes it made before the call to unpark. To do that we must
48+
// read from the write it made to `state`.
49+
let old = self.state.swap(EMPTY, SeqCst);
50+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
51+
return;
52+
} // should consume this notification, so prohibit spurious wakeups in next park.
53+
Err(_) => panic!("inconsistent park state"),
54+
}
55+
loop {
56+
m = self.cvar.wait(m).unwrap();
57+
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
58+
Ok(_) => return, // got a notification
59+
Err(_) => {} // spurious wakeup, go back to sleep
60+
}
61+
}
62+
}
63+
64+
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
65+
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
66+
// Like `park` above we have a fast path for an already-notified thread, and
67+
// afterwards we start coordinating for a sleep.
68+
// return quickly.
69+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
70+
return;
71+
}
72+
let m = self.lock.lock().unwrap();
73+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
74+
Ok(_) => {}
75+
Err(NOTIFIED) => {
76+
// We must read again here, see `park`.
77+
let old = self.state.swap(EMPTY, SeqCst);
78+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
79+
return;
80+
} // should consume this notification, so prohibit spurious wakeups in next park.
81+
Err(_) => panic!("inconsistent park_timeout state"),
82+
}
83+
84+
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
85+
// from a notification we just want to unconditionally set the state back to
86+
// empty, either consuming a notification or un-flagging ourselves as
87+
// parked.
88+
let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
89+
match self.state.swap(EMPTY, SeqCst) {
90+
NOTIFIED => {} // got a notification, hurray!
91+
PARKED => {} // no notification, alas
92+
n => panic!("inconsistent park_timeout state: {n}"),
93+
}
94+
}
95+
96+
// This implementation doesn't require `Pin`, but other implementations do.
97+
pub fn unpark(self: Pin<&Self>) {
98+
// To ensure the unparked thread will observe any writes we made
99+
// before this call, we must perform a release operation that `park`
100+
// can synchronize with. To do that we must write `NOTIFIED` even if
101+
// `state` is already `NOTIFIED`. That is why this must be a swap
102+
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
103+
// on failure.
104+
match self.state.swap(NOTIFIED, SeqCst) {
105+
EMPTY => return, // no one was waiting
106+
NOTIFIED => return, // already unparked
107+
PARKED => {} // gotta go wake someone up
108+
_ => panic!("inconsistent state in unpark"),
109+
}
110+
111+
// There is a period between when the parked thread sets `state` to
112+
// `PARKED` (or last checked `state` in the case of a spurious wake
113+
// up) and when it actually waits on `cvar`. If we were to notify
114+
// during this period it would be ignored and then when the parked
115+
// thread went to sleep it would never wake up. Fortunately, it has
116+
// `lock` locked at this stage so we can acquire `lock` to wait until
117+
// it is ready to receive the notification.
118+
//
119+
// Releasing `lock` before the call to `notify_one` means that when the
120+
// parked thread wakes it doesn't get woken only to have to wait for us
121+
// to release `lock`.
122+
drop(self.lock.lock().unwrap());
123+
self.cvar.notify_one()
124+
}
125+
}

library/std/src/sys/sync/thread_parking/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cfg_if::cfg_if! {
22
if #[cfg(any(
3-
all(target_os = "windows", not(target_vendor = "win7")),
3+
all(target_os = "windows", not(any(target_vendor = "win7", target_vendor = "rust9x"))),
44
target_os = "linux",
55
target_os = "android",
66
all(target_arch = "wasm32", target_feature = "atomics"),
@@ -22,6 +22,12 @@ cfg_if::cfg_if! {
2222
} else if #[cfg(target_vendor = "win7")] {
2323
mod windows7;
2424
pub use windows7::Parker;
25+
} else if #[cfg(target_vendor = "rust9x")] {
26+
mod futex;
27+
mod windowsxp;
28+
mod generic;
29+
mod rust9x;
30+
pub use rust9x::Parker;
2531
} else if #[cfg(all(target_vendor = "apple", not(miri)))] {
2632
// Doesn't work in Miri, see <https://github.com/rust-lang/miri/issues/2589>.
2733
mod darwin;
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::mem::ManuallyDrop;
2+
use crate::pin::Pin;
3+
use crate::sys::compat::thread_parking::{ThreadParkingImpl, thread_parking_impl};
4+
use crate::time::Duration;
5+
6+
pub union Parker {
7+
futex: ManuallyDrop<super::futex::Parker>,
8+
keyed_event: ManuallyDrop<super::windowsxp::Parker>,
9+
generic: ManuallyDrop<super::generic::Parker>,
10+
}
11+
12+
impl Parker {
13+
pub unsafe fn new_in_place(parker: *mut Parker) {
14+
let impl_ = thread_parking_impl();
15+
16+
match impl_ {
17+
ThreadParkingImpl::Futex => unsafe {
18+
super::futex::Parker::new_in_place(&raw mut (*(*parker).futex))
19+
},
20+
ThreadParkingImpl::KeyedEvent => unsafe {
21+
super::windowsxp::Parker::new_in_place(&raw mut (*(*parker).keyed_event))
22+
},
23+
ThreadParkingImpl::Generic => unsafe {
24+
super::generic::Parker::new_in_place(&raw mut (*(*parker).generic))
25+
},
26+
}
27+
}
28+
29+
pub unsafe fn park(self: Pin<&Self>) {
30+
let impl_ = thread_parking_impl();
31+
32+
match impl_ {
33+
ThreadParkingImpl::Futex => unsafe { self.map_unchecked(|p| &*p.futex).park() },
34+
ThreadParkingImpl::KeyedEvent => unsafe {
35+
self.map_unchecked(|p| &*p.keyed_event).park()
36+
},
37+
ThreadParkingImpl::Generic => unsafe { self.map_unchecked(|p| &*p.generic).park() },
38+
}
39+
}
40+
41+
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
42+
let impl_ = thread_parking_impl();
43+
44+
match impl_ {
45+
ThreadParkingImpl::Futex => unsafe {
46+
self.map_unchecked(|p| &*p.futex).park_timeout(timeout)
47+
},
48+
ThreadParkingImpl::KeyedEvent => unsafe {
49+
self.map_unchecked(|p| &*p.keyed_event).park_timeout(timeout)
50+
},
51+
ThreadParkingImpl::Generic => unsafe {
52+
self.map_unchecked(|p| &*p.generic).park_timeout(timeout)
53+
},
54+
}
55+
}
56+
57+
pub fn unpark(self: Pin<&Self>) {
58+
let impl_ = thread_parking_impl();
59+
60+
match impl_ {
61+
ThreadParkingImpl::Futex => unsafe {
62+
self.map_unchecked(|p| &*p.futex).unpark();
63+
},
64+
ThreadParkingImpl::KeyedEvent => unsafe {
65+
self.map_unchecked(|p| &*p.keyed_event).unpark()
66+
},
67+
ThreadParkingImpl::Generic => unsafe { self.map_unchecked(|p| &*p.generic).unpark() },
68+
}
69+
}
70+
}
71+
72+
impl Drop for Parker {
73+
fn drop(&mut self) {
74+
let impl_ = thread_parking_impl();
75+
76+
match impl_ {
77+
ThreadParkingImpl::Futex | ThreadParkingImpl::KeyedEvent => {
78+
// these don't require any cleanup
79+
}
80+
ThreadParkingImpl::Generic => unsafe { ManuallyDrop::drop(&mut self.generic) },
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)