Skip to content

Commit 03dd9fd

Browse files
author
Stjepan Glavina
committed
Implement using async-mutex
1 parent 9cc179e commit 03dd9fd

File tree

2 files changed

+22
-125
lines changed

2 files changed

+22
-125
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ categories = ["asynchronous", "concurrency"]
1313
readme = "README.md"
1414

1515
[dependencies]
16-
event-listener = "2.0.0"
16+
async-mutex = "1.1.5"
1717

1818
[dev-dependencies]
1919
smol = "0.1.18"

src/lib.rs

Lines changed: 21 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@
3939

4040
use std::cell::UnsafeCell;
4141
use std::fmt;
42+
use std::mem;
4243
use std::ops::{Deref, DerefMut};
43-
use std::process;
44-
use std::sync::atomic::{AtomicUsize, Ordering};
4544
use std::sync::Arc;
46-
use std::time::{Duration, Instant};
47-
use std::usize;
4845

49-
use event_listener::Event;
46+
use async_mutex::{Mutex, MutexGuard};
5047

5148
/// An async lock.
5249
pub struct Lock<T>(Arc<Inner<T>>);
5350

51+
unsafe impl<T: Send> Send for Lock<T> {}
52+
unsafe impl<T: Send> Sync for Lock<T> {}
53+
5454
impl<T> Clone for Lock<T> {
5555
fn clone(&self) -> Lock<T> {
5656
Lock(self.0.clone())
@@ -59,21 +59,15 @@ impl<T> Clone for Lock<T> {
5959

6060
/// Data inside [`Lock`].
6161
struct Inner<T> {
62-
/// Current state of the lock.
63-
///
64-
/// The least significant bit is set to 1 if the lock is acquired.
65-
/// The other bits hold the number of starved lock operations.
66-
state: AtomicUsize,
67-
68-
/// Lock operations waiting for the lock to be released.
69-
lock_ops: Event,
62+
/// The inner mutex.
63+
mutex: Mutex<()>,
7064

7165
/// The value inside the lock.
7266
data: UnsafeCell<T>,
7367
}
7468

75-
unsafe impl<T: Send> Send for Lock<T> {}
76-
unsafe impl<T: Send> Sync for Lock<T> {}
69+
unsafe impl<T: Send> Send for Inner<T> {}
70+
unsafe impl<T: Send> Sync for Inner<T> {}
7771

7872
impl<T> Lock<T> {
7973
/// Creates a new async lock.
@@ -87,8 +81,7 @@ impl<T> Lock<T> {
8781
/// ```
8882
pub fn new(data: T) -> Lock<T> {
8983
Lock(Arc::new(Inner {
90-
state: AtomicUsize::new(0),
91-
lock_ops: Event::new(),
84+
mutex: Mutex::new(()),
9285
data: UnsafeCell::new(data),
9386
}))
9487
}
@@ -110,99 +103,7 @@ impl<T> Lock<T> {
110103
/// ```
111104
#[inline]
112105
pub async fn lock(&self) -> LockGuard<T> {
113-
if let Some(guard) = self.try_lock() {
114-
return guard;
115-
}
116-
self.lock_slow().await
117-
}
118-
119-
/// Slow path for acquiring the lock.
120-
#[cold]
121-
pub async fn lock_slow(&self) -> LockGuard<T> {
122-
// Get the current time.
123-
let start = Instant::now();
124-
125-
loop {
126-
// Start listening for events.
127-
let listener = self.0.lock_ops.listen();
128-
129-
// Try locking if nobody is being starved.
130-
match self.0.state.compare_and_swap(0, 1, Ordering::Acquire) {
131-
// Lock acquired!
132-
0 => return LockGuard(self.clone()),
133-
134-
// Lock is held and nobody is starved.
135-
1 => {}
136-
137-
// Somebody is starved.
138-
_ => break,
139-
}
140-
141-
// Wait for a notification.
142-
listener.await;
143-
144-
// Try locking if nobody is being starved.
145-
match self.0.state.compare_and_swap(0, 1, Ordering::Acquire) {
146-
// Lock acquired!
147-
0 => return LockGuard(self.clone()),
148-
149-
// Lock is held and nobody is starved.
150-
1 => {}
151-
152-
// Somebody is starved.
153-
_ => {
154-
// Notify the first listener in line because we probably received a
155-
// notification that was meant for a starved task.
156-
self.0.lock_ops.notify(1);
157-
break;
158-
}
159-
}
160-
161-
// If waiting for too long, fall back to a fairer locking strategy that will prevent
162-
// newer lock operations from starving us forever.
163-
if start.elapsed() > Duration::from_micros(500) {
164-
break;
165-
}
166-
}
167-
168-
// Increment the number of starved lock operations.
169-
if self.0.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
170-
// In case of potential overflow, abort.
171-
process::abort();
172-
}
173-
174-
// Decrement the counter when exiting this function.
175-
let _call = CallOnDrop(|| {
176-
self.0.state.fetch_sub(2, Ordering::Release);
177-
});
178-
179-
loop {
180-
// Start listening for events.
181-
let listener = self.0.lock_ops.listen();
182-
183-
// Try locking if nobody else is being starved.
184-
match self.0.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) {
185-
// Lock acquired!
186-
2 => return LockGuard(self.clone()),
187-
188-
// Lock is held by someone.
189-
s if s % 2 == 1 => {}
190-
191-
// Lock is available.
192-
_ => {
193-
// Be fair: notify the first listener and then go wait in line.
194-
self.0.lock_ops.notify(1);
195-
}
196-
}
197-
198-
// Wait for a notification.
199-
listener.await;
200-
201-
// Try acquiring the lock without waiting for others.
202-
if self.0.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
203-
return LockGuard(self.clone());
204-
}
205-
}
106+
LockGuard::new(self.clone(), self.0.mutex.lock().await)
206107
}
207108

208109
/// Attempts to acquire the lock.
@@ -223,11 +124,10 @@ impl<T> Lock<T> {
223124
/// ```
224125
#[inline]
225126
pub fn try_lock(&self) -> Option<LockGuard<T>> {
226-
if self.0.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
227-
Some(LockGuard(self.clone()))
228-
} else {
229-
None
230-
}
127+
self.0
128+
.mutex
129+
.try_lock()
130+
.map(|guard| LockGuard::new(self.clone(), guard))
231131
}
232132
}
233133

@@ -260,12 +160,17 @@ impl<T: Default> Default for Lock<T> {
260160
}
261161

262162
/// A guard that releases the lock when dropped.
263-
pub struct LockGuard<T>(Lock<T>);
163+
pub struct LockGuard<T>(Lock<T>, MutexGuard<'static, ()>);
264164

265165
unsafe impl<T: Send> Send for LockGuard<T> {}
266166
unsafe impl<T: Sync> Sync for LockGuard<T> {}
267167

268168
impl<T> LockGuard<T> {
169+
fn new(lock: Lock<T>, inner: MutexGuard<'_, ()>) -> LockGuard<T> {
170+
let inner = unsafe { mem::transmute::<MutexGuard<'_, ()>, MutexGuard<'static, ()>>(inner) };
171+
LockGuard(lock, inner)
172+
}
173+
269174
/// Returns a reference to the lock a guard came from.
270175
///
271176
/// # Examples
@@ -284,14 +189,6 @@ impl<T> LockGuard<T> {
284189
}
285190
}
286191

287-
impl<T> Drop for LockGuard<T> {
288-
fn drop(&mut self) {
289-
// Remove the last bit and notify a waiting lock operation.
290-
(self.0).0.state.fetch_sub(1, Ordering::Release);
291-
(self.0).0.lock_ops.notify(1);
292-
}
293-
}
294-
295192
impl<T: fmt::Debug> fmt::Debug for LockGuard<T> {
296193
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297194
fmt::Debug::fmt(&**self, f)

0 commit comments

Comments
 (0)