@@ -376,14 +376,6 @@ impl Peer {
376
376
}
377
377
}
378
378
379
- struct PeerHolder < Descriptor : SocketDescriptor > {
380
- /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold
381
- /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`]
382
- /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages
383
- /// handed to the `MessageHandler`s for a given peer is already guaranteed.
384
- peers : HashMap < Descriptor , Mutex < Peer > > ,
385
- }
386
-
387
379
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
388
380
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
389
381
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@@ -428,7 +420,15 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
428
420
L :: Target : Logger ,
429
421
CMH :: Target : CustomMessageHandler {
430
422
message_handler : MessageHandler < CM , RM > ,
431
- peers : FairRwLock < PeerHolder < Descriptor > > ,
423
+ /// Connection state for each connected peer - we have an outer read-write lock which is taken
424
+ /// as read while we're doing processing for a peer and taken write when a peer is being added
425
+ /// or removed.
426
+ ///
427
+ /// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold
428
+ /// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires
429
+ /// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to
430
+ /// the `MessageHandler`s for a given peer is already guaranteed.
431
+ peers : FairRwLock < HashMap < Descriptor , Mutex < Peer > > > ,
432
432
/// Only add to this set when noise completes.
433
433
/// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
434
434
/// lock held. Entries may be added with only the `peers` read lock held (though the
@@ -570,9 +570,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
570
570
571
571
PeerManager {
572
572
message_handler,
573
- peers : FairRwLock :: new ( PeerHolder {
574
- peers : HashMap :: new ( ) ,
575
- } ) ,
573
+ peers : FairRwLock :: new ( HashMap :: new ( ) ) ,
576
574
node_id_to_descriptor : Mutex :: new ( HashMap :: new ( ) ) ,
577
575
event_processing_lock : Mutex :: new ( ( ) ) ,
578
576
blocked_event_processors : AtomicBool :: new ( false ) ,
@@ -591,7 +589,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
591
589
/// completed and we are sure the remote peer has the private key for the given node_id.
592
590
pub fn get_peer_node_ids ( & self ) -> Vec < PublicKey > {
593
591
let peers = self . peers . read ( ) . unwrap ( ) ;
594
- peers. peers . values ( ) . filter_map ( |peer_mutex| {
592
+ peers. values ( ) . filter_map ( |peer_mutex| {
595
593
let p = peer_mutex. lock ( ) . unwrap ( ) ;
596
594
if !p. channel_encryptor . is_ready_for_encryption ( ) || p. their_features . is_none ( ) {
597
595
return None ;
@@ -629,7 +627,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
629
627
let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act two is 50 bytes
630
628
631
629
let mut peers = self . peers . write ( ) . unwrap ( ) ;
632
- if peers. peers . insert ( descriptor, Mutex :: new ( Peer {
630
+ if peers. insert ( descriptor, Mutex :: new ( Peer {
633
631
channel_encryptor : peer_encryptor,
634
632
their_node_id : None ,
635
633
their_features : None ,
@@ -676,7 +674,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
676
674
let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act one is 50 bytes
677
675
678
676
let mut peers = self . peers . write ( ) . unwrap ( ) ;
679
- if peers. peers . insert ( descriptor, Mutex :: new ( Peer {
677
+ if peers. insert ( descriptor, Mutex :: new ( Peer {
680
678
channel_encryptor : peer_encryptor,
681
679
their_node_id : None ,
682
680
their_features : None ,
@@ -787,7 +785,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
787
785
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
788
786
pub fn write_buffer_space_avail ( & self , descriptor : & mut Descriptor ) -> Result < ( ) , PeerHandleError > {
789
787
let peers = self . peers . read ( ) . unwrap ( ) ;
790
- match peers. peers . get ( descriptor) {
788
+ match peers. get ( descriptor) {
791
789
None => {
792
790
// This is most likely a simple race condition where the user found that the socket
793
791
// was writeable, then we told the user to `disconnect_socket()`, then they called
@@ -852,7 +850,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
852
850
let peers = self . peers . read ( ) . unwrap ( ) ;
853
851
let mut msgs_to_forward = Vec :: new ( ) ;
854
852
let mut peer_node_id = None ;
855
- match peers. peers . get ( peer_descriptor) {
853
+ match peers. get ( peer_descriptor) {
856
854
None => {
857
855
// This is most likely a simple race condition where the user read some bytes
858
856
// from the socket, then we told the user to `disconnect_socket()`, then they
@@ -1288,13 +1286,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1288
1286
Ok ( should_forward)
1289
1287
}
1290
1288
1291
- fn forward_broadcast_msg ( & self , peers : & PeerHolder < Descriptor > , msg : & wire:: Message < <<CMH as core:: ops:: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1289
+ fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as core:: ops:: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1292
1290
match msg {
1293
1291
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1294
1292
log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}" , except_node, msg) ;
1295
1293
let encoded_msg = encode_msg ! ( msg) ;
1296
1294
1297
- for ( _, peer_mutex) in peers. peers . iter ( ) {
1295
+ for ( _, peer_mutex) in peers. iter ( ) {
1298
1296
let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
1299
1297
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1300
1298
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
@@ -1320,7 +1318,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1320
1318
log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced node: {:?}" , except_node, msg) ;
1321
1319
let encoded_msg = encode_msg ! ( msg) ;
1322
1320
1323
- for ( _, peer_mutex) in peers. peers . iter ( ) {
1321
+ for ( _, peer_mutex) in peers. iter ( ) {
1324
1322
let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
1325
1323
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1326
1324
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
@@ -1345,7 +1343,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1345
1343
log_gossip ! ( self . logger, "Sending message to all peers except {:?}: {:?}" , except_node, msg) ;
1346
1344
let encoded_msg = encode_msg ! ( msg) ;
1347
1345
1348
- for ( _, peer_mutex) in peers. peers . iter ( ) {
1346
+ for ( _, peer_mutex) in peers. iter ( ) {
1349
1347
let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
1350
1348
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1351
1349
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
@@ -1426,7 +1424,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1426
1424
}
1427
1425
let descriptor_opt = self . node_id_to_descriptor. lock( ) . unwrap( ) . get( $node_id) . cloned( ) ;
1428
1426
match descriptor_opt {
1429
- Some ( descriptor) => match peers. peers . get( & descriptor) {
1427
+ Some ( descriptor) => match peers. get( & descriptor) {
1430
1428
Some ( peer_mutex) => {
1431
1429
let peer_lock = peer_mutex. lock( ) . unwrap( ) ;
1432
1430
if peer_lock. their_features. is_none( ) {
@@ -1625,7 +1623,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1625
1623
self . enqueue_message ( & mut * get_peer_for_forwarding ! ( & node_id) , & msg) ;
1626
1624
}
1627
1625
1628
- for ( descriptor, peer_mutex) in peers. peers . iter ( ) {
1626
+ for ( descriptor, peer_mutex) in peers. iter ( ) {
1629
1627
self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) ) ;
1630
1628
}
1631
1629
}
@@ -1639,7 +1637,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1639
1637
// lock).
1640
1638
1641
1639
if let Some ( mut descriptor) = self . node_id_to_descriptor . lock ( ) . unwrap ( ) . remove ( & node_id) {
1642
- if let Some ( peer_mutex) = peers. peers . remove ( & descriptor) {
1640
+ if let Some ( peer_mutex) = peers. remove ( & descriptor) {
1643
1641
if let Some ( msg) = msg {
1644
1642
log_trace ! ( self . logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
1645
1643
log_pubkey!( node_id) ,
@@ -1667,7 +1665,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1667
1665
1668
1666
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
1669
1667
let mut peers = self . peers . write ( ) . unwrap ( ) ;
1670
- let peer_option = peers. peers . remove ( descriptor) ;
1668
+ let peer_option = peers. remove ( descriptor) ;
1671
1669
match peer_option {
1672
1670
None => {
1673
1671
// This is most likely a simple race condition where the user found that the socket
@@ -1703,7 +1701,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1703
1701
let mut peers_lock = self . peers . write ( ) . unwrap ( ) ;
1704
1702
if let Some ( mut descriptor) = self . node_id_to_descriptor . lock ( ) . unwrap ( ) . remove ( & node_id) {
1705
1703
log_trace ! ( self . logger, "Disconnecting peer with id {} due to client request" , node_id) ;
1706
- peers_lock. peers . remove ( & descriptor) ;
1704
+ peers_lock. remove ( & descriptor) ;
1707
1705
self . message_handler . chan_handler . peer_disconnected ( & node_id, no_connection_possible) ;
1708
1706
descriptor. disconnect_socket ( ) ;
1709
1707
}
@@ -1716,7 +1714,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1716
1714
let mut peers_lock = self . peers . write ( ) . unwrap ( ) ;
1717
1715
self . node_id_to_descriptor . lock ( ) . unwrap ( ) . clear ( ) ;
1718
1716
let peers = & mut * peers_lock;
1719
- for ( mut descriptor, peer) in peers. peers . drain ( ) {
1717
+ for ( mut descriptor, peer) in peers. drain ( ) {
1720
1718
if let Some ( node_id) = peer. lock ( ) . unwrap ( ) . their_node_id {
1721
1719
log_trace ! ( self . logger, "Disconnecting peer with id {} due to client request to disconnect all peers" , node_id) ;
1722
1720
self . message_handler . chan_handler . peer_disconnected ( & node_id, false ) ;
@@ -1755,7 +1753,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1755
1753
{
1756
1754
let peers_lock = self . peers . read ( ) . unwrap ( ) ;
1757
1755
1758
- for ( descriptor, peer_mutex) in peers_lock. peers . iter ( ) {
1756
+ for ( descriptor, peer_mutex) in peers_lock. iter ( ) {
1759
1757
let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
1760
1758
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_node_id . is_none ( ) {
1761
1759
// The peer needs to complete its handshake before we can exchange messages. We
@@ -1779,7 +1777,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1779
1777
1780
1778
if ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1781
1779
|| peer. awaiting_pong_timer_tick_intervals as u64 >
1782
- MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock. peers . len ( ) as u64
1780
+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock. len ( ) as u64
1783
1781
{
1784
1782
descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1785
1783
continue ;
@@ -1805,7 +1803,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1805
1803
{
1806
1804
let mut peers_lock = self . peers . write ( ) . unwrap ( ) ;
1807
1805
for descriptor in descriptors_needing_disconnect. iter ( ) {
1808
- if let Some ( peer) = peers_lock. peers . remove ( & descriptor) {
1806
+ if let Some ( peer) = peers_lock. remove ( descriptor) {
1809
1807
if let Some ( node_id) = peer. lock ( ) . unwrap ( ) . their_node_id {
1810
1808
log_trace ! ( self . logger, "Disconnecting peer with id {} due to ping timeout" , node_id) ;
1811
1809
self . node_id_to_descriptor . lock ( ) . unwrap ( ) . remove ( & node_id) ;
@@ -1935,7 +1933,7 @@ mod tests {
1935
1933
let chan_handler = test_utils:: TestChannelMessageHandler :: new ( ) ;
1936
1934
let mut peers = create_network ( 2 , & cfgs) ;
1937
1935
establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1938
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 1 ) ;
1936
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 1 ) ;
1939
1937
1940
1938
let secp_ctx = Secp256k1 :: new ( ) ;
1941
1939
let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peers[ 1 ] . our_node_secret ) ;
@@ -1948,7 +1946,7 @@ mod tests {
1948
1946
peers[ 0 ] . message_handler . chan_handler = & chan_handler;
1949
1947
1950
1948
peers[ 0 ] . process_events ( ) ;
1951
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 0 ) ;
1949
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 0 ) ;
1952
1950
}
1953
1951
1954
1952
#[ test]
@@ -1957,17 +1955,17 @@ mod tests {
1957
1955
let cfgs = create_peermgr_cfgs ( 2 ) ;
1958
1956
let peers = create_network ( 2 , & cfgs) ;
1959
1957
establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1960
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 1 ) ;
1958
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 1 ) ;
1961
1959
1962
1960
// peers[0] awaiting_pong is set to true, but the Peer is still connected
1963
1961
peers[ 0 ] . timer_tick_occurred ( ) ;
1964
1962
peers[ 0 ] . process_events ( ) ;
1965
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 1 ) ;
1963
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 1 ) ;
1966
1964
1967
1965
// Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
1968
1966
peers[ 0 ] . timer_tick_occurred ( ) ;
1969
1967
peers[ 0 ] . process_events ( ) ;
1970
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 0 ) ;
1968
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 0 ) ;
1971
1969
}
1972
1970
1973
1971
#[ test]
@@ -2029,9 +2027,9 @@ mod tests {
2029
2027
peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
2030
2028
2031
2029
// If we get a single timer tick before completion, that's fine
2032
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 1 ) ;
2030
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 1 ) ;
2033
2031
peers[ 0 ] . timer_tick_occurred ( ) ;
2034
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 1 ) ;
2032
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 1 ) ;
2035
2033
2036
2034
assert_eq ! ( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
2037
2035
peers[ 0 ] . process_events ( ) ;
@@ -2040,7 +2038,7 @@ mod tests {
2040
2038
2041
2039
// ...but if we get a second timer tick, we should disconnect the peer
2042
2040
peers[ 0 ] . timer_tick_occurred ( ) ;
2043
- assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . peers . len( ) , 0 ) ;
2041
+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . len( ) , 0 ) ;
2044
2042
2045
2043
assert ! ( peers[ 0 ] . read_event( & mut fd_a, & fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . is_err( ) ) ;
2046
2044
}
0 commit comments