1
1
use super :: mutex:: Mutex ;
2
2
use crate :: os:: xous:: ffi:: { blocking_scalar, scalar} ;
3
- use crate :: os:: xous:: services:: ticktimer_server;
4
- use crate :: sync:: Mutex as StdMutex ;
3
+ use crate :: os:: xous:: services:: { ticktimer_server, TicktimerScalar } ;
5
4
use crate :: time:: Duration ;
5
+ use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
6
6
7
7
// The implementation is inspired by Andrew D. Birrell's paper
8
8
// "Implementing Condition Variables with Semaphores"
9
9
10
+ const NOTIFY_TRIES : usize = 3 ;
11
+
10
12
pub struct Condvar {
11
- counter : StdMutex < usize > ,
13
+ counter : AtomicUsize ,
14
+ timed_out : AtomicUsize ,
12
15
}
13
16
14
17
unsafe impl Send for Condvar { }
@@ -18,94 +21,128 @@ impl Condvar {
18
21
#[ inline]
19
22
#[ rustc_const_stable( feature = "const_locks" , since = "1.63.0" ) ]
20
23
pub const fn new ( ) -> Condvar {
21
- Condvar { counter : StdMutex :: new ( 0 ) }
24
+ Condvar { counter : AtomicUsize :: new ( 0 ) , timed_out : AtomicUsize :: new ( 0 ) }
22
25
}
23
26
24
- pub fn notify_one ( & self ) {
25
- let mut counter = self . counter . lock ( ) . unwrap ( ) ;
26
- if * counter <= 0 {
27
+ fn notify_some ( & self , to_notify : usize ) {
28
+ // Assumption: The Mutex protecting this condvar is locked throughout the
29
+ // entirety of this call, preventing calls to `wait` and `wait_timeout`.
30
+
31
+ // Logic check: Ensure that there aren't any missing waiters. Remove any that
32
+ // timed-out, ensuring the counter doesn't underflow.
33
+ assert ! ( self . timed_out. load( Ordering :: Relaxed ) <= self . counter. load( Ordering :: Relaxed ) ) ;
34
+ self . counter . fetch_sub ( self . timed_out . swap ( 0 , Ordering :: Relaxed ) , Ordering :: Relaxed ) ;
35
+
36
+ // Figure out how many threads to notify. Note that it is impossible for `counter`
37
+ // to increase during this operation because Mutex is locked. However, it is
38
+ // possible for `counter` to decrease due to a condvar timing out, in which
39
+ // case the corresponding `timed_out` will increase accordingly.
40
+ let Ok ( waiter_count) =
41
+ self . counter . fetch_update ( Ordering :: Relaxed , Ordering :: Relaxed , |counter| {
42
+ if counter == 0 {
43
+ return None ;
44
+ } else {
45
+ Some ( counter - counter. min ( to_notify) )
46
+ }
47
+ } )
48
+ else {
49
+ // No threads are waiting on this condvar
27
50
return ;
28
- } else {
29
- * counter -= 1 ;
30
- }
31
- let result = blocking_scalar (
32
- ticktimer_server ( ) ,
33
- crate :: os:: xous:: services:: TicktimerScalar :: NotifyCondition ( self . index ( ) , 1 ) . into ( ) ,
34
- ) ;
35
- drop ( counter) ;
36
- result. expect ( "failure to send NotifyCondition command" ) ;
37
- }
51
+ } ;
38
52
39
- pub fn notify_all ( & self ) {
40
- let mut counter = self . counter . lock ( ) . unwrap ( ) ;
41
- if * counter <= 0 {
53
+ let mut remaining_to_wake = waiter_count. min ( to_notify) ;
54
+ if remaining_to_wake == 0 {
42
55
return ;
43
56
}
44
- let result = blocking_scalar (
45
- ticktimer_server ( ) ,
46
- crate :: os:: xous:: services:: TicktimerScalar :: NotifyCondition ( self . index ( ) , * counter)
47
- . into ( ) ,
48
- ) ;
49
- * counter = 0 ;
50
- drop ( counter) ;
57
+ for _wake_tries in 0 ..NOTIFY_TRIES {
58
+ let result = blocking_scalar (
59
+ ticktimer_server ( ) ,
60
+ TicktimerScalar :: NotifyCondition ( self . index ( ) , remaining_to_wake) . into ( ) ,
61
+ )
62
+ . expect ( "failure to send NotifyCondition command" ) ;
63
+
64
+ // Remove the list of waiters that were notified
65
+ remaining_to_wake -= result[ 0 ] ;
66
+
67
+ // Also remove the number of waiters that timed out. Clamp it to 0 in order to
68
+ // ensure we don't wait forever in case the waiter woke up between the time
69
+ // we counted the remaining waiters and now.
70
+ remaining_to_wake =
71
+ remaining_to_wake. saturating_sub ( self . timed_out . swap ( 0 , Ordering :: Relaxed ) ) ;
72
+ if remaining_to_wake == 0 {
73
+ return ;
74
+ }
75
+ crate :: thread:: yield_now ( ) ;
76
+ }
77
+ }
51
78
52
- result. expect ( "failure to send NotifyCondition command" ) ;
79
+ pub fn notify_one ( & self ) {
80
+ self . notify_some ( 1 )
81
+ }
82
+
83
+ pub fn notify_all ( & self ) {
84
+ self . notify_some ( self . counter . load ( Ordering :: Relaxed ) )
53
85
}
54
86
55
87
fn index ( & self ) -> usize {
56
- self as * const Condvar as usize
88
+ core :: ptr :: from_ref ( self ) . addr ( )
57
89
}
58
90
59
- pub unsafe fn wait ( & self , mutex : & Mutex ) {
60
- let mut counter = self . counter . lock ( ) . unwrap ( ) ;
61
- * counter += 1 ;
91
+ /// Unlock the given Mutex and wait for the notification. Wait at most
92
+ /// `ms` milliseconds, or pass `0` to wait forever.
93
+ ///
94
+ /// Returns `true` if the condition was received, `false` if it timed out
95
+ fn wait_ms ( & self , mutex : & Mutex , ms : usize ) -> bool {
96
+ self . counter . fetch_add ( 1 , Ordering :: Relaxed ) ;
62
97
unsafe { mutex. unlock ( ) } ;
63
- drop ( counter) ;
64
98
99
+ // Threading concern: There is a chance that the `notify` thread wakes up here before
100
+ // we have a chance to wait for the condition. This is fine because we've recorded
101
+ // the fact that we're waiting by incrementing the counter.
65
102
let result = blocking_scalar (
66
103
ticktimer_server ( ) ,
67
- crate :: os :: xous :: services :: TicktimerScalar :: WaitForCondition ( self . index ( ) , 0 ) . into ( ) ,
104
+ TicktimerScalar :: WaitForCondition ( self . index ( ) , ms ) . into ( ) ,
68
105
) ;
106
+ let awoken = result. expect ( "Ticktimer: failure to send WaitForCondition command" ) [ 0 ] == 0 ;
107
+
108
+ // If we awoke due to a timeout, increment the `timed_out` counter so that the
109
+ // main loop of `notify` knows there's a timeout.
110
+ //
111
+ // This is done with the Mutex still unlocked, because the Mutex might still
112
+ // be locked by the `notify` process above.
113
+ if !awoken {
114
+ self . timed_out . fetch_add ( 1 , Ordering :: Relaxed ) ;
115
+ }
116
+
69
117
unsafe { mutex. lock ( ) } ;
118
+ awoken
119
+ }
70
120
71
- result. expect ( "Ticktimer: failure to send WaitForCondition command" ) ;
121
+ pub unsafe fn wait ( & self , mutex : & Mutex ) {
122
+ // Wait for 0 ms, which is a special case to "wait forever"
123
+ self . wait_ms ( mutex, 0 ) ;
72
124
}
73
125
74
126
pub unsafe fn wait_timeout ( & self , mutex : & Mutex , dur : Duration ) -> bool {
75
- let mut counter = self . counter . lock ( ) . unwrap ( ) ;
76
- * counter += 1 ;
77
- unsafe { mutex. unlock ( ) } ;
78
- drop ( counter) ;
79
-
80
127
let mut millis = dur. as_millis ( ) as usize ;
128
+ // Ensure we don't wait for 0 ms, which would cause us to wait forever
81
129
if millis == 0 {
82
130
millis = 1 ;
83
131
}
84
-
85
- let result = blocking_scalar (
86
- ticktimer_server ( ) ,
87
- crate :: os:: xous:: services:: TicktimerScalar :: WaitForCondition ( self . index ( ) , millis)
88
- . into ( ) ,
89
- ) ;
90
- unsafe { mutex. lock ( ) } ;
91
-
92
- let result = result. expect ( "Ticktimer: failure to send WaitForCondition command" ) [ 0 ] == 0 ;
93
-
94
- // If we awoke due to a timeout, decrement the wake count, as that would not have
95
- // been done in the `notify()` call.
96
- if !result {
97
- * self . counter . lock ( ) . unwrap ( ) -= 1 ;
98
- }
99
- result
132
+ self . wait_ms ( mutex, millis)
100
133
}
101
134
}
102
135
103
136
impl Drop for Condvar {
104
137
fn drop ( & mut self ) {
105
- scalar (
106
- ticktimer_server ( ) ,
107
- crate :: os:: xous:: services:: TicktimerScalar :: FreeCondition ( self . index ( ) ) . into ( ) ,
108
- )
109
- . ok ( ) ;
138
+ let remaining_count = self . counter . load ( Ordering :: Relaxed ) ;
139
+ let timed_out = self . timed_out . load ( Ordering :: Relaxed ) ;
140
+ assert ! (
141
+ remaining_count - timed_out == 0 ,
142
+ "counter was {} and timed_out was {} not 0" ,
143
+ remaining_count,
144
+ timed_out
145
+ ) ;
146
+ scalar ( ticktimer_server ( ) , TicktimerScalar :: FreeCondition ( self . index ( ) ) . into ( ) ) . ok ( ) ;
110
147
}
111
148
}
0 commit comments