Skip to content

Setup Support for delaying ChannelMonitorUpdate flight until an Event completes #2111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1744,9 +1744,14 @@ macro_rules! process_events_body {
result = NotifyOption::DoPersist;
}

for (event, _action) in pending_events {
let mut post_event_actions = Vec::new();

for (event, action_opt) in pending_events {
$event_to_handle = event;
$handle_event;
if let Some(action) = action_opt {
post_event_actions.push(action);
}
}

{
Expand All @@ -1756,6 +1761,12 @@ macro_rules! process_events_body {
$self.pending_events_processor.store(false, Ordering::Release);
}

if !post_event_actions.is_empty() {
$self.handle_post_event_actions(post_event_actions);
// If we had some actions, go around again as we may have more events now
processed_all_events = false;
}

if result == NotifyOption::DoPersist {
$self.persistence_notifier.notify();
}
Expand Down Expand Up @@ -5926,6 +5937,72 @@ where
self.pending_outbound_payments.clear_pending_payments()
}

fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
let mut errors = Vec::new();
loop {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
let peer_state = &mut *peer_state_lck;
if self.pending_events.lock().unwrap().iter()
.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint, counterparty_node_id
}))
{
// Check that, while holding the peer lock, we don't have another event
// blocking any monitor updates for this channel. If we do, let those
// events be the ones that ultimately release the monitor update(s).
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage

}
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update);
let update_id = monitor_update.update_id;
if let Err(e) = handle_new_monitor_update!(self, update_res, update_id,
peer_state_lck, peer_state, per_peer_state, chan)
{
errors.push((e, counterparty_node_id));
}
if further_update_exists {
// If there are more `ChannelMonitorUpdate`s to process, restart at the
// top of the loop.
continue;
}
} else {
log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update",
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
}
}
} else {
log_debug!(self.logger,
"Got a release post-RAA monitor update for peer {} but the channel is gone",
log_pubkey!(counterparty_node_id));
}
break;
}
for (err, counterparty_node_id) in errors {
let res = Err::<(), _>(err);
let _ = handle_error!(self, res, counterparty_node_id);
}
}

fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
for action in actions {
match action {
EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint, counterparty_node_id
} => {
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
}
}
}
}

/// Processes any events asynchronously in the order they were generated since the last call
/// using the given event handler.
///
Expand Down