Skip to content

Commit e057746

Browse files
committed
Handle EventCompletionActions after events complete
This adds handling of the new `EventCompletionAction`s after `Event`s are handled, letting `ChannelMonitorUpdate`s which were blocked fly after a relevant `Event`.
1 parent 5829778 commit e057746

File tree

1 file changed

+86
-14
lines changed

1 file changed

+86
-14
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1693,23 +1693,35 @@ macro_rules! process_events_body {
16931693
}
16941694

16951695
let _single_processor = $self.pending_events_processor.lock().unwrap();
1696-
let mut next_event = $self.pending_events.lock().unwrap().front().map(|ev| (*ev).clone());
16971696
loop {
1698-
if let Some((event, _action)) = next_event {
1699-
result = NotifyOption::DoPersist;
1700-
let ev_clone;
1701-
#[cfg(debug_assertions)] {
1702-
ev_clone = event.clone();
1697+
let mut next_event = $self.pending_events.lock().unwrap().front().map(|ev| (*ev).clone());
1698+
let mut post_event_actions = Vec::new();
1699+
loop {
1700+
if let Some((event, action_opt)) = next_event {
1701+
result = NotifyOption::DoPersist;
1702+
let _ev_clone: Event;
1703+
#[cfg(debug_assertions)] {
1704+
_ev_clone = event.clone();
1705+
}
1706+
$event_to_handle = event;
1707+
$handle_event;
1708+
let mut pending_events = $self.pending_events.lock().unwrap();
1709+
#[cfg(debug_assertions)] {
1710+
debug_assert_eq!(_ev_clone, pending_events.front().unwrap().0);
1711+
}
1712+
debug_assert_eq!(action_opt, pending_events.front().unwrap().1);
1713+
if let Some(action) = action_opt {
1714+
post_event_actions.push(action);
1715+
}
1716+
pending_events.pop_front();
1717+
next_event = pending_events.front().map(|ev| ev.clone());
1718+
} else {
1719+
break;
17031720
}
1704-
$event_to_handle = event;
1705-
$handle_event;
1706-
let mut pending_events = $self.pending_events.lock().unwrap();
1707-
debug_assert_eq!(ev_clone, pending_events.front().unwrap().0);
1708-
pending_events.pop_front();
1709-
next_event = pending_events.front().map(|ev| ev.clone());
1710-
} else {
1711-
break;
17121721
}
1722+
if post_event_actions.is_empty() { break; }
1723+
$self.handle_post_event_actions(post_event_actions);
1724+
// If we had some actions, go around again as we may have more events now
17131725
}
17141726

17151727
if result == NotifyOption::DoPersist {
@@ -5840,6 +5852,66 @@ where
58405852
self.pending_outbound_payments.clear_pending_payments()
58415853
}
58425854

5855+
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
5856+
loop {
5857+
let per_peer_state = self.per_peer_state.read().unwrap();
5858+
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
5859+
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
5860+
let peer_state = &mut *peer_state_lck;
5861+
if self.pending_events.lock().unwrap().iter()
5862+
.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
5863+
channel_funding_outpoint, counterparty_node_id
5864+
}))
5865+
{
5866+
// Check that, while holding the peer lock, we don't have another event
5867+
// blocking any monitor updates for this channel. If we do, let those
5868+
// events be the ones that ultimately release the monitor update(s).
5869+
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
5870+
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
5871+
return;
5872+
}
5873+
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
5874+
debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
5875+
if let Some((monitor_update, further_update_exists)) = chan.get_mut().fly_next_unflown_monitor_update() {
5876+
log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
5877+
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
5878+
let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update);
5879+
let update_id = monitor_update.update_id;
5880+
let _ = handle_error!(self,
5881+
handle_new_monitor_update!(self, update_res, update_id,
5882+
peer_state_lck, peer_state, per_peer_state, chan),
5883+
counterparty_node_id);
5884+
if further_update_exists {
5885+
// If there are more `ChannelMonitorUpdate`s to process, restart at the
5886+
// top of the loop.
5887+
continue;
5888+
}
5889+
} else {
5890+
log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update",
5891+
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
5892+
}
5893+
}
5894+
} else {
5895+
log_debug!(self.logger,
5896+
"Got a release post-RAA monitor update for peer {} but the channel is gone",
5897+
log_pubkey!(counterparty_node_id));
5898+
}
5899+
break;
5900+
}
5901+
}
5902+
5903+
fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
5904+
for action in actions {
5905+
match action {
5906+
EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
5907+
channel_funding_outpoint, counterparty_node_id
5908+
} => {
5909+
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
5910+
}
5911+
}
5912+
}
5913+
}
5914+
58435915
/// Processes any events asynchronously in the order they were generated since the last call
58445916
/// using the given event handler.
58455917
///

0 commit comments

Comments
 (0)