1
- use futures_core:: future:: { FusedFuture , Future } ;
2
- use futures_core:: task:: { Context , Poll , Waker } ;
3
- use slab:: Slab ;
4
1
use std:: cell:: UnsafeCell ;
5
2
use std:: marker:: PhantomData ;
6
3
use std:: ops:: { Deref , DerefMut } ;
7
4
use std:: pin:: Pin ;
8
5
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
9
- use std:: sync:: Mutex as StdMutex ;
6
+ use std:: sync:: { Arc , Mutex as StdMutex } ;
10
7
use std:: { fmt, mem} ;
11
8
9
+ use slab:: Slab ;
10
+
11
+ use futures_core:: future:: { FusedFuture , Future } ;
12
+ use futures_core:: task:: { Context , Poll , Waker } ;
13
+
12
14
/// A futures-aware mutex.
13
15
///
14
16
/// # Fairness
@@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
107
109
}
108
110
}
109
111
112
+ /// Attempt to acquire the lock immediately.
113
+ ///
114
+ /// If the lock is currently held, this will return `None`.
115
+ pub fn try_lock_owned ( self : & Arc < Self > ) -> Option < OwnedMutexGuard < T > > {
116
+ let old_state = self . state . fetch_or ( IS_LOCKED , Ordering :: Acquire ) ;
117
+ if ( old_state & IS_LOCKED ) == 0 {
118
+ Some ( OwnedMutexGuard { mutex : self . clone ( ) } )
119
+ } else {
120
+ None
121
+ }
122
+ }
123
+
110
124
/// Acquire the lock asynchronously.
111
125
///
112
126
/// This method returns a future that will resolve once the lock has been
@@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
115
129
MutexLockFuture { mutex : Some ( self ) , wait_key : WAIT_KEY_NONE }
116
130
}
117
131
132
+ /// Acquire the lock asynchronously.
133
+ ///
134
+ /// This method returns a future that will resolve once the lock has been
135
+ /// successfully acquired.
136
+ pub fn lock_owned ( self : Arc < Self > ) -> OwnedMutexLockFuture < T > {
137
+ OwnedMutexLockFuture { mutex : Some ( self ) , wait_key : WAIT_KEY_NONE }
138
+ }
139
+
118
140
/// Returns a mutable reference to the underlying data.
119
141
///
120
142
/// Since this call borrows the `Mutex` mutably, no actual locking needs to
@@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
173
195
}
174
196
175
197
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
176
- const WAIT_KEY_NONE : usize = usize:: max_value ( ) ;
198
+ const WAIT_KEY_NONE : usize = usize:: MAX ;
199
+
200
+ /// A future which resolves when the target mutex has been successfully acquired, owned version.
201
+ pub struct OwnedMutexLockFuture < T : ?Sized > {
202
+ // `None` indicates that the mutex was successfully acquired.
203
+ mutex : Option < Arc < Mutex < T > > > ,
204
+ wait_key : usize ,
205
+ }
206
+
207
+ impl < T : ?Sized > fmt:: Debug for OwnedMutexLockFuture < T > {
208
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
209
+ f. debug_struct ( "OwnedMutexLockFuture" )
210
+ . field ( "was_acquired" , & self . mutex . is_none ( ) )
211
+ . field ( "mutex" , & self . mutex )
212
+ . field (
213
+ "wait_key" ,
214
+ & ( if self . wait_key == WAIT_KEY_NONE { None } else { Some ( self . wait_key ) } ) ,
215
+ )
216
+ . finish ( )
217
+ }
218
+ }
219
+
220
+ impl < T : ?Sized > FusedFuture for OwnedMutexLockFuture < T > {
221
+ fn is_terminated ( & self ) -> bool {
222
+ self . mutex . is_none ( )
223
+ }
224
+ }
225
+
226
+ impl < T : ?Sized > Future for OwnedMutexLockFuture < T > {
227
+ type Output = OwnedMutexGuard < T > ;
228
+
229
+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
230
+ let this = self . get_mut ( ) ;
231
+
232
+ let mutex = this. mutex . as_ref ( ) . expect ( "polled OwnedMutexLockFuture after completion" ) ;
233
+
234
+ if let Some ( lock) = mutex. try_lock_owned ( ) {
235
+ mutex. remove_waker ( this. wait_key , false ) ;
236
+ this. mutex = None ;
237
+ return Poll :: Ready ( lock) ;
238
+ }
239
+
240
+ {
241
+ let mut waiters = mutex. waiters . lock ( ) . unwrap ( ) ;
242
+ if this. wait_key == WAIT_KEY_NONE {
243
+ this. wait_key = waiters. insert ( Waiter :: Waiting ( cx. waker ( ) . clone ( ) ) ) ;
244
+ if waiters. len ( ) == 1 {
245
+ mutex. state . fetch_or ( HAS_WAITERS , Ordering :: Relaxed ) ; // released by mutex unlock
246
+ }
247
+ } else {
248
+ waiters[ this. wait_key ] . register ( cx. waker ( ) ) ;
249
+ }
250
+ }
251
+
252
+ // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
253
+ // attempting to acquire the lock again.
254
+ if let Some ( lock) = mutex. try_lock_owned ( ) {
255
+ mutex. remove_waker ( this. wait_key , false ) ;
256
+ this. mutex = None ;
257
+ return Poll :: Ready ( lock) ;
258
+ }
259
+
260
+ Poll :: Pending
261
+ }
262
+ }
263
+
264
+ impl < T : ?Sized > Drop for OwnedMutexLockFuture < T > {
265
+ fn drop ( & mut self ) {
266
+ if let Some ( mutex) = self . mutex . as_ref ( ) {
267
+ // This future was dropped before it acquired the mutex.
268
+ //
269
+ // Remove ourselves from the map, waking up another waiter if we
270
+ // had been awoken to acquire the lock.
271
+ mutex. remove_waker ( self . wait_key , true ) ;
272
+ }
273
+ }
274
+ }
275
+
276
+ /// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
277
+ /// When this structure is dropped (falls out of scope), the lock will be
278
+ /// unlocked.
279
+ pub struct OwnedMutexGuard < T : ?Sized > {
280
+ mutex : Arc < Mutex < T > > ,
281
+ }
282
+
283
+ impl < T : ?Sized + fmt:: Debug > fmt:: Debug for OwnedMutexGuard < T > {
284
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
285
+ f. debug_struct ( "OwnedMutexGuard" )
286
+ . field ( "value" , & & * * self )
287
+ . field ( "mutex" , & self . mutex )
288
+ . finish ( )
289
+ }
290
+ }
291
+
292
+ impl < T : ?Sized > Drop for OwnedMutexGuard < T > {
293
+ fn drop ( & mut self ) {
294
+ self . mutex . unlock ( )
295
+ }
296
+ }
297
+
298
+ impl < T : ?Sized > Deref for OwnedMutexGuard < T > {
299
+ type Target = T ;
300
+ fn deref ( & self ) -> & T {
301
+ unsafe { & * self . mutex . value . get ( ) }
302
+ }
303
+ }
304
+
305
+ impl < T : ?Sized > DerefMut for OwnedMutexGuard < T > {
306
+ fn deref_mut ( & mut self ) -> & mut T {
307
+ unsafe { & mut * self . mutex . value . get ( ) }
308
+ }
309
+ }
177
310
178
311
/// A future which resolves when the target mutex has been successfully acquired.
179
312
pub struct MutexLockFuture < ' a , T : ?Sized > {
@@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
386
519
// It's safe to switch which thread the acquire is being attempted on so long as
387
520
// `T` can be accessed on that thread.
388
521
unsafe impl < T : ?Sized + Send > Send for MutexLockFuture < ' _ , T > { }
522
+
389
523
// doesn't have any interesting `&self` methods (only Debug)
390
524
unsafe impl < T : ?Sized > Sync for MutexLockFuture < ' _ , T > { }
391
525
526
+ // It's safe to switch which thread the acquire is being attempted on so long as
527
+ // `T` can be accessed on that thread.
528
+ unsafe impl < T : ?Sized + Send > Send for OwnedMutexLockFuture < T > { }
529
+
530
+ // doesn't have any interesting `&self` methods (only Debug)
531
+ unsafe impl < T : ?Sized > Sync for OwnedMutexLockFuture < T > { }
532
+
392
533
// Safe to send since we don't track any thread-specific details-- the inner
393
534
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
394
535
unsafe impl < T : ?Sized + Send > Send for MutexGuard < ' _ , T > { }
395
536
unsafe impl < T : ?Sized + Sync > Sync for MutexGuard < ' _ , T > { }
537
+
538
+ unsafe impl < T : ?Sized + Send > Send for OwnedMutexGuard < T > { }
539
+ unsafe impl < T : ?Sized + Sync > Sync for OwnedMutexGuard < T > { }
540
+
396
541
unsafe impl < T : ?Sized + Send , U : ?Sized + Send > Send for MappedMutexGuard < ' _ , T , U > { }
397
542
unsafe impl < T : ?Sized + Sync , U : ?Sized + Sync > Sync for MappedMutexGuard < ' _ , T , U > { }
398
543
0 commit comments