Commit 7f35f63

f much simpler design from @andrei-21

1 parent f868d86 commit 7f35f63

1 file changed (+13, -44 lines)

lightning/src/ln/peer_handler.rs

Lines changed: 13 additions & 44 deletions
@@ -36,7 +36,7 @@ use crate::prelude::*;
 use crate::io;
 use alloc::collections::LinkedList;
 use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
-use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
@@ -682,12 +682,12 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
 	///
 	/// Because the event handler calls into user code which may block, we don't want to block a
 	/// second thread waiting for another thread to handle events which is then blocked on user
-	/// code, so we store an atomic enum here with three states -
+	/// code, so we store an atomic counter here:
 	/// * 0 indicates no event processor is running
 	/// * 1 indicates an event processor is running
-	/// * 2 indicates an event processor is running but needs to start again from the top once it
-	///   finishes as another thread tried to start processing events but returned early.
-	event_processing_state: AtomicU8,
+	/// * > 1 indicates an event processor is running but needs to start again from the top once
+	///   it finishes as another thread tried to start processing events but returned early.
+	event_processing_state: AtomicI32,
 
 	/// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
 	/// value increases strictly since we don't assume access to a time source.
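The reworded doc comment is the crux of the change, and presumably also why the field's type moves from AtomicU8 to AtomicI32: the new exit path (see the final hunk below) decrements with fetch_add(-1), which only typechecks on a signed atomic, whereas an unsigned counter would use fetch_sub(1) instead. A minimal, standalone demo of the documented counter transitions, using only std (the variable name state is illustrative):

    use std::sync::atomic::{AtomicI32, Ordering};

    fn main() {
        let state = AtomicI32::new(0);
        assert_eq!(state.fetch_add(1, Ordering::AcqRel), 0);  // first caller claims the role: 0 -> 1
        assert_eq!(state.fetch_add(1, Ordering::AcqRel), 1);  // racing caller: 1 -> 2, a "go again" request
        assert_eq!(state.fetch_add(-1, Ordering::AcqRel), 2); // first pass done: 2 -> 1, re-run needed
        state.store(1, Ordering::Release);                    // collapse queued requests to exactly one
        assert_eq!(state.fetch_add(-1, Ordering::AcqRel), 1); // extra pass done: 1 -> 0
        assert_eq!(state.load(Ordering::Acquire), 0);         // quiescent: no processor running
    }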
@@ -857,7 +857,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 			message_handler,
 			peers: FairRwLock::new(HashMap::new()),
 			node_id_to_descriptor: Mutex::new(HashMap::new()),
-			event_processing_state: AtomicU8::new(0),
+			event_processing_state: AtomicI32::new(0),
 			ephemeral_key_midstate,
 			peer_counter: AtomicCounter::new(),
 			gossip_processing_backlogged: AtomicBool::new(false),
@@ -1786,38 +1786,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
 	/// [`send_data`]: SocketDescriptor::send_data
 	pub fn process_events(&self) {
-		if self.event_processing_state.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Relaxed).is_err() {
-			// If we failed to take the happy path, there may be another thread running. Try to
-			// either swap up to tell the existing thread to go again or claim the running lock.
-			loop {
-				match self.event_processing_state.load(Ordering::Acquire) {
-					0 =>
-						if self.event_processing_state.compare_exchange(
-							0, 1, Ordering::AcqRel, Ordering::Relaxed
-						).is_ok() {
-							// The other thread finished, we now have the lock and should do a normal
-							// event processing run.
-							break;
-						},
-					1 =>
-						if self.event_processing_state.compare_exchange(
-							1, 2, Ordering::AcqRel, Ordering::Relaxed
-						).is_ok() {
-							// The running thread will go again, we're done
-							return;
-						},
-					2 => {
-						// There's already a thread going that will start over once it finishes, we
-						// don't need to do anything
-						return;
-					}
-					x => {
-						debug_assert!(false, "event_processing state was invalid: {}", x);
-						self.event_processing_state.store(1, Ordering::Release);
-						break;
-					}
-				}
-			}
+		if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
+			// If we're not the first event processor to get here, just return early, the increment
+			// we just did will be treated as "go around again" at the end.
 		}
 
 		loop {
@@ -2152,12 +2123,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 				}
 			}
 
-			if self.event_processing_state.compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed).is_err() {
-				// If we're running we must either be in state 1 or 2, and if not state 1 we must
-				// now be in state 2 (go again)!
-				let go_again_swap =
-					self.event_processing_state.compare_exchange(2, 1, Ordering::AcqRel, Ordering::Relaxed);
-				debug_assert!(go_again_swap.is_ok());
+			if self.event_processing_state.fetch_add(-1, Ordering::AcqRel) != 1 {
+				// If another thread incremented the state while we were running we should go
+				// around again, but only once.
+				self.event_processing_state.store(1, Ordering::Release);
 				continue;
 			}
 			break;
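Net effect of the last two hunks: a three-state compare-exchange protocol becomes two unconditional read-modify-write operations, an increment on entry and a decrement on exit. Below is a self-contained sketch of that counter pattern; EventProcessor and handle_events are illustrative stand-ins, not the real PeerManager API. Note that for the mutual exclusion to hold, a caller that observes a nonzero counter must return early, exactly as the added comment says; the increment it leaves behind is the "go around again" signal:

    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;
    use std::thread;

    // Illustrative stand-in for PeerManager; only the coalescing logic is shown.
    struct EventProcessor {
        event_processing_state: AtomicI32,
    }

    impl EventProcessor {
        fn new() -> Self {
            Self { event_processing_state: AtomicI32::new(0) }
        }

        fn process_events(&self, mut handle_events: impl FnMut()) {
            if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
                // Not the first processor: return early, leaving our increment
                // behind as a "go around again" request for the running thread.
                return;
            }
            loop {
                handle_events();
                if self.event_processing_state.fetch_add(-1, Ordering::AcqRel) != 1 {
                    // Requests arrived while we ran; collapse however many there
                    // were into exactly one more pass.
                    self.event_processing_state.store(1, Ordering::Release);
                    continue;
                }
                break;
            }
        }
    }

    fn main() {
        let p = Arc::new(EventProcessor::new());
        let threads: Vec<_> = (0..4).map(|_| {
            let p = Arc::clone(&p);
            thread::spawn(move || p.process_events(|| { /* drain event queues here */ }))
        }).collect();
        for t in threads { t.join().unwrap(); }
        // Quiescent again: no processor running, no re-run queued.
        assert_eq!(p.event_processing_state.load(Ordering::Acquire), 0);
    }

However many threads pile in, the store(1) after a decrement that did not observe 1 folds all pending requests into a single extra pass, so events are never handled on two threads at once and a full pass always runs after the last request.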
