@@ -3,8 +3,8 @@ use rand::Rng;
3
3
use crate :: sync:: atomic:: { AtomicUsize , Ordering } ;
4
4
use crate :: sync:: mpsc:: channel;
5
5
use crate :: sync:: {
6
- Arc , MappedRwLockReadGuard , MappedRwLockWriteGuard , RwLock , RwLockReadGuard , RwLockWriteGuard ,
7
- TryLockError ,
6
+ Arc , Barrier , MappedRwLockReadGuard , MappedRwLockWriteGuard , RwLock , RwLockReadGuard ,
7
+ RwLockWriteGuard , TryLockError ,
8
8
} ;
9
9
use crate :: thread;
10
10
@@ -509,3 +509,81 @@ fn test_downgrade_basic() {
509
509
let write_guard = r. write ( ) . unwrap ( ) ;
510
510
let _read_guard = RwLockWriteGuard :: downgrade ( write_guard) ;
511
511
}
512
+
513
+ #[ test]
514
+ fn test_downgrade_frob ( ) {
515
+ const N : u32 = 10 ;
516
+ const M : usize = if cfg ! ( miri) { 100 } else { 1000 } ;
517
+
518
+ let r = Arc :: new ( RwLock :: new ( ( ) ) ) ;
519
+
520
+ let ( tx, rx) = channel :: < ( ) > ( ) ;
521
+ for _ in 0 ..N {
522
+ let tx = tx. clone ( ) ;
523
+ let r = r. clone ( ) ;
524
+ thread:: spawn ( move || {
525
+ let mut rng = crate :: test_helpers:: test_rng ( ) ;
526
+ for _ in 0 ..M {
527
+ if rng. gen_bool ( 1.0 / ( N as f64 ) ) {
528
+ drop ( RwLockWriteGuard :: downgrade ( r. write ( ) . unwrap ( ) ) ) ;
529
+ } else {
530
+ drop ( r. read ( ) . unwrap ( ) ) ;
531
+ }
532
+ }
533
+ drop ( tx) ;
534
+ } ) ;
535
+ }
536
+ drop ( tx) ;
537
+ let _ = rx. recv ( ) ;
538
+ }
539
+
540
+ #[ test]
541
+ fn test_downgrade_readers ( ) {
542
+ const R : usize = 16 ;
543
+ const N : usize = 1000 ;
544
+
545
+ let r = Arc :: new ( RwLock :: new ( 0 ) ) ;
546
+ let b = Arc :: new ( Barrier :: new ( R + 1 ) ) ;
547
+
548
+ // Create the writing thread.
549
+ let r_writer = r. clone ( ) ;
550
+ let b_writer = b. clone ( ) ;
551
+ thread:: spawn ( move || {
552
+ for i in 0 ..N {
553
+ let mut write_guard = r_writer. write ( ) . unwrap ( ) ;
554
+ * write_guard = i;
555
+
556
+ let read_guard = RwLockWriteGuard :: downgrade ( write_guard) ;
557
+ assert_eq ! ( * read_guard, i) ;
558
+
559
+ // Wait for all readers to observe the new value.
560
+ b_writer. wait ( ) ;
561
+ }
562
+ } ) ;
563
+
564
+ for _ in 0 ..R {
565
+ let r = r. clone ( ) ;
566
+ let b = b. clone ( ) ;
567
+ thread:: spawn ( move || {
568
+ // Every reader thread needs to observe every value up to `N`.
569
+ for i in 0 ..N {
570
+ let read_guard = r. read ( ) . unwrap ( ) ;
571
+ assert_eq ! ( * read_guard, i) ;
572
+ drop ( read_guard) ;
573
+
574
+ // Wait for everyone to read and for the writer to change the value again.
575
+ b. wait ( ) ;
576
+ // Spin until the writer has changed the value.
577
+
578
+ loop {
579
+ let read_guard = r. read ( ) . unwrap ( ) ;
580
+ assert ! ( * read_guard >= i) ;
581
+
582
+ if * read_guard > i {
583
+ break ;
584
+ }
585
+ }
586
+ }
587
+ } ) ;
588
+ }
589
+ }
0 commit comments