@@ -29,60 +29,78 @@ impl Parker {
29
29
// Change NOTIFIED to EMPTY and EMPTY to PARKED.
30
30
let state = self . state . fetch_sub ( 1 , Acquire ) ;
31
31
if state == NOTIFIED {
32
+ // The state has gone from NOTIFIED (1) to EMPTY (0)
32
33
return ;
33
34
}
35
+ // The state has gone from EMPTY (0) to PARKED (-1)
36
+ assert ! ( state == EMPTY ) ;
34
37
35
- // The state was set to PARKED. Wait until the `unpark` wakes us up.
38
+ // The state is now PARKED (-1) . Wait until the `unpark` wakes us up.
36
39
blocking_scalar (
37
40
ticktimer_server ( ) ,
38
41
TicktimerScalar :: WaitForCondition ( self . index ( ) , 0 ) . into ( ) ,
39
42
)
40
43
. expect ( "failed to send WaitForCondition command" ) ;
41
44
42
- self . state . swap ( EMPTY , Acquire ) ;
45
+ let state = self . state . swap ( EMPTY , Acquire ) ;
46
+ assert ! ( state == NOTIFIED || state == PARKED ) ;
43
47
}
44
48
45
49
pub unsafe fn park_timeout ( self : Pin < & Self > , timeout : Duration ) {
46
50
// Change NOTIFIED to EMPTY and EMPTY to PARKED.
47
51
let state = self . state . fetch_sub ( 1 , Acquire ) ;
48
52
if state == NOTIFIED {
53
+ // The state has gone from NOTIFIED (1) to EMPTY (0)
49
54
return ;
50
55
}
56
+ // The state has gone from EMPTY (0) to PARKED (-1)
57
+ assert ! ( state == EMPTY ) ;
51
58
52
59
// A value of zero indicates an indefinite wait. Clamp the number of
53
60
// milliseconds to the allowed range.
54
61
let millis = usize:: max ( timeout. as_millis ( ) . try_into ( ) . unwrap_or ( usize:: MAX ) , 1 ) ;
55
62
56
- let was_timeout = blocking_scalar (
63
+ // The state is now PARKED (-1). Wait until the `unpark` wakes us up,
64
+ // or things time out.
65
+ let _was_timeout = blocking_scalar (
57
66
ticktimer_server ( ) ,
58
67
TicktimerScalar :: WaitForCondition ( self . index ( ) , millis) . into ( ) ,
59
68
)
60
69
. expect ( "failed to send WaitForCondition command" ) [ 0 ]
61
70
!= 0 ;
62
71
63
72
let state = self . state . swap ( EMPTY , Acquire ) ;
64
- if was_timeout && state == NOTIFIED {
65
- // The state was set to NOTIFIED after we returned from the wait
66
- // but before we reset the state. Therefore, a wakeup is on its
67
- // way, which we need to consume here.
68
- // NOTICE: this is a priority hole.
69
- blocking_scalar (
70
- ticktimer_server ( ) ,
71
- TicktimerScalar :: WaitForCondition ( self . index ( ) , 0 ) . into ( ) ,
72
- )
73
- . expect ( "failed to send WaitForCondition command" ) ;
74
- }
73
+ assert ! ( state == PARKED || state == NOTIFIED ) ;
75
74
}
76
75
77
76
pub fn unpark ( self : Pin < & Self > ) {
78
- let state = self . state . swap ( NOTIFIED , Release ) ;
79
- if state == PARKED {
80
- // The thread is parked, wake it up.
81
- blocking_scalar (
82
- ticktimer_server ( ) ,
83
- TicktimerScalar :: NotifyCondition ( self . index ( ) , 1 ) . into ( ) ,
84
- )
85
- . expect ( "failed to send NotifyCondition command" ) ;
77
+ // If the state is already `NOTIFIED`, then another thread has
78
+ // indicated it wants to wake up the target thread.
79
+ //
80
+ // If the state is `EMPTY` then there is nothing to wake up, and
81
+ // the target thread will immediately exit from `park()` the
82
+ // next time that function is called.
83
+ if self . state . swap ( NOTIFIED , Release ) != PARKED {
84
+ return ;
85
+ }
86
+
87
+ // The thread is parked, wake it up. Keep trying until we wake something up.
88
+ // This will happen when the `NotifyCondition` call returns the fact that
89
+ // 1 condition was notified.
90
+ // Alternately, keep going until the state is seen as `EMPTY`, indicating
91
+ // the thread woke up and kept going. This can happen when the Park
92
+ // times out before we can send the NotifyCondition message.
93
+ while blocking_scalar (
94
+ ticktimer_server ( ) ,
95
+ TicktimerScalar :: NotifyCondition ( self . index ( ) , 1 ) . into ( ) ,
96
+ )
97
+ . expect ( "failed to send NotifyCondition command" ) [ 0 ]
98
+ == 0
99
+ && self . state . load ( Acquire ) != EMPTY
100
+ {
101
+ // The target thread hasn't yet hit the `WaitForCondition` call.
102
+ // Yield to let the target thread run some more.
103
+ crate :: thread:: yield_now ( ) ;
86
104
}
87
105
}
88
106
}
0 commit comments