diff --git a/src/discv5.rs b/src/discv5.rs index 5b7d4553..90a0cf43 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -18,7 +18,7 @@ use crate::{ self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, NodeStatus, UpdateResult, }, - node_info::NodeContact, + node_info::{NodeAddress, NodeContact}, packet::ProtocolIdentity, service::{QueryKind, Service, ServiceRequest, TalkRequest}, Config, DefaultProtocolId, Enr, IpMode, @@ -74,6 +74,8 @@ pub enum Event { }, /// A new session has been established with a node. SessionEstablished(Enr, SocketAddr), + /// A session has been removed from our cache due to inactivity. + SessionsExpired(Vec), /// Our local ENR IP address has been updated. SocketUpdated(SocketAddr), /// A node has initiated a talk request. diff --git a/src/handler/mod.rs b/src/handler/mod.rs index ac566adf..66b2dcad 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -139,6 +139,8 @@ pub enum HandlerOut { socket: SocketAddr, node_id: NodeId, }, + /// These sessions have expired from the cache. + ExpiredSessions(Vec), } /// How we connected to the node. @@ -784,10 +786,10 @@ impl Handler { && match node_address.socket_addr { SocketAddr::V4(socket_addr) => enr .udp4_socket() - .map_or(true, |advertized_addr| socket_addr == advertized_addr), + .is_none_or(|advertized_addr| socket_addr == advertized_addr), SocketAddr::V6(socket_addr) => enr .udp6_socket() - .map_or(true, |advertized_addr| socket_addr == advertized_addr), + .is_none_or(|advertized_addr| socket_addr == advertized_addr), } } @@ -1231,6 +1233,9 @@ impl Handler { // handshake to re-establish a session, if applicable. message_nonce: Option, ) { + // Clear the session cache and report expired elements + self.remove_expired_sessions().await; + if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); // If a session is re-established, due to a new handshake during an ongoing @@ -1286,6 +1291,9 @@ impl Handler { remove_session: bool, ) { if remove_session { + // Remove expired sessions + self.remove_expired_sessions().await; + self.sessions.remove(node_address); METRICS .active_sessions @@ -1357,6 +1365,22 @@ impl Handler { .retain(|_, time| time.is_none() || Some(Instant::now()) < *time); } + /// Removes expired sessions and report them back to the service. + async fn remove_expired_sessions(&mut self) { + // Purge any expired sessions + let expired_sessions = self.sessions.remove_expired_values(); + + if !expired_sessions.is_empty() { + if let Err(e) = self + .service_send + .send(HandlerOut::ExpiredSessions(expired_sessions)) + .await + { + warn!(error = %e, "Failed to inform app of expired sessions") + } + } + } + /// Returns whether a session with this node does not exist and a request that initiates /// a session has been sent. fn is_awaiting_session_to_be_established(&mut self, node_address: &NodeAddress) -> bool { diff --git a/src/lru_time_cache.rs b/src/lru_time_cache.rs index 4eab65d2..f14c9fdb 100644 --- a/src/lru_time_cache.rs +++ b/src/lru_time_cache.rs @@ -37,17 +37,13 @@ impl LruTimeCache { } /// Retrieves a reference to the value stored under `key`, or `None` if the key doesn't exist. - /// Also removes expired elements and updates the time. - #[allow(dead_code)] pub fn get(&mut self, key: &K) -> Option<&V> { self.get_mut(key).map(|value| &*value) } /// Retrieves a mutable reference to the value stored under `key`, or `None` if the key doesn't exist. - /// Also removes expired elements and updates the time. pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { let now = Instant::now(); - self.remove_expired_values(now); match self.map.raw_entry_mut().from_key(key) { hashlink::linked_hash_map::RawEntryMut::Occupied(mut occupied) => { @@ -61,7 +57,6 @@ impl LruTimeCache { /// Returns a reference to the value with the given `key`, if present and not expired, without /// updating the timestamp. - #[allow(dead_code)] pub fn peek(&self, key: &K) -> Option<&V> { if let Some((value, time)) = self.map.get(key) { return if *time + self.ttl >= Instant::now() { @@ -76,7 +71,6 @@ impl LruTimeCache { /// Returns the size of the cache, i.e. the number of cached non-expired key-value pairs. pub fn len(&mut self) -> usize { - self.remove_expired_values(Instant::now()); self.map.len() } @@ -87,13 +81,19 @@ impl LruTimeCache { } /// Removes expired items from the cache. - fn remove_expired_values(&mut self, now: Instant) { + pub fn remove_expired_values(&mut self) -> Vec { + let mut expired_elements = Vec::new(); + let now = Instant::now(); while let Some((_front, (_value, time))) = self.map.front() { if *time + self.ttl >= now { break; } - self.map.pop_front(); + // Store the expired key + if let Some((k, _v)) = self.map.pop_front() { + expired_elements.push(k); + } } + expired_elements } } diff --git a/src/service.rs b/src/service.rs index 5062bdb4..f2640ab4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -416,6 +416,9 @@ impl Service { } self.send_event(Event::UnverifiableEnr{enr, socket, node_id}); } + HandlerOut::ExpiredSessions(expired_sessions) => { + self.send_event(Event::SessionsExpired(expired_sessions)); + } } } event = Service::bucket_maintenance_poll(&self.kbuckets) => {