chore: replace deprecated compare_and_swap with compare_exchange #3331

Merged · 7 commits · Dec 27, 2020
Changes from 3 commits
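For context: compare_and_swap was deprecated in Rust 1.50 because it folds success and failure into a single returned value and accepts only one memory ordering. Its replacement, compare_exchange, returns Result<T, T> (Ok(previous) when the swap happened, Err(actual) when it did not) and takes separate orderings for the success and failure paths. A minimal standalone sketch of the migration, not code from this PR:

use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Relaxed}};

fn main() {
    let flag = AtomicBool::new(false);

    // Deprecated form: returns the previous value, and the caller must
    // compare it against the expected value to detect success.
    //     let prev = flag.compare_and_swap(false, true, Acquire);
    //     let acquired = !prev;

    // Replacement: Ok(previous) on success, Err(actual) on failure, with
    // an explicit (and here weaker) ordering for the failure path.
    let acquired = flag.compare_exchange(false, true, Acquire, Relaxed).is_ok();
    assert!(acquired);
}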
6 changes: 5 additions & 1 deletion tokio/src/io/split.rs
@@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
 
 impl<T> Inner<T> {
     fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
-        if !self.locked.compare_and_swap(false, true, Acquire) {
+        if self
+            .locked
+            .compare_exchange(false, true, Acquire, Acquire)
+            .is_ok()
+        {
             Poll::Ready(Guard { inner: self })
         } else {
             // Spin... but investigate a better strategy
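When only the success/failure bit matters, mapping the Result through is_ok() keeps the call site as a plain boolean test. Both orderings are Acquire here, mirroring the single Acquire the deprecated call used. Below, a toy try-lock in the same shape as poll_lock; the type and methods are hypothetical, not tokio's API (for retry loops, std also offers compare_exchange_weak, though this PR keeps the strong variant):

use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Release}};

struct RawFlag(AtomicBool);

impl RawFlag {
    // True if we flipped the flag from unlocked to locked.
    fn try_lock(&self) -> bool {
        self.0.compare_exchange(false, true, Acquire, Acquire).is_ok()
    }

    fn unlock(&self) {
        self.0.store(false, Release);
    }
}

fn main() {
    let f = RawFlag(AtomicBool::new(false));
    assert!(f.try_lock());
    assert!(!f.try_lock()); // already held
    f.unlock();
    assert!(f.try_lock());
}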
6 changes: 3 additions & 3 deletions tokio/src/loom/std/atomic_u64.rs
@@ -45,16 +45,16 @@ mod imp {
             prev
         }
 
-        pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
+        pub(crate) fn compare_exchange(&self, old: u64, new: u64, _: Ordering) -> Result<u64, u64> {
             let mut lock = self.inner.lock().unwrap();
             let prev = *lock;
 
             if prev != old {
-                return prev;
+                return Err(prev);
             }
 
             *lock = new;
-            prev
+            Ok(prev)
         }
     }
 }
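On the caller side, the new Result makes both outcomes explicit: Ok carries the value that was replaced, Err the value actually found. Note that std's compare_exchange, sketched below, takes separate success and failure orderings, while the mutex-backed shim above can take (and ignore) a single ordering because the lock already serializes every access. A small usage sketch against std's AtomicU64:

use std::sync::atomic::{AtomicU64, Ordering::SeqCst};

fn main() {
    let v = AtomicU64::new(5);

    // Successful swap: Ok returns the value that was replaced.
    assert_eq!(v.compare_exchange(5, 10, SeqCst, SeqCst), Ok(5));
    assert_eq!(v.load(SeqCst), 10);

    // Stale expectation: Err reports the current contents instead.
    assert_eq!(v.compare_exchange(5, 11, SeqCst, SeqCst), Err(10));
}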
20 changes: 12 additions & 8 deletions tokio/src/runtime/queue.rs
@@ -8,7 +8,7 @@ use crate::runtime::task;
 use std::marker::PhantomData;
 use std::mem::MaybeUninit;
 use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
 
 /// Producer handle. May only be used from a single thread.
 pub(super) struct Local<T: 'static> {
@@ -194,13 +194,17 @@ impl<T> Local<T> {
         // work. This is because all tasks are pushed into the queue from the
         // current thread (or memory has been acquired if the local queue handle
         // moved).
-        let actual = self.inner.head.compare_and_swap(
-            prev,
-            pack(head.wrapping_add(n), head.wrapping_add(n)),
-            Release,
-        );
-
-        if actual != prev {
+        if self
+            .inner
+            .head
+            .compare_exchange(
+                prev,
+                pack(head.wrapping_add(n), head.wrapping_add(n)),
+                Release,
+                Relaxed,
+            )
+            .is_err()
+        {
             // We failed to claim the tasks, losing the race. Return out of
             // this function and try the full `push` routine again. The queue
             // may not be full anymore.
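The failure ordering follows the mapping given in std's deprecation notice: a compare_and_swap using Release becomes compare_exchange(.., Release, Relaxed), because the failure ordering may not be Release or AcqRel, and Relaxed suffices when the value read on failure is discarded, as it is here. A hedged sketch of the same claim-or-retry shape (a bare counter, not the packed head the queue actually uses):

use std::sync::atomic::{AtomicU32, Ordering::{Relaxed, Release}};

// Attempt to advance `head` from a previously observed snapshot.
fn try_claim(head: &AtomicU32, prev: u32, n: u32) -> bool {
    // Release publishes the claim on success; Relaxed is enough on
    // failure because the observed value is not reused.
    head.compare_exchange(prev, prev.wrapping_add(n), Release, Relaxed)
        .is_ok()
}

fn main() {
    let head = AtomicU32::new(0);
    assert!(try_claim(&head, 0, 2));
    assert!(!try_claim(&head, 0, 2)); // stale snapshot: retry the slow path
}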
18 changes: 10 additions & 8 deletions tokio/src/sync/mpsc/block.rs
@@ -258,13 +258,15 @@ impl<T> Block<T> {
     pub(crate) unsafe fn try_push(
         &self,
         block: &mut NonNull<Block<T>>,
-        ordering: Ordering,
+        success: Ordering,
+        failure: Ordering,
     ) -> Result<(), NonNull<Block<T>>> {
         block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
 
         let next_ptr = self
             .next
-            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
+            .unwrap_or_else(|x| x);
 
         match NonNull::new(next_ptr) {
             Some(next_ptr) => Err(next_ptr),
@@ -306,11 +308,11 @@ impl<T> Block<T> {
         //
         // `Release` ensures that the newly allocated block is available to
         // other threads acquiring the next pointer.
-        let next = NonNull::new(self.next.compare_and_swap(
-            ptr::null_mut(),
-            new_block.as_ptr(),
-            AcqRel,
-        ));
+        let next = NonNull::new(
+            self.next
+                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
+                .unwrap_or_else(|x| x),
+        );
 
         let next = match next {
             Some(next) => next,
@@ -333,7 +335,7 @@ impl<T> Block<T> {
 
         // TODO: Should this iteration be capped?
         loop {
-            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };
 
             curr = match actual {
                 Ok(_) => {
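Where the previous value is still needed on both paths, unwrap_or_else(|x| x) collapses Result<T, T> back into that single value, so the surrounding match or NonNull::new call can stay untouched. The same idiom reappears in atomic_waker.rs below. A minimal illustration:

use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire}};

fn main() {
    let slot = AtomicUsize::new(0);

    // Both Result payloads are usize, so |x| x flattens Ok and Err alike,
    // reproducing exactly what compare_and_swap used to return.
    let prev = slot
        .compare_exchange(0, 7, AcqRel, Acquire)
        .unwrap_or_else(|x| x);
    assert_eq!(prev, 0); // swap succeeded

    let prev = slot
        .compare_exchange(0, 9, AcqRel, Acquire)
        .unwrap_or_else(|x| x);
    assert_eq!(prev, 7); // swap failed; current value reported
}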
12 changes: 6 additions & 6 deletions tokio/src/sync/mpsc/list.rs
@@ -140,11 +140,11 @@ impl<T> Tx<T> {
             //
             // Acquire is not needed as any "actual" value is not accessed.
             // At this point, the linked list is walked to acquire blocks.
-            let actual =
-                self.block_tail
-                    .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
-
-            if actual == block_ptr {
+            if self
+                .block_tail
+                .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
+                .is_ok()
+            {
                 // Synchronize with any senders
                 let tail_position = self.tail_position.fetch_add(0, Release);
 
@@ -191,7 +191,7 @@ impl<T> Tx<T> {
 
         // TODO: Unify this logic with Block::grow
         for _ in 0..3 {
-            match curr.as_ref().try_push(&mut block, AcqRel) {
+            match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
                 Ok(_) => {
                     reused = true;
                     break;
6 changes: 5 additions & 1 deletion tokio/src/sync/task/atomic_waker.rs
@@ -171,7 +171,11 @@ impl AtomicWaker {
     where
         W: WakerRef,
     {
-        match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
+        match self
+            .state
+            .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
+            .unwrap_or_else(|x| x)
+        {
             WAITING => {
                 unsafe {
                     // Locked acquired, update the waker cell