40
40
use std:: cell:: UnsafeCell ;
41
41
use std:: fmt;
42
42
use std:: ops:: { Deref , DerefMut } ;
43
- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
43
+ use std:: process;
44
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
44
45
use std:: sync:: Arc ;
46
+ use std:: time:: { Duration , Instant } ;
45
47
46
48
use event_listener:: Event ;
47
49
@@ -56,8 +58,11 @@ impl<T> Clone for Lock<T> {
56
58
57
59
/// Data inside [`Lock`].
58
60
struct Inner < T > {
59
- /// Set to `true` when the lock is acquired by a [`LockGuard`].
60
- locked : AtomicBool ,
61
+ /// Current state of the lock.
62
+ ///
63
+ /// The least significant bit is set to 1 if the lock is acquired.
64
+ /// The other bits hold the number of starved lock operations.
65
+ state : AtomicUsize ,
61
66
62
67
/// Lock operations waiting for the lock to be released.
63
68
lock_ops : Event ,
@@ -81,7 +86,7 @@ impl<T> Lock<T> {
81
86
/// ```
82
87
pub fn new ( data : T ) -> Lock < T > {
83
88
Lock ( Arc :: new ( Inner {
84
- locked : AtomicBool :: new ( false ) ,
89
+ state : AtomicUsize :: new ( 0 ) ,
85
90
lock_ops : Event :: new ( ) ,
86
91
data : UnsafeCell :: new ( data) ,
87
92
} ) )
@@ -102,19 +107,92 @@ impl<T> Lock<T> {
102
107
/// assert_eq!(*guard, 10);
103
108
/// # })
104
109
/// ```
110
+ #[ inline]
105
111
pub async fn lock ( & self ) -> LockGuard < T > {
112
+ if let Some ( guard) = self . try_lock ( ) {
113
+ return guard;
114
+ }
115
+ self . lock_slow ( ) . await
116
+ }
117
+
118
+ /// Slow path for acquiring the mutex.
119
+ #[ cold]
120
+ pub async fn lock_slow ( & self ) -> LockGuard < T > {
121
+ // Get the current time.
122
+ let start = Instant :: now ( ) ;
123
+
106
124
loop {
107
- // Try acquiring the lock.
108
- if let Some ( guard) = self . try_lock ( ) {
109
- return guard;
125
+ // Start listening for events.
126
+ let listener = self . 0 . lock_ops . listen ( ) ;
127
+
128
+ // Try locking if nobody is being starved.
129
+ match self . 0 . state . compare_and_swap ( 0 , 1 , Ordering :: Acquire ) {
130
+ // Lock acquired!
131
+ 0 => return LockGuard ( self . clone ( ) ) ,
132
+
133
+ // Unlocked and somebody is starved - notify the first waiter in line.
134
+ s if s % 2 == 0 => self . 0 . lock_ops . notify_one ( ) ,
135
+
136
+ // The mutex is currently locked.
137
+ _ => { }
138
+ }
139
+
140
+ // Wait for a notification.
141
+ listener. await ;
142
+
143
+ // Try locking if nobody is being starved.
144
+ match self . 0 . state . compare_and_swap ( 0 , 1 , Ordering :: Acquire ) {
145
+ // Lock acquired!
146
+ 0 => return LockGuard ( self . clone ( ) ) ,
147
+
148
+ // Unlocked and somebody is starved - notify the first waiter in line.
149
+ s if s % 2 == 0 => self . 0 . lock_ops . notify_one ( ) ,
150
+
151
+ // The mutex is currently locked.
152
+ _ => { }
153
+ }
154
+
155
+ // If waiting for too long, fall back to a fairer locking strategy that will prevent
156
+ // newer lock operations from starving us forever.
157
+ if start. elapsed ( ) > Duration :: from_micros ( 500 ) {
158
+ break ;
110
159
}
160
+ }
161
+
162
+ // Increment the number of starved lock operations.
163
+ if self . 0 . state . fetch_add ( 2 , Ordering :: Release ) > usize:: MAX / 2 {
164
+ // In case of potential overflow, abort.
165
+ process:: abort ( ) ;
166
+ }
167
+
168
+ // Decrement the counter when exiting this function.
169
+ let _call = CallOnDrop ( || {
170
+ self . 0 . state . fetch_sub ( 2 , Ordering :: Release ) ;
171
+ } ) ;
111
172
112
- // Start watching for notifications and try locking again.
173
+ loop {
174
+ // Start listening for events.
113
175
let listener = self . 0 . lock_ops . listen ( ) ;
114
- if let Some ( guard) = self . try_lock ( ) {
115
- return guard;
176
+
177
+ // Try locking if nobody else is being starved.
178
+ match self . 0 . state . compare_and_swap ( 2 , 2 | 1 , Ordering :: Acquire ) {
179
+ // Lock acquired!
180
+ 0 => return LockGuard ( self . clone ( ) ) ,
181
+
182
+ // Unlocked and somebody is starved - notify the first waiter in line.
183
+ s if s % 2 == 0 => self . 0 . lock_ops . notify_one ( ) ,
184
+
185
+ // The mutex is currently locked.
186
+ _ => { }
116
187
}
188
+
189
+ // Wait for a notification.
117
190
listener. await ;
191
+
192
+ // Try acquiring the lock without waiting for others.
193
+ if self . 0 . state . fetch_or ( 1 , Ordering :: Acquire ) % 2 == 0 {
194
+ return LockGuard ( self . clone ( ) ) ;
195
+ }
118
196
}
119
197
}
120
198
@@ -136,11 +214,7 @@ impl<T> Lock<T> {
136
214
/// ```
137
215
#[ inline]
138
216
pub fn try_lock ( & self ) -> Option < LockGuard < T > > {
139
- if !self
140
- . 0
141
- . locked
142
- . compare_and_swap ( false , true , Ordering :: Acquire )
143
- {
217
+ if self . 0 . state . compare_and_swap ( 0 , 1 , Ordering :: Acquire ) == 0 {
144
218
Some ( LockGuard ( self . clone ( ) ) )
145
219
} else {
146
220
None
@@ -203,7 +277,8 @@ impl<T> LockGuard<T> {
203
277
204
278
impl < T > Drop for LockGuard < T > {
205
279
fn drop ( & mut self ) {
206
- ( self . 0 ) . 0 . locked . store ( false , Ordering :: Release ) ;
280
+ // Remove the last bit and notify a waiting lock operation.
281
+ ( self . 0 ) . 0 . state . fetch_sub ( 1 , Ordering :: Release ) ;
207
282
( self . 0 ) . 0 . lock_ops . notify_one ( ) ;
208
283
}
209
284
}
@@ -233,3 +308,12 @@ impl<T> DerefMut for LockGuard<T> {
233
308
unsafe { & mut * ( self . 0 ) . 0 . data . get ( ) }
234
309
}
235
310
}
311
+
312
+ /// Calls a function when dropped.
313
+ struct CallOnDrop < F : Fn ( ) > ( F ) ;
314
+
315
+ impl < F : Fn ( ) > Drop for CallOnDrop < F > {
316
+ fn drop ( & mut self ) {
317
+ ( self . 0 ) ( ) ;
318
+ }
319
+ }
0 commit comments