@@ -5739,6 +5739,64 @@ where
5739
5739
self . pending_outbound_payments . clear_pending_payments ( )
5740
5740
}
5741
5741
5742
+ /// (Partially) handles an [`EventCompletionAction`]. If `true` is returned the same `action`
5743
+ /// should be processed again.
5744
+ fn handle_post_event_action ( & self , action : & EventCompletionAction ) -> bool {
5745
+ match action {
5746
+ EventCompletionAction :: ReleaseRAAChannelMonitorUpdate {
5747
+ channel_funding_outpoint, counterparty_node_id
5748
+ } => {
5749
+ let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5750
+ if let Some ( peer_state_mtx) = per_peer_state. get ( & counterparty_node_id) {
5751
+ let mut peer_state_lck = peer_state_mtx. lock ( ) . unwrap ( ) ;
5752
+ let peer_state = & mut * peer_state_lck;
5753
+ if self . pending_events . lock ( ) . unwrap ( ) . iter ( )
5754
+ . any ( |( _ev, action_opt) | action_opt. as_ref ( ) == Some ( & action) )
5755
+ {
5756
+ // Check that, while holding the peer lock, we don't have another event
5757
+ // blocking any monitor updates for this channel. If we do, let those
5758
+ // events be the ones that ultimately release the monitor update(s).
5759
+ log_trace ! ( self . logger, "Delaying monitor unlock for channel {} as another event is pending" ,
5760
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5761
+ return false ;
5762
+ }
5763
+ if let hash_map:: Entry :: Occupied ( mut chan) = peer_state. channel_by_id . entry ( channel_funding_outpoint. to_channel_id ( ) ) {
5764
+ debug_assert_eq ! ( chan. get( ) . get_funding_txo( ) . unwrap( ) , * channel_funding_outpoint) ;
5765
+ if let Some ( ( monitor_update, further_update_exists) ) = chan. get_mut ( ) . fly_next_unflown_monitor_update ( ) {
5766
+ log_debug ! ( self . logger, "Unlocking monitor updating for channel {} and updating monitor" ,
5767
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5768
+ let update_res = self . chain_monitor . update_channel ( * channel_funding_outpoint, monitor_update) ;
5769
+ let update_id = monitor_update. update_id ;
5770
+ let _ = handle_error ! ( self ,
5771
+ handle_new_monitor_update!( self , update_res, update_id,
5772
+ peer_state_lck, peer_state, per_peer_state, chan) ,
5773
+ * counterparty_node_id) ;
5774
+ if further_update_exists {
5775
+ return true ;
5776
+ }
5777
+ } else {
5778
+ log_trace ! ( self . logger, "Unlocked monitor updating for channel {} without monitors to update" ,
5779
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5780
+ }
5781
+ }
5782
+ } else {
5783
+ log_debug ! ( self . logger,
5784
+ "Got a release post-RAA monitor update for peer {} but the channel is gone" ,
5785
+ log_pubkey!( * counterparty_node_id) ) ;
5786
+ }
5787
+ } ,
5788
+ }
5789
+ false
5790
+ }
5791
+
5792
+ fn handle_post_event_actions ( & self , actions : Vec < EventCompletionAction > ) {
5793
+ for action in actions {
5794
+ loop {
5795
+ if !self . handle_post_event_action ( & action) { break ; }
5796
+ }
5797
+ }
5798
+ }
5799
+
5742
5800
/// Processes any events asynchronously in the order they were generated since the last call
5743
5801
/// using the given event handler.
5744
5802
///
@@ -5759,22 +5817,34 @@ where
5759
5817
}
5760
5818
5761
5819
let _single_processor = self . pending_events_processor . lock ( ) . unwrap ( ) ;
5762
- let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5763
5820
loop {
5764
- if let Some ( ( event, _action) ) = next_event {
5765
- result = NotifyOption :: DoPersist ;
5766
- let ev_clone;
5767
- #[ cfg( debug_assertions) ] {
5768
- ev_clone = event. clone ( ) ;
5821
+ let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5822
+ let mut post_event_actions = Vec :: new ( ) ;
5823
+ loop {
5824
+ if let Some ( ( event, action_opt) ) = next_event {
5825
+ result = NotifyOption :: DoPersist ;
5826
+ let _ev_clone: Event ;
5827
+ #[ cfg( debug_assertions) ] {
5828
+ _ev_clone = event. clone ( ) ;
5829
+ }
5830
+ handler ( event) . await ;
5831
+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5832
+ #[ cfg( debug_assertions) ] {
5833
+ debug_assert_eq ! ( _ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5834
+ }
5835
+ debug_assert_eq ! ( action_opt, pending_events. front( ) . unwrap( ) . 1 ) ;
5836
+ if let Some ( action) = action_opt {
5837
+ post_event_actions. push ( action) ;
5838
+ }
5839
+ pending_events. pop_front ( ) ;
5840
+ next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5841
+ } else {
5842
+ break ;
5769
5843
}
5770
- handler ( event) . await ;
5771
- let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5772
- debug_assert_eq ! ( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5773
- pending_events. pop_front ( ) ;
5774
- next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5775
- } else {
5776
- break ;
5777
5844
}
5845
+ if post_event_actions. is_empty ( ) { break ; }
5846
+ self . handle_post_event_actions ( post_event_actions) ;
5847
+ // If we had some actions, go around again as we may have more events now
5778
5848
}
5779
5849
5780
5850
if result == NotifyOption :: DoPersist {
@@ -5871,22 +5941,34 @@ where
5871
5941
}
5872
5942
5873
5943
let _single_processor = self . pending_events_processor . lock ( ) . unwrap ( ) ;
5874
- let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5875
5944
loop {
5876
- if let Some ( ( event, _action) ) = next_event {
5877
- result = NotifyOption :: DoPersist ;
5878
- let ev_clone;
5879
- #[ cfg( debug_assertions) ] {
5880
- ev_clone = event. clone ( ) ;
5945
+ let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5946
+ let mut post_event_actions = Vec :: new ( ) ;
5947
+ loop {
5948
+ if let Some ( ( event, action_opt) ) = next_event {
5949
+ result = NotifyOption :: DoPersist ;
5950
+ let _ev_clone: Event ;
5951
+ #[ cfg( debug_assertions) ] {
5952
+ _ev_clone = event. clone ( ) ;
5953
+ }
5954
+ handler. handle_event ( event) ;
5955
+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5956
+ #[ cfg( debug_assertions) ] {
5957
+ debug_assert_eq ! ( _ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5958
+ }
5959
+ debug_assert_eq ! ( action_opt, pending_events. front( ) . unwrap( ) . 1 ) ;
5960
+ if let Some ( action) = action_opt {
5961
+ post_event_actions. push ( action) ;
5962
+ }
5963
+ pending_events. pop_front ( ) ;
5964
+ next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5965
+ } else {
5966
+ break ;
5881
5967
}
5882
- handler. handle_event ( event) ;
5883
- let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5884
- debug_assert_eq ! ( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5885
- pending_events. pop_front ( ) ;
5886
- next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5887
- } else {
5888
- break ;
5889
5968
}
5969
+ if post_event_actions. is_empty ( ) { break ; }
5970
+ self . handle_post_event_actions ( post_event_actions) ;
5971
+ // If we had some actions, go around again as we may have more events now
5890
5972
}
5891
5973
5892
5974
result
0 commit comments