
Commit 417e266

bors[bot] and japaric authored
Merge #153
153: x86_64: "practically" thread-safe Pool r=korken89 a=japaric

### Summary / motivation

This PR makes `Pool` implement `Sync` on x86_64, with the main goal of being able to test `no_std` code that uses `Pool` (e.g. via the singleton `pool!` macro) on x86_64, since it is not straightforward to run tests on embedded devices, and even harder to run them in CI.

### Details

This PR reduces the chance of `Pool` running into the ABA problem (which corrupts the underlying lock-free stack) by using 32-bit version tags (search term: IBM ABA-prevention tags). Version tags do not prevent the ABA problem with 100% certainty, but they make it almost impossible to hit in practice: the bigger the tag, in bits, the smaller the chance of running into ABA. See the module-level docs for details and limitations.

As this does not eliminate ABA with 100% certainty, perhaps it should live behind a Cargo feature? It seems to me that hazard pointers could eliminate ABA completely, but implementing them, AFAICT, requires thread-local storage (`#[thread_local]`), which is not supported in `no_std` code.

r? @korken89

Co-authored-by: Jorge Aparicio <jorge@japaric.io>
2 parents a28dac7 + ebedb55 commit 417e266
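To see why a bare-pointer CAS is vulnerable while a tagged one is not: a compare-and-swap on a raw pointer cannot tell "the same node" apart from "the same address, recycled". Packing a version tag next to the offset makes a recycled address compare unequal. A minimal standalone sketch of that packing (the layout mirrors the `Ptr` type added in src/pool/cas.rs below; the `pack` helper and `main` are illustrative, not part of the crate):

fn pack(tag: u32, offset: i32) -> u64 {
    // high 32 bits: version tag; low 32 bits: offset (same layout as `Ptr::from_parts`)
    ((tag as u64) << 32) | (offset as u32 as u64)
}

fn main() {
    let before = pack(0, 64); // head observed by a stalled thread
    let after = pack(1, 64);  // same node pushed back later; a pop bumped the tag
    assert_eq!(before as u32, after as u32); // same offset, i.e. same node address...
    assert_ne!(before, after);               // ...but the full-word CAS now fails, averting ABA
}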

File tree

7 files changed: +471 −81 lines

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -23,6 +23,8 @@ version = "0.5.4"
 default = ["cas"]
 cas = []
 ufmt-impl = ["ufmt-write"]
+# read the docs before enabling: makes `Pool` Sync on x86_64
+x86-sync-pool = []
 # only for tests
 __trybuild = []
 
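For a downstream crate, opting in would look like this (a hypothetical consumer manifest; the feature name comes from the diff above, the version from the hunk header):

[dependencies]
heapless = { version = "0.5.4", features = ["x86-sync-pool"] }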

ci/script.sh

Lines changed: 4 additions & 5 deletions
@@ -5,8 +5,8 @@ main() {
     cargo check --target $TARGET --features 'serde'
 
     if [ $TARGET = x86_64-unknown-linux-gnu ]; then
-        cargo test --target $TARGET --features 'serde'
-        cargo test --target $TARGET --release --features 'serde'
+        cargo test --test cpass --target $TARGET --features 'serde'
+        cargo test --test cpass --target $TARGET --release --features 'serde'
 
         if [ $MSRV = 1 ]; then
             cd cfail
@@ -17,11 +17,10 @@ main() {
 
         if [ $TRAVIS_RUST_VERSION = nightly ]; then
            export RUSTFLAGS="-Z sanitizer=thread"
-           export RUST_TEST_THREADS=1
            export TSAN_OPTIONS="suppressions=$(pwd)/suppressions.txt"
 
-           cargo test --test tsan --target $TARGET
-           cargo test --test tsan --target $TARGET --release
+           cargo test --test tsan --features x86-sync-pool --target $TARGET
+           cargo test --test tsan --features x86-sync-pool --target $TARGET --release
        fi
    fi
}
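Note that the tsan tests now run with the new feature and without `RUST_TEST_THREADS=1`, i.e. genuinely in parallel. A test of roughly that shape, assuming the 0.5-era singleton pool API (`pool!`, `grow`, `alloc`, `init` are recalled from the crate docs, not taken from this diff; double-check signatures against the released documentation):

use heapless::pool;
use heapless::pool::singleton::Pool;

pool!(P: [u8; 4]); // singleton pool of 4-byte blocks

fn main() {
    static mut MEMORY: [u8; 64] = [0; 64];
    // Hand the pool some backing memory.
    P::grow(unsafe { &mut MEMORY });

    // With `x86-sync-pool` enabled, `P` may be used from several threads.
    let t = std::thread::spawn(|| {
        if let Some(block) = P::alloc() {
            drop(block.init([1; 4])); // dropping returns the block to the pool
        }
    });
    let a = P::alloc().map(|block| block.init([0; 4]));
    t.join().unwrap();
    drop(a);
}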

src/pool/cas.rs

Lines changed: 209 additions & 0 deletions
@@ -0,0 +1,209 @@ (new file; all lines added)

//! Stack based on CAS atomics
//!
//! To reduce the chance of hitting the ABA problem we use a 32-bit offset + a 32-bit version tag
//! instead of a 64-bit pointer. The version tag will be bumped on each successful `pop` operation.

use core::{
    cell::UnsafeCell,
    convert::TryFrom,
    marker::PhantomData,
    mem,
    num::NonZeroUsize,
    ptr::NonNull,
    sync::atomic::{AtomicUsize, Ordering},
};

/// Unfortunate implementation detail required to use the
/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method
pub struct Node<T> {
    next: Atomic<Node<T>>,
    pub(crate) data: UnsafeCell<T>,
}

impl<T> Node<T> {
    fn next(&self) -> &Atomic<Node<T>> {
        &self.next
    }
}

pub struct Stack<T> {
    head: Atomic<Node<T>>,
}

impl<T> Stack<T> {
    pub const fn new() -> Self {
        Self {
            head: Atomic::null(),
        }
    }

    pub fn push(&self, new_head: Ptr<Node<T>>) {
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            unsafe {
                new_head
                    .as_raw()
                    .as_ref()
                    .next()
                    .store(head, Ordering::Relaxed);
            }

            if let Err(p) = self.head.compare_and_exchange_weak(
                head,
                Some(new_head),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                head = p;
            } else {
                return;
            }
        }
    }

    pub fn try_pop(&self) -> Option<Ptr<Node<T>>> {
        loop {
            if let Some(mut head) = self.head.load(Ordering::Acquire) {
                let next = unsafe { head.as_raw().as_ref().next().load(Ordering::Relaxed) };

                if self
                    .head
                    .compare_and_exchange_weak(
                        Some(head),
                        next,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    head.incr_tag();
                    return Some(head);
                }
            } else {
                // stack observed empty
                return None;
            }
        }
    }
}

fn anchor<T>() -> *mut T {
    static mut ANCHOR: u8 = 0;
    (unsafe { &mut ANCHOR } as *mut u8 as usize & !(mem::align_of::<T>() - 1)) as *mut T
}

/// Anchored pointer. This is a (signed) 32-bit offset from `anchor` plus a 32-bit tag
pub struct Ptr<T> {
    inner: NonZeroUsize,
    _marker: PhantomData<*mut T>,
}

impl<T> Clone for Ptr<T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for Ptr<T> {}

impl<T> Ptr<T> {
    pub fn new(p: *mut T) -> Option<Self> {
        i32::try_from((p as isize).wrapping_sub(anchor::<T>() as isize))
            .ok()
            .map(|offset| unsafe { Ptr::from_parts(0, offset) })
    }

    unsafe fn from_parts(tag: u32, offset: i32) -> Self {
        Self {
            inner: NonZeroUsize::new_unchecked((tag as usize) << 32 | (offset as u32 as usize)),
            _marker: PhantomData,
        }
    }

    fn from_usize(p: usize) -> Option<Self> {
        NonZeroUsize::new(p).map(|inner| Self {
            inner,
            _marker: PhantomData,
        })
    }

    fn into_usize(&self) -> usize {
        self.inner.get()
    }

    fn tag(&self) -> u32 {
        (self.inner.get() >> 32) as u32
    }

    fn incr_tag(&mut self) {
        let tag = self.tag().wrapping_add(1);
        let offset = self.offset();

        *self = unsafe { Ptr::from_parts(tag, offset) };
    }

    fn offset(&self) -> i32 {
        self.inner.get() as i32
    }

    fn as_raw(&self) -> NonNull<T> {
        unsafe {
            NonNull::new_unchecked(
                (anchor::<T>() as *mut u8).offset(self.offset() as isize) as *mut T
            )
        }
    }

    pub fn dangling() -> Self {
        unsafe { Self::from_parts(0, 1) }
    }

    pub unsafe fn as_ref(&self) -> &T {
        &*self.as_raw().as_ptr()
    }
}

struct Atomic<T> {
    inner: AtomicUsize,
    _marker: PhantomData<*mut T>,
}

impl<T> Atomic<T> {
    const fn null() -> Self {
        Self {
            inner: AtomicUsize::new(0),
            _marker: PhantomData,
        }
    }

    fn compare_and_exchange_weak(
        &self,
        current: Option<Ptr<T>>,
        new: Option<Ptr<T>>,
        succ: Ordering,
        fail: Ordering,
    ) -> Result<(), Option<Ptr<T>>> {
        self.inner
            .compare_exchange_weak(
                current.map(|p| p.into_usize()).unwrap_or(0),
                new.map(|p| p.into_usize()).unwrap_or(0),
                succ,
                fail,
            )
            .map(drop)
            .map_err(Ptr::from_usize)
    }

    fn load(&self, ord: Ordering) -> Option<Ptr<T>> {
        NonZeroUsize::new(self.inner.load(ord)).map(|inner| Ptr {
            inner,
            _marker: PhantomData,
        })
    }

    fn store(&self, val: Option<Ptr<T>>, ord: Ordering) {
        self.inner
            .store(val.map(|p| p.into_usize()).unwrap_or(0), ord)
    }
}
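The `anchor` function above is what lets a 64-bit pointer fit in the low 32 bits of the word: every pointer is stored as a signed offset from a single static byte. A standalone sketch of the same trick (the `anchor` body is copied from the file above; `main` is illustrative and assumes, as the module does, that the relevant allocations sit within ±2 GiB of the static):

use core::mem;
use std::convert::TryFrom;

fn anchor<T>() -> *mut T {
    static mut ANCHOR: u8 = 0;
    // round the anchor's address down to T's alignment, as in the file above
    (unsafe { &mut ANCHOR } as *mut u8 as usize & !(mem::align_of::<T>() - 1)) as *mut T
}

fn main() {
    let p = Box::into_raw(Box::new(0u64));
    // Succeeds iff `p` lies within i32 range of the anchor; on a typical
    // x86_64 Linux process the heap and statics are close enough, but that
    // is an assumption, not a guarantee.
    let offset = i32::try_from((p as isize).wrapping_sub(anchor::<u64>() as isize));
    println!("representable as i32 offset? {}", offset.is_ok());
    unsafe { drop(Box::from_raw(p)) };
}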

src/pool/llsc.rs

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@ (new file; all lines added)

//! Stack based on LL/SC atomics

pub use core::ptr::NonNull as Ptr;
use core::{
    cell::UnsafeCell,
    ptr,
    sync::atomic::{AtomicPtr, Ordering},
};

/// Unfortunate implementation detail required to use the
/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method
pub struct Node<T> {
    next: AtomicPtr<Node<T>>,
    pub(crate) data: UnsafeCell<T>,
}

impl<T> Node<T> {
    fn next(&self) -> &AtomicPtr<Node<T>> {
        &self.next
    }
}

pub struct Stack<T> {
    head: AtomicPtr<Node<T>>,
}

impl<T> Stack<T> {
    pub const fn new() -> Self {
        Self {
            head: AtomicPtr::new(ptr::null_mut()),
        }
    }

    pub fn push(&self, new_head: Ptr<Node<T>>) {
        // NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`

        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            unsafe { new_head.as_ref().next().store(head, Ordering::Relaxed) }

            match self.head.compare_exchange_weak(
                head,
                new_head.as_ptr(),
                Ordering::Release, // success
                Ordering::Relaxed, // failure
            ) {
                Ok(_) => return,
                // interrupt occurred or other core made a successful STREX op on the head
                Err(p) => head = p,
            }
        }
    }

    pub fn try_pop(&self) -> Option<Ptr<Node<T>>> {
        // NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`

        loop {
            let head = self.head.load(Ordering::Acquire);
            if let Some(nn_head) = Ptr::new(head) {
                let next = unsafe { nn_head.as_ref().next().load(Ordering::Relaxed) };

                match self.head.compare_exchange_weak(
                    head,
                    next,
                    Ordering::Release, // success
                    Ordering::Relaxed, // failure
                ) {
                    Ok(_) => break Some(nn_head),
                    // interrupt occurred or other core made a successful STREX op on the head
                    Err(_) => continue,
                }
            } else {
                // stack is observed as empty
                break None;
            }
        }
    }
}
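For contrast with the CAS file: on LL/SC targets (ARM's LDREX/STREX) the store-conditional fails whenever anything wrote the head in between, as the comments above note, so no version tag is needed there. The same push/try_pop loops translate almost verbatim to std atomics; here is a minimal heap-backed sketch for single-threaded experimentation (illustrative, not the crate's API; with concurrent poppers the unguarded `head` dereference is exactly where ABA and use-after-free appear, which is what the pool's version tags mitigate):

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

struct Node<T> {
    next: *mut Node<T>,
    data: T,
}

struct Stack<T> {
    head: AtomicPtr<Node<T>>,
}

impl<T> Stack<T> {
    fn new() -> Self {
        Self { head: AtomicPtr::new(ptr::null_mut()) }
    }

    fn push(&self, data: T) {
        let node = Box::into_raw(Box::new(Node { next: ptr::null_mut(), data }));
        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            unsafe { (*node).next = head }
            match self.head.compare_exchange_weak(head, node, Ordering::Release, Ordering::Relaxed) {
                Ok(_) => return,
                Err(observed) => head = observed, // lost the race; retry with the new head
            }
        }
    }

    fn try_pop(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            if head.is_null() {
                return None; // stack observed empty
            }
            // UNSOUND under concurrent pops: `head` may already be freed by another popper
            let next = unsafe { (*head).next };
            if self
                .head
                .compare_exchange_weak(head, next, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                return Some(unsafe { Box::from_raw(head) }.data);
            }
        }
    }
}

fn main() {
    let s = Stack::new();
    s.push(1);
    s.push(2);
    assert_eq!(s.try_pop(), Some(2));
    assert_eq!(s.try_pop(), Some(1));
    assert_eq!(s.try_pop(), None);
}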
