@@ -36,7 +36,7 @@ use crate::prelude::*;
 use crate::io;
 use alloc::collections::LinkedList;
 use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
-use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
@@ -682,12 +682,12 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
 	///
 	/// Because the event handler calls into user code which may block, we don't want to block a
 	/// second thread waiting for another thread to handle events which is then blocked on user
-	/// code, so we store an atomic enum here with three states -
+	/// code, so we store an atomic counter here:
 	/// * 0 indicates no event processor is running
 	/// * 1 indicates an event processor is running
-	/// * 2 indicates an event processor is running but needs to start again from the top once it
-	///   finishes as another thread tried to start processing events but returned early.
-	event_processing_state: AtomicU8,
+	/// * > 1 indicates an event processor is running but needs to start again from the top once
+	///   it finishes as another thread tried to start processing events but returned early.
+	event_processing_state: AtomicI32,
 
 	/// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
 	/// value increases strictly since we don't assume access to a time source.
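
As a reading aid (not part of the patch), here is a minimal standalone sketch of the counter protocol the doc comment above describes. The `Processor` type and `handle_events` closure are hypothetical stand-ins for `PeerManager` and its event-handling loop, not LDK's actual API:

```rust
use std::sync::atomic::{AtomicI32, Ordering};

/// Hypothetical stand-in for the relevant part of `PeerManager`.
struct Processor {
	event_processing_state: AtomicI32,
}

impl Processor {
	fn process_events(&self, mut handle_events: impl FnMut()) {
		// Claim the processor role. A caller that arrives while another thread holds it just
		// bumps the counter and returns; the bump is read below as "go around again".
		if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
			return;
		}
		loop {
			handle_events();
			// Release our claim. If the counter wasn't exactly 1, at least one caller showed
			// up while we were busy: collapse the count back to 1 and run one more pass.
			if self.event_processing_state.fetch_add(-1, Ordering::AcqRel) != 1 {
				self.event_processing_state.store(1, Ordering::Release);
				continue;
			}
			break;
		}
	}
}
```

Because every caller that arrives mid-run is collapsed back to a single "run once more" state at the end of a pass, concurrent calls never queue up more than one extra pass, yet events requested during a pass are still picked up.
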
@@ -857,7 +857,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 			message_handler,
 			peers: FairRwLock::new(HashMap::new()),
 			node_id_to_descriptor: Mutex::new(HashMap::new()),
-			event_processing_state: AtomicU8::new(0),
+			event_processing_state: AtomicI32::new(0),
 			ephemeral_key_midstate,
 			peer_counter: AtomicCounter::new(),
 			gossip_processing_backlogged: AtomicBool::new(false),
@@ -1786,38 +1786,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
 	/// [`send_data`]: SocketDescriptor::send_data
 	pub fn process_events(&self) {
-		if self.event_processing_state.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Relaxed).is_err() {
-			// If we failed to take the happy path, there may be another thread running. Try to
-			// either swap up to tell the existing thread to go again or claim the running lock.
-			loop {
-				match self.event_processing_state.load(Ordering::Acquire) {
-					0 =>
-						if self.event_processing_state.compare_exchange(
-							0, 1, Ordering::AcqRel, Ordering::Relaxed
-						).is_ok() {
-							// The other thread finished, we now have the lock and should do a normal
-							// event processing run.
-							break;
-						},
-					1 =>
-						if self.event_processing_state.compare_exchange(
-							1, 2, Ordering::AcqRel, Ordering::Relaxed
-						).is_ok() {
-							// The running thread will go again, we're done
-							return;
-						},
-					2 => {
-						// There's already a thread going that will start over once it finishes, we
-						// don't need to do anything
-						return;
-					}
-					x => {
-						debug_assert!(false, "event_processing state was invalid: {}", x);
-						self.event_processing_state.store(1, Ordering::Release);
-						break;
-					}
-				}
-			}
+		if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
+			// If we're not the first event processor to get here, just return early, the increment
+			// we just did will be treated as "go around again" at the end.
+			return;
 		}
 
 		loop {
@@ -2152,12 +2124,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 				}
 			}
 
-		if self.event_processing_state.compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed).is_err() {
-			// If we're running we must either be in state 1 or 2, and if not state 1 we must
-			// now be in state 2 (go again)!
-			let go_again_swap =
-				self.event_processing_state.compare_exchange(2, 1, Ordering::AcqRel, Ordering::Relaxed);
-			debug_assert!(go_again_swap.is_ok());
+		if self.event_processing_state.fetch_add(-1, Ordering::AcqRel) != 1 {
+			// If another thread incremented the state while we were running we should go
+			// around again, but only once.
+			self.event_processing_state.store(1, Ordering::Release);
 			continue;
 		}
 		break;