Skip to content

Commit ac5efa2

Browse files
committed
Allow holding ChannelMonitorUpdates until later, completing one
In the coming commits, we need to delay `ChannelMonitorUpdate`s until future actions (specifically `Event` handling). However, because we should only notify users once of a given `ChannelMonitorUpdate` and they must be provided in-order, we need to track which ones have or have not been given to users and, once updating resumes, fly the ones that haven't already made it to users. To do this we simply add a `bool` in the `ChannelMonitorUpdate` set stored in the `Channel` which indicates if an update flew and decline to provide new updates back to the `ChannelManager` if any updates have their flown bit unset. Further, because we'll now by releasing `ChannelMonitorUpdate`s which were already stored in the pending list, we now need to support getting a `Completed` result for a monitor which isn't the only pending monitor (or even out of order), thus we also rewrite the way monitor updates are marked completed.
1 parent 88c63e9 commit ac5efa2

File tree

4 files changed

+132
-62
lines changed

4 files changed

+132
-62
lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ fn test_monitor_and_persister_update_fail() {
146146
let mut node_0_per_peer_lock;
147147
let mut node_0_peer_state_lock;
148148
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
149-
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
149+
if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
150150
// Check that even though the persister is returning a InProgress,
151151
// because the update is bogus, ultimately the error that's returned
152152
// should be a PermanentFailure.

lightning/src/ln/channel.rs

Lines changed: 113 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,16 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
479479
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
480480
pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
481481

482+
struct PendingChannelMonitorUpdate {
483+
update: ChannelMonitorUpdate,
484+
/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
485+
/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
486+
/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
487+
///
488+
/// [`ChannelManager`]: super::channelmanager::ChannelManager
489+
blocked: bool,
490+
}
491+
482492
// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
483493
// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
484494
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -744,7 +754,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
744754
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
745755
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
746756
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
747-
pending_monitor_updates: Vec<ChannelMonitorUpdate>,
757+
pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
748758
}
749759

750760
#[cfg(any(test, fuzzing))]
@@ -1979,28 +1989,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
19791989
}
19801990

19811991
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
1992+
let release_cs_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
19821993
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
1983-
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
1984-
let mut additional_update = self.build_commitment_no_status_check(logger);
1985-
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
1986-
// strictly increasing by one, so decrement it here.
1987-
self.latest_monitor_update_id = monitor_update.update_id;
1988-
monitor_update.updates.append(&mut additional_update.updates);
1989-
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
1990-
self.pending_monitor_updates.push(monitor_update);
1994+
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
1995+
// Even if we aren't supposed to let new monitor updates with commitment state
1996+
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
1997+
// matter what. Sadly, to push a new monitor update which flies before others
1998+
// already queued, we have to insert it into the pending queue and update the
1999+
// update_ids of all the following monitors.
2000+
let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
2001+
let mut additional_update = self.build_commitment_no_status_check(logger);
2002+
// build_commitment_no_status_check may bump latest_monitor_id but we want them
2003+
// to be strictly increasing by one, so decrement it here.
2004+
self.latest_monitor_update_id = monitor_update.update_id;
2005+
monitor_update.updates.append(&mut additional_update.updates);
2006+
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2007+
update: monitor_update, blocked: false,
2008+
});
2009+
self.pending_monitor_updates.len() - 1
2010+
} else {
2011+
let insert_pos = self.pending_monitor_updates.iter().position(|upd| upd.blocked)
2012+
.unwrap_or(self.pending_monitor_updates.len());
2013+
let new_mon_id = self.pending_monitor_updates.get(insert_pos)
2014+
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
2015+
monitor_update.update_id = new_mon_id;
2016+
self.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
2017+
update: monitor_update, blocked: false,
2018+
});
2019+
for held_update in self.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
2020+
held_update.update.update_id += 1;
2021+
}
2022+
if msg.is_some() {
2023+
debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
2024+
let update = self.build_commitment_no_status_check(logger);
2025+
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2026+
update, blocked: true,
2027+
});
2028+
}
2029+
insert_pos
2030+
};
2031+
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
19912032
UpdateFulfillCommitFetch::NewClaim {
1992-
monitor_update: self.pending_monitor_updates.last().unwrap(),
2033+
monitor_update: &self.pending_monitor_updates.get(unblocked_update_pos)
2034+
.expect("We just pushed the monitor update").update,
19932035
htlc_value_msat,
19942036
}
19952037
},
1996-
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
1997-
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
1998-
self.pending_monitor_updates.push(monitor_update);
1999-
UpdateFulfillCommitFetch::NewClaim {
2000-
monitor_update: self.pending_monitor_updates.last().unwrap(),
2001-
htlc_value_msat,
2002-
}
2003-
}
20042038
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
20052039
}
20062040
}
@@ -3068,7 +3102,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30683102
Ok(())
30693103
}
30703104

3071-
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
3105+
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
30723106
where L::Target: Logger
30733107
{
30743108
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3268,8 +3302,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
32683302
}
32693303
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
32703304
log_bytes!(self.channel_id));
3271-
self.pending_monitor_updates.push(monitor_update);
3272-
return Ok(self.pending_monitor_updates.last().unwrap());
3305+
return Ok(self.push_ret_blockable_mon_update(monitor_update));
32733306
}
32743307

32753308
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
@@ -3286,9 +3319,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
32863319

32873320
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
32883321
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
3289-
self.pending_monitor_updates.push(monitor_update);
32903322
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
3291-
return Ok(self.pending_monitor_updates.last().unwrap());
3323+
return Ok(self.push_ret_blockable_mon_update(monitor_update));
32923324
}
32933325

32943326
/// Public version of the below, checking relevant preconditions first.
@@ -3403,8 +3435,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
34033435
update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
34043436

34053437
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
3406-
self.pending_monitor_updates.push(monitor_update);
3407-
(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
3438+
(self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail)
34083439
} else {
34093440
(None, Vec::new())
34103441
}
@@ -3415,7 +3446,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
34153446
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
34163447
/// generating an appropriate error *after* the channel state has been updated based on the
34173448
/// revoke_and_ack message.
3418-
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
3449+
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
34193450
where L::Target: Logger,
34203451
{
34213452
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3612,21 +3643,19 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
36123643
self.monitor_pending_failures.append(&mut revoked_htlcs);
36133644
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
36143645
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
3615-
self.pending_monitor_updates.push(monitor_update);
3616-
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
3646+
return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update)));
36173647
}
36183648

36193649
match self.free_holding_cell_htlcs(logger) {
36203650
(Some(_), htlcs_to_fail) => {
3621-
let mut additional_update = self.pending_monitor_updates.pop().unwrap();
3651+
let mut additional_update = self.pending_monitor_updates.pop().unwrap().update;
36223652
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
36233653
// strictly increasing by one, so decrement it here.
36243654
self.latest_monitor_update_id = monitor_update.update_id;
36253655
monitor_update.updates.append(&mut additional_update.updates);
36263656

36273657
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3628-
self.pending_monitor_updates.push(monitor_update);
3629-
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
3658+
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
36303659
},
36313660
(None, htlcs_to_fail) => {
36323661
if require_commitment {
@@ -3640,13 +3669,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
36403669
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
36413670
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
36423671
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3643-
self.pending_monitor_updates.push(monitor_update);
3644-
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
3672+
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
36453673
} else {
36463674
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
36473675
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3648-
self.pending_monitor_updates.push(monitor_update);
3649-
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
3676+
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
36503677
}
36513678
}
36523679
}
@@ -3835,7 +3862,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
38353862
{
38363863
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
38373864
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3838-
self.pending_monitor_updates.clear();
3865+
let mut found_blocked = false;
3866+
self.pending_monitor_updates.retain(|upd| {
3867+
if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
3868+
if upd.blocked { found_blocked = true; }
3869+
upd.blocked
3870+
});
38393871

38403872
// If we're past (or at) the FundingSent stage on an outbound channel, try to
38413873
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4378,8 +4410,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
43784410
}],
43794411
};
43804412
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
4381-
self.pending_monitor_updates.push(monitor_update);
4382-
Some(self.pending_monitor_updates.last().unwrap())
4413+
if self.push_blockable_mon_update(monitor_update) {
4414+
self.pending_monitor_updates.last().map(|upd| &upd.update)
4415+
} else { None }
43834416
} else { None };
43844417
let shutdown = if send_shutdown {
43854418
Some(msgs::Shutdown {
@@ -4951,8 +4984,44 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
49514984
(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
49524985
}
49534986

4954-
pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
4955-
self.pending_monitor_updates.first()
4987+
/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
4988+
/// further blocked monitor update exists after the next.
4989+
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
4990+
for i in 0..self.pending_monitor_updates.len() {
4991+
if self.pending_monitor_updates[i].blocked {
4992+
self.pending_monitor_updates[i].blocked = false;
4993+
return Some((&self.pending_monitor_updates[i].update,
4994+
self.pending_monitor_updates.len() > i + 1));
4995+
}
4996+
}
4997+
None
4998+
}
4999+
5000+
/// Pushes a new monitor update into our monitor update queue, returning whether it should be
5001+
/// immediately given to the user for persisting or if it should be held as blocked.
5002+
fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
5003+
let release_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
5004+
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
5005+
update, blocked: !release_monitor
5006+
});
5007+
release_monitor
5008+
}
5009+
5010+
/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
5011+
/// it should be immediately given to the user for persisting or `None` if it should be held as
5012+
/// blocked.
5013+
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
5014+
-> Option<&ChannelMonitorUpdate> {
5015+
let release_monitor = self.push_blockable_mon_update(update);
5016+
if release_monitor { self.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
5017+
}
5018+
5019+
pub fn no_monitor_updates_pending(&self) -> bool {
5020+
self.pending_monitor_updates.is_empty()
5021+
}
5022+
5023+
pub fn complete_one_mon_update(&mut self, update_id: u64) {
5024+
self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
49565025
}
49575026

49585027
/// Returns true if funding_created was sent/received.
@@ -6000,8 +6069,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
60006069
Some(_) => {
60016070
let monitor_update = self.build_commitment_no_status_check(logger);
60026071
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
6003-
self.pending_monitor_updates.push(monitor_update);
6004-
Ok(Some(self.pending_monitor_updates.last().unwrap()))
6072+
Ok(self.push_ret_blockable_mon_update(monitor_update))
60056073
},
60066074
None => Ok(None)
60076075
}
@@ -6090,8 +6158,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
60906158
}],
60916159
};
60926160
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
6093-
self.pending_monitor_updates.push(monitor_update);
6094-
Some(self.pending_monitor_updates.last().unwrap())
6161+
if self.push_blockable_mon_update(monitor_update) {
6162+
self.pending_monitor_updates.last().map(|upd| &upd.update)
6163+
} else { None }
60956164
} else { None };
60966165
let shutdown = msgs::Shutdown {
60976166
channel_id: self.channel_id,

lightning/src/ln/channelmanager.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1680,11 +1680,8 @@ macro_rules! handle_new_monitor_update {
16801680
res
16811681
},
16821682
ChannelMonitorUpdateStatus::Completed => {
1683-
if ($update_id == 0 || $chan.get_next_monitor_update()
1684-
.expect("We can't be processing a monitor update if it isn't queued")
1685-
.update_id == $update_id) &&
1686-
$chan.get_latest_monitor_update_id() == $update_id
1687-
{
1683+
$chan.complete_one_mon_update($update_id);
1684+
if $chan.no_monitor_updates_pending() {
16881685
handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
16891686
}
16901687
Ok(())
@@ -5131,11 +5128,13 @@ where
51315128
match peer_state.channel_by_id.entry(msg.channel_id) {
51325129
hash_map::Entry::Occupied(mut chan) => {
51335130
let funding_txo = chan.get().get_funding_txo();
5134-
let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
5135-
let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
5136-
let update_id = monitor_update.update_id;
5137-
handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
5138-
peer_state, per_peer_state, chan)
5131+
let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
5132+
if let Some(monitor_update) = monitor_update_opt {
5133+
let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
5134+
let update_id = monitor_update.update_id;
5135+
handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
5136+
peer_state, per_peer_state, chan)
5137+
} else { Ok(()) }
51395138
},
51405139
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
51415140
}
@@ -5250,11 +5249,13 @@ where
52505249
match peer_state.channel_by_id.entry(msg.channel_id) {
52515250
hash_map::Entry::Occupied(mut chan) => {
52525251
let funding_txo = chan.get().get_funding_txo();
5253-
let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
5254-
let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
5255-
let update_id = monitor_update.update_id;
5256-
let res = handle_new_monitor_update!(self, update_res, update_id,
5257-
peer_state_lock, peer_state, per_peer_state, chan);
5252+
let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
5253+
let res = if let Some(monitor_update) = monitor_update_opt {
5254+
let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
5255+
let update_id = monitor_update.update_id;
5256+
handle_new_monitor_update!(self, update_res, update_id,
5257+
peer_state_lock, peer_state, per_peer_state, chan)
5258+
} else { Ok(()) };
52585259
(htlcs_to_fail, res)
52595260
},
52605261
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))

0 commit comments

Comments
 (0)