@@ -1233,7 +1233,7 @@ enum NotifyOption {
1233
1233
/// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
1234
1234
/// notify or not based on whether relevant changes have been made, providing a closure to
1235
1235
/// `optionally_notify` which returns a `NotifyOption`.
1236
- struct PersistenceNotifierGuard<'a, F: Fn () -> NotifyOption> {
1236
+ struct PersistenceNotifierGuard<'a, F: FnMut () -> NotifyOption> {
1237
1237
event_persist_notifier: &'a Notifier,
1238
1238
needs_persist_flag: &'a AtomicBool,
1239
1239
should_persist: F,
@@ -1248,12 +1248,12 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
1248
1248
/// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
1249
1249
/// other cases where losing the changes on restart may result in a force-close or otherwise
1250
1250
/// isn't ideal.
1251
- fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn () -> NotifyOption> {
1251
+ fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut () -> NotifyOption> {
1252
1252
Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
1253
1253
}
1254
1254
1255
- fn optionally_notify<F: Fn () -> NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F)
1256
- -> PersistenceNotifierGuard<'a, impl Fn () -> NotifyOption> {
1255
+ fn optionally_notify<F: FnMut () -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
1256
+ -> PersistenceNotifierGuard<'a, impl FnMut () -> NotifyOption> {
1257
1257
let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
1258
1258
let force_notify = cm.get_cm().process_background_events();
1259
1259
@@ -1292,7 +1292,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
1292
1292
}
1293
1293
}
1294
1294
1295
- impl<'a, F: Fn () -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
1295
+ impl<'a, F: FnMut () -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
1296
1296
fn drop(&mut self) {
1297
1297
match (self.should_persist)() {
1298
1298
NotifyOption::DoPersist => {
@@ -7778,76 +7778,82 @@ where
7778
7778
return Err(());
7779
7779
}
7780
7780
7781
- let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self );
7781
+ let mut res = Ok(() );
7782
7782
7783
- // If we have too many peers connected which don't have funded channels, disconnect the
7784
- // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
7785
- // unfunded channels taking up space in memory for disconnected peers, we still let new
7786
- // peers connect, but we'll reject new channels from them.
7787
- let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
7788
- let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
7783
+ PersistenceNotifierGuard::optionally_notify(self, || {
7784
+ // If we have too many peers connected which don't have funded channels, disconnect the
7785
+ // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
7786
+ // unfunded channels taking up space in memory for disconnected peers, we still let new
7787
+ // peers connect, but we'll reject new channels from them.
7788
+ let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
7789
+ let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
7789
7790
7790
- {
7791
- let mut peer_state_lock = self.per_peer_state.write().unwrap();
7792
- match peer_state_lock.entry(counterparty_node_id.clone()) {
7793
- hash_map::Entry::Vacant(e) => {
7794
- if inbound_peer_limited {
7795
- return Err(());
7796
- }
7797
- e.insert(Mutex::new(PeerState {
7798
- channel_by_id: HashMap::new(),
7799
- inbound_channel_request_by_id: HashMap::new(),
7800
- latest_features: init_msg.features.clone(),
7801
- pending_msg_events: Vec::new(),
7802
- in_flight_monitor_updates: BTreeMap::new(),
7803
- monitor_update_blocked_actions: BTreeMap::new(),
7804
- actions_blocking_raa_monitor_updates: BTreeMap::new(),
7805
- is_connected: true,
7806
- }));
7807
- },
7808
- hash_map::Entry::Occupied(e) => {
7809
- let mut peer_state = e.get().lock().unwrap();
7810
- peer_state.latest_features = init_msg.features.clone();
7811
-
7812
- let best_block_height = self.best_block.read().unwrap().height();
7813
- if inbound_peer_limited &&
7814
- Self::unfunded_channel_count(&*peer_state, best_block_height) ==
7815
- peer_state.channel_by_id.len()
7816
- {
7817
- return Err(());
7818
- }
7791
+ {
7792
+ let mut peer_state_lock = self.per_peer_state.write().unwrap();
7793
+ match peer_state_lock.entry(counterparty_node_id.clone()) {
7794
+ hash_map::Entry::Vacant(e) => {
7795
+ if inbound_peer_limited {
7796
+ res = Err(());
7797
+ return NotifyOption::SkipPersistNoEvents;
7798
+ }
7799
+ e.insert(Mutex::new(PeerState {
7800
+ channel_by_id: HashMap::new(),
7801
+ inbound_channel_request_by_id: HashMap::new(),
7802
+ latest_features: init_msg.features.clone(),
7803
+ pending_msg_events: Vec::new(),
7804
+ in_flight_monitor_updates: BTreeMap::new(),
7805
+ monitor_update_blocked_actions: BTreeMap::new(),
7806
+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
7807
+ is_connected: true,
7808
+ }));
7809
+ },
7810
+ hash_map::Entry::Occupied(e) => {
7811
+ let mut peer_state = e.get().lock().unwrap();
7812
+ peer_state.latest_features = init_msg.features.clone();
7813
+
7814
+ let best_block_height = self.best_block.read().unwrap().height();
7815
+ if inbound_peer_limited &&
7816
+ Self::unfunded_channel_count(&*peer_state, best_block_height) ==
7817
+ peer_state.channel_by_id.len()
7818
+ {
7819
+ res = Err(());
7820
+ return NotifyOption::SkipPersistNoEvents;
7821
+ }
7819
7822
7820
- debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
7821
- peer_state.is_connected = true;
7822
- },
7823
+ debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
7824
+ peer_state.is_connected = true;
7825
+ },
7826
+ }
7823
7827
}
7824
- }
7825
7828
7826
- log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
7829
+ log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
7827
7830
7828
- let per_peer_state = self.per_peer_state.read().unwrap();
7829
- if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
7830
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
7831
- let peer_state = &mut *peer_state_lock;
7832
- let pending_msg_events = &mut peer_state.pending_msg_events;
7831
+ let per_peer_state = self.per_peer_state.read().unwrap();
7832
+ if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
7833
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
7834
+ let peer_state = &mut *peer_state_lock;
7835
+ let pending_msg_events = &mut peer_state.pending_msg_events;
7833
7836
7834
- peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
7835
- if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
7836
- // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
7837
- // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
7838
- // worry about closing and removing them.
7839
- debug_assert!(false);
7840
- None
7841
- }
7842
- ).for_each(|chan| {
7843
- pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
7844
- node_id: chan.context.get_counterparty_node_id(),
7845
- msg: chan.get_channel_reestablish(&self.logger),
7837
+ peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
7838
+ if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
7839
+ // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
7840
+ // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
7841
+ // worry about closing and removing them.
7842
+ debug_assert!(false);
7843
+ None
7844
+ }
7845
+ ).for_each(|chan| {
7846
+ pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
7847
+ node_id: chan.context.get_counterparty_node_id(),
7848
+ msg: chan.get_channel_reestablish(&self.logger),
7849
+ });
7846
7850
});
7847
- });
7848
- }
7849
- //TODO: Also re-broadcast announcement_signatures
7850
- Ok(())
7851
+ }
7852
+
7853
+ return NotifyOption::SkipPersistHandleEvents;
7854
+ //TODO: Also re-broadcast announcement_signatures
7855
+ });
7856
+ res
7851
7857
}
7852
7858
7853
7859
fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
0 commit comments