Skip to content

Commit e4580da

Browse files
japarickorken89
authored andcommitted
x86_64: "practically" thread-safe Pool
1 parent a28dac7 commit e4580da

File tree

7 files changed

+451
-75
lines changed

7 files changed

+451
-75
lines changed

ci/script.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ main() {
1717

1818
if [ $TRAVIS_RUST_VERSION = nightly ]; then
1919
export RUSTFLAGS="-Z sanitizer=thread"
20-
export RUST_TEST_THREADS=1
2120
export TSAN_OPTIONS="suppressions=$(pwd)/suppressions.txt"
2221

2322
cargo test --test tsan --target $TARGET

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@
7373
#![deny(missing_docs)]
7474
#![deny(rust_2018_compatibility)]
7575
#![deny(rust_2018_idioms)]
76-
#![deny(warnings)]
76+
// #![deny(warnings)]
77+
#![allow(warnings)] // FIXME
7778

7879
pub use binary_heap::BinaryHeap;
7980
pub use generic_array::typenum::consts;

src/pool/cas.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
//! Stack based on CAS atomics
2+
//!
3+
//! To reduce the chance of hitting the ABA problem we use a 32-bit offset + a 32-bit version tag
4+
//! instead of a 64-bit pointer. The version tag will be bumped on each successful `pop` operation.
5+
6+
use core::{
7+
cell::UnsafeCell,
8+
convert::TryFrom,
9+
marker::PhantomData,
10+
mem::{self, MaybeUninit},
11+
num::NonZeroUsize,
12+
ptr::NonNull,
13+
sync::atomic::{AtomicUsize, Ordering},
14+
};
15+
16+
/// Unfortunate implementation detail required to use the
17+
/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method
18+
pub struct Node<T> {
19+
next: Atomic<Node<T>>,
20+
pub(crate) data: UnsafeCell<T>,
21+
}
22+
23+
impl<T> Node<T> {
24+
fn next(&self) -> &Atomic<Node<T>> {
25+
&self.next
26+
}
27+
}
28+
29+
pub struct Stack<T> {
30+
head: Atomic<Node<T>>,
31+
}
32+
33+
impl<T> Stack<T> {
34+
pub const fn new() -> Self {
35+
Self {
36+
head: Atomic::null(),
37+
}
38+
}
39+
40+
pub fn push(&self, new_head: Ptr<Node<T>>) {
41+
let mut head = self.head.load(Ordering::Relaxed);
42+
43+
loop {
44+
unsafe {
45+
new_head
46+
.as_raw()
47+
.as_ref()
48+
.next()
49+
.store(head, Ordering::Relaxed);
50+
}
51+
52+
if let Err(p) = self.head.compare_and_exchange_weak(
53+
head,
54+
Some(new_head),
55+
Ordering::Release,
56+
Ordering::Relaxed,
57+
) {
58+
head = p;
59+
} else {
60+
return;
61+
}
62+
}
63+
}
64+
65+
pub fn try_pop(&self) -> Option<Ptr<Node<T>>> {
66+
loop {
67+
if let Some(mut head) = self.head.load(Ordering::Acquire) {
68+
let next = unsafe { head.as_raw().as_ref().next().load(Ordering::Relaxed) };
69+
70+
if self
71+
.head
72+
.compare_and_exchange_weak(
73+
Some(head),
74+
next,
75+
Ordering::Release,
76+
Ordering::Relaxed,
77+
)
78+
.is_ok()
79+
{
80+
head.incr_tag();
81+
return Some(head);
82+
}
83+
} else {
84+
// stack observed empty
85+
return None;
86+
}
87+
}
88+
}
89+
}
90+
91+
fn anchor<T>() -> *mut T {
92+
static mut ANCHOR: u8 = 0;
93+
(unsafe { &mut ANCHOR } as *mut u8 as usize & !(mem::align_of::<T>() - 1)) as *mut T
94+
}
95+
96+
/// Anchored pointer. This is a (signed) 32-bit offset from `anchor` plus a 32-bit tag
97+
pub struct Ptr<T> {
98+
inner: NonZeroUsize,
99+
_marker: PhantomData<*mut T>,
100+
}
101+
102+
impl<T> Clone for Ptr<T> {
103+
fn clone(&self) -> Self {
104+
*self
105+
}
106+
}
107+
108+
impl<T> Copy for Ptr<T> {}
109+
110+
impl<T> Ptr<T> {
111+
pub fn new(p: *mut T) -> Option<Self> {
112+
i32::try_from((p as isize).wrapping_sub(anchor::<T>() as isize))
113+
.ok()
114+
.map(|offset| unsafe { Ptr::from_parts(0, offset) })
115+
}
116+
117+
unsafe fn from_parts(tag: u32, offset: i32) -> Self {
118+
Self {
119+
inner: NonZeroUsize::new_unchecked((tag as usize) << 32 | (offset as u32 as usize)),
120+
_marker: PhantomData,
121+
}
122+
}
123+
124+
fn from_usize(p: usize) -> Option<Self> {
125+
NonZeroUsize::new(p).map(|inner| Self {
126+
inner,
127+
_marker: PhantomData,
128+
})
129+
}
130+
131+
fn into_usize(&self) -> usize {
132+
self.inner.get()
133+
}
134+
135+
fn tag(&self) -> u32 {
136+
(self.inner.get() >> 32) as u32
137+
}
138+
139+
fn incr_tag(&mut self) {
140+
let tag = self.tag().wrapping_add(1);
141+
let offset = self.offset();
142+
143+
*self = unsafe { Ptr::from_parts(tag, offset) };
144+
}
145+
146+
fn offset(&self) -> i32 {
147+
self.inner.get() as i32
148+
}
149+
150+
fn as_raw(&self) -> NonNull<T> {
151+
unsafe {
152+
NonNull::new_unchecked(
153+
anchor::<T>()
154+
.cast::<u8>()
155+
.offset(self.offset() as isize)
156+
.cast(),
157+
)
158+
}
159+
}
160+
161+
pub fn dangling() -> Self {
162+
unsafe { Self::from_parts(0, 1) }
163+
}
164+
165+
pub unsafe fn as_ref(&self) -> &T {
166+
&*self.as_raw().as_ptr()
167+
}
168+
}
169+
170+
struct Atomic<T> {
171+
inner: AtomicUsize,
172+
_marker: PhantomData<*mut T>,
173+
}
174+
175+
impl<T> Atomic<T> {
176+
const fn null() -> Self {
177+
Self {
178+
inner: AtomicUsize::new(0),
179+
_marker: PhantomData,
180+
}
181+
}
182+
183+
fn compare_and_exchange_weak(
184+
&self,
185+
current: Option<Ptr<T>>,
186+
new: Option<Ptr<T>>,
187+
succ: Ordering,
188+
fail: Ordering,
189+
) -> Result<(), Option<Ptr<T>>> {
190+
self.inner
191+
.compare_exchange_weak(
192+
current.map(|p| p.into_usize()).unwrap_or(0),
193+
new.map(|p| p.into_usize()).unwrap_or(0),
194+
succ,
195+
fail,
196+
)
197+
.map(drop)
198+
.map_err(Ptr::from_usize)
199+
}
200+
201+
fn load(&self, ord: Ordering) -> Option<Ptr<T>> {
202+
NonZeroUsize::new(self.inner.load(ord)).map(|inner| Ptr {
203+
inner,
204+
_marker: PhantomData,
205+
})
206+
}
207+
208+
fn store(&self, val: Option<Ptr<T>>, ord: Ordering) {
209+
self.inner
210+
.store(val.map(|p| p.into_usize()).unwrap_or(0), ord)
211+
}
212+
}

src/pool/llsc.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//! Stack based on LL/SC atomics
2+
3+
pub use core::ptr::NonNull as Ptr;
4+
use core::{
5+
cell::UnsafeCell,
6+
ptr,
7+
sync::atomic::{self, AtomicPtr, Ordering},
8+
};
9+
10+
pub struct Node<T> {
11+
next: AtomicPtr<Node<T>>,
12+
pub(crate) data: UnsafeCell<T>,
13+
}
14+
15+
impl<T> Node<T> {
16+
fn next(&self) -> &AtomicPtr<Node<T>> {
17+
&self.next
18+
}
19+
}
20+
21+
pub struct Stack<T> {
22+
head: AtomicPtr<Node<T>>,
23+
}
24+
25+
impl<T> Stack<T> {
26+
pub const fn new() -> Self {
27+
Self {
28+
head: AtomicPtr::new(ptr::null_mut()),
29+
}
30+
}
31+
32+
pub fn push(&self, new_head: Ptr<Node<T>>) {
33+
// NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`
34+
35+
let mut head = self.head.load(Ordering::Relaxed);
36+
loop {
37+
unsafe { new_head.as_ref().next().store(head, Ordering::Relaxed) }
38+
39+
match self.head.compare_exchange_weak(
40+
head,
41+
new_head.as_ptr(),
42+
Ordering::Release, // success
43+
Ordering::Relaxed, // failure
44+
) {
45+
Ok(_) => return,
46+
// interrupt occurred or other core made a successful STREX op on the head
47+
Err(p) => head = p,
48+
}
49+
}
50+
}
51+
52+
pub fn try_pop(&self) -> Option<Ptr<Node<T>>> {
53+
// NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`
54+
55+
loop {
56+
let head = self.head.load(Ordering::Acquire);
57+
if let Some(nn_head) = Ptr::new(head) {
58+
let next = unsafe { nn_head.as_ref().next().load(Ordering::Relaxed) };
59+
60+
match self.head.compare_exchange_weak(
61+
head,
62+
next,
63+
Ordering::Release, // success
64+
Ordering::Relaxed, // failure
65+
) {
66+
Ok(_) => break Some(nn_head),
67+
// interrupt occurred or other core made a successful STREX op on the head
68+
Err(_) => continue,
69+
}
70+
} else {
71+
// stack is observed as empty
72+
break None;
73+
}
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)