diff --git a/src/kbucket/bucket.rs b/src/kbucket/bucket.rs index ea5fd8a2..b943f966 100644 --- a/src/kbucket/bucket.rs +++ b/src/kbucket/bucket.rs @@ -410,7 +410,7 @@ where // Adjust `first_connected_pos` accordingly. match old_status.state { ConnectionState::Connected => { - if (self.first_connected_pos == Some(pos.0)) && pos.0 == self.nodes.len() { + if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() { // It was the last connected node. self.first_connected_pos = None } diff --git a/src/service.rs b/src/service.rs index 2661f846..5062bdb4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1504,7 +1504,7 @@ impl Service { _ => connection_direction, }; - debug!(node = %node_id, %direction, "Session established with Node"); + debug!(node = %node_id, %direction, %socket, "Session established with Node"); self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction)); } @@ -1616,13 +1616,16 @@ impl Service { let Some(ip_votes) = self.ip_votes.as_mut() else { return false; }; - match (ip_votes.majority(), is_ipv6) { + // Here we check the number of non-expired votes, rather than the majority. As if the + // local router is not SNAT'd we can have many votes but none for the same port and we + // therefore do excessive pinging. + match (ip_votes.has_minimum_threshold(), is_ipv6) { // We don't have enough ipv4 votes, but this is an IPv4-only node. - ((None, Some(_)), false) | + ((false, true), false) | // We don't have enough ipv6 votes, but this is an IPv6 node. - ((Some(_), None), true) | + ((true, false), true) | // We don't have enough ipv6 or ipv4 nodes, ping this peer. - ((None, None), _,) => true, + ((false, false), _,) => true, // We have enough votes do nothing. ((_, _), _,) => false, } diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs index c2222dbc..e6587f46 100644 --- a/src/service/ip_vote.rs +++ b/src/service/ip_vote.rs @@ -2,14 +2,17 @@ use enr::NodeId; use fnv::FnvHashMap; use std::{ collections::HashMap, + hash::Hash, net::{SocketAddr, SocketAddrV4, SocketAddrV6}, time::{Duration, Instant}, }; /// A collection of IP:Ports for our node reported from external peers. pub(crate) struct IpVote { - /// The current collection of IP:Port votes. - votes: HashMap, + /// The current collection of IP:Port votes for ipv4. + ipv4_votes: HashMap, + /// The current collection of IP:Port votes for ipv6. + ipv6_votes: HashMap, /// The minimum number of votes required before an IP/PORT is accepted. minimum_threshold: usize, /// The time votes remain valid. @@ -23,46 +26,86 @@ impl IpVote { panic!("Setting enr_peer_update_min to a value less than 2 will cause issues with discovery with peers behind NAT"); } IpVote { - votes: HashMap::new(), + ipv4_votes: HashMap::new(), + ipv6_votes: HashMap::new(), minimum_threshold, vote_duration, } } pub fn insert(&mut self, key: NodeId, socket: impl Into) { - self.votes - .insert(key, (socket.into(), Instant::now() + self.vote_duration)); + match socket.into() { + SocketAddr::V4(socket) => { + self.ipv4_votes + .insert(key, (socket, Instant::now() + self.vote_duration)); + } + SocketAddr::V6(socket) => { + self.ipv6_votes + .insert(key, (socket, Instant::now() + self.vote_duration)); + } + } } - /// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None. - pub fn majority(&mut self) -> (Option, Option) { - // remove any expired votes + /// Returns true if we have more than the minimum number of non-expired votes for a given ip + /// version. + pub fn has_minimum_threshold(&mut self) -> (bool, bool) { let instant = Instant::now(); - self.votes.retain(|_, v| v.1 > instant); - - // count votes, take majority - let mut ip4_count: FnvHashMap = FnvHashMap::default(); - let mut ip6_count: FnvHashMap = FnvHashMap::default(); - for (socket, _) in self.votes.values() { - // NOTE: here we depend on addresses being already cleaned up. No mapped or compat - // addresses should be present. This is done in the codec. - match socket { - SocketAddr::V4(socket) => *ip4_count.entry(*socket).or_insert_with(|| 0) += 1, - SocketAddr::V6(socket) => *ip6_count.entry(*socket).or_insert_with(|| 0) += 1, + self.ipv4_votes.retain(|_, v| v.1 > instant); + self.ipv6_votes.retain(|_, v| v.1 > instant); + + ( + self.ipv4_votes.len() >= self.minimum_threshold, + self.ipv6_votes.len() >= self.minimum_threshold, + ) + } + + /// Filter the stale votes and return the majority `SocketAddr` if it exists. + /// If there are not enough votes to meet the threshold this returns None. + fn filter_stale_find_most_frequent( + votes: &HashMap, + minimum_threshold: usize, + ) -> (HashMap, Option) { + let mut updated = HashMap::default(); + let mut counter: FnvHashMap = FnvHashMap::default(); + let mut max: Option<(K, usize)> = None; + let now = Instant::now(); + + for (node_id, (vote, instant)) in votes { + // Discard stale votes. + if instant <= &now { + continue; + } + updated.insert(*node_id, (*vote, *instant)); + + let count = counter.entry(*vote).or_default(); + *count += 1; + let current_max = max.map(|(_v, m)| m).unwrap_or_default(); + if *count >= current_max && *count >= minimum_threshold { + max = Some((*vote, *count)); } } - // find the maximum socket addr - let ip4_majority = majority(ip4_count.into_iter(), &self.minimum_threshold); - let ip6_majority = majority(ip6_count.into_iter(), &self.minimum_threshold); - (ip4_majority, ip6_majority) + (updated, max.map(|m| m.0)) } -} -fn majority(iter: impl Iterator, threshold: &usize) -> Option { - iter.filter(|(_k, count)| count >= threshold) - .max_by_key(|(_k, count)| *count) - .map(|(k, _count)| k) + /// Returns the majority `SocketAddr`'s of both IPv4 and IPv6 if they exist. If there are not enough votes to meet the threshold this returns None for each stack. + pub fn majority(&mut self) -> (Option, Option) { + let (updated_ipv4_votes, ipv4_majority) = Self::filter_stale_find_most_frequent::< + SocketAddrV4, + >( + &self.ipv4_votes, self.minimum_threshold + ); + self.ipv4_votes = updated_ipv4_votes; + + let (updated_ipv6_votes, ipv6_majority) = Self::filter_stale_find_most_frequent::< + SocketAddrV6, + >( + &self.ipv6_votes, self.minimum_threshold + ); + self.ipv6_votes = updated_ipv6_votes; + + (ipv4_majority, ipv6_majority) + } } #[cfg(test)] @@ -88,7 +131,8 @@ mod tests { votes.insert(NodeId::random(), socket_3); votes.insert(NodeId::random(), socket_3); - assert_eq!(votes.majority(), (Some(socket_2), None)); + // Assert that in a draw situation a majority is still chosen. + assert!(votes.majority().0.is_some()); } #[test]